0

在针对标头断言时,我无法创建消费者端点问题。有人可以帮我吗?

我有这个文件观察器路由,它消耗文件并验证文件。如果其有效文件将更新标头并将其发送到 s3 blob。否则它将发送到错误目录。

这是我的路线:

  from("file-watch:test?events=CREATE&useFileHashing=true&antInclude=**/*.txt&recursive=true")
                .process(fileProcessor)
                .toD("direct:validateFile")
                .choice()
                .when(exchange -> exchange.getIn().getHeader("isValid").equals("valid"))
                .to("direct:updateMessageHeaders")
                .otherwise()
                .toD("direct:processErrorFiles")
                .endChoice()
                .end();

        from("direct:validateFile")
                .routeId("validateFile")
                .choice()
                .when(exchange -> Long.parseLong(exchange.getIn().getHeader("fileSize").toString()) > 0)
                .setHeader("isValid", simple("valid"))
                .otherwise()
                .setHeader("isValid", simple("invalid"))
                .endChoice()
                .end();
 final Processor fileProcessor = exchange -> {
        String fileName = exchange.getIn().getHeader("CamelFileAbsolutePath").toString();
        File gdisFile = new File(fileName);
        exchange.getIn().setHeader("fileSize", gdisFile.length());
    };

测试用例

public class RouteTest extends CamelTestSupport {

    @Override
    public RouteBuilder createRouteBuilder() throws Exception
    {
        return new Route();
    }
    @Test
    public void header_validation() {

        Map<String, Object> headers = new HashMap<>();
        headers.put("fileSize", 100);

        template.sendBodyAndHeaders("direct:validateFile", null, headers);
assertEquals("valid",consumer.receive("direct:validateFile").getIn().getHeader("isValid"));
    }
}

例外

org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile. Reason: java.lang.IllegalArgumentException: Cannot add a 2nd consumer to the same endpoint: direct://validateFile. DirectEndpoint only allows one consumer.
at org.apache.camel.support.cache.DefaultConsumerCache.acquirePollingConsumer(DefaultConsumerCache.java:107)

4

2 回答 2

1

你打电话时

template.sendBodyAndHeaders

Camel 执行即发即弃 - 它将您的数据发送到端点并且不等待结果。

你需要使用

tamplate.requestBodyAndHeaders

此方法等待数据通过所有路由并将结果返回给您。

于 2022-01-24T15:27:49.083 回答
0

使用 requestBodyAndHeader 而不是 sendBodyAndHeader 这样您就可以将响应作为返回值,然后您可以断言它。

https://camel.apache.org/manual/producertemplate.html

于 2022-01-15T01:58:33.400 回答