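I'm getting a "Failed to create Consumer for endpoint" error when asserting against a header in a test. Can anyone help me?
I have this file-watch route, which consumes files and validates them. If a file is valid, the route updates the headers and sends the file to an S3 blob. Otherwise it sends the file to an error directory.
Here is my route: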
from("file-watch:test?events=CREATE&useFileHashing=true&antInclude=**/*.txt&recursive=true")
    .process(fileProcessor)
    .toD("direct:validateFile")
    .choice()
        .when(exchange -> exchange.getIn().getHeader("isValid").equals("valid"))
            .to("direct:updateMessageHeaders")
        .otherwise()
            .toD("direct:processErrorFiles")
    .endChoice()
    .end();

from("direct:validateFile")
    .routeId("validateFile")
    .choice()
        .when(exchange -> Long.parseLong(exchange.getIn().getHeader("fileSize").toString()) > 0)
            .setHeader("isValid", simple("valid"))
        .otherwise()
            .setHeader("isValid", simple("invalid"))
    .endChoice()
    .end();

final Processor fileProcessor = exchange -> {
    String fileName = exchange.getIn().getHeader("CamelFileAbsolutePath").toString();
    File gdisFile = new File(fileName);
    exchange.getIn().setHeader("fileSize", gdisFile.length());
};
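
The size check used in the validateFile route can also be exercised without starting Camel. The following is only a minimal sketch: `FileSizeCheck` and `classify` are hypothetical names that do not appear in the route above, and treating a missing header as "invalid" is an assumption (the route as written would throw a NullPointerException in that case).

```java
public class FileSizeCheck {

    // Mirrors the predicate in the validateFile route: the header value is
    // converted with toString() and parsed as a long, so it accepts both
    // an Integer (as sent by the test) and a Long (as File.length() returns).
    // Assumption: a missing header is classified as "invalid" here.
    static String classify(Object fileSizeHeader) {
        if (fileSizeHeader == null) {
            return "invalid";
        }
        long size = Long.parseLong(fileSizeHeader.toString());
        return size > 0 ? "valid" : "invalid";
    }

    public static void main(String[] args) {
        System.out.println(classify(100)); // Integer header, as in the test case
        System.out.println(classify(0L));  // Long header, as set by fileProcessor for an empty file
    }
}
```

Keeping the rule in a plain method like this would let the "valid"/"invalid" decision be unit tested separately from the routing.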
Test case:
public class RouteTest extends CamelTestSupport {

    @Override
    public RouteBuilder createRouteBuilder() throws Exception {
        return new Route();
    }

    @Test
    public void header_validation() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("fileSize", 100);
        template.sendBodyAndHeaders("direct:validateFile", null, headers);
        assertEquals("valid", consumer.receive("direct:validateFile").getIn().getHeader("isValid"));
    }
}
Exception:
org.apache.camel.FailedToCreateConsumerException: Failed to create Consumer for endpoint: direct://validateFile. Reason: java.lang.IllegalArgumentException: Cannot add a 2nd consumer to the same endpoint: direct://validateFile. DirectEndpoint only allows one consumer.
at org.apache.camel.support.cache.DefaultConsumerCache.acquirePollingConsumer(DefaultConsumerCache.java:107)
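
The stack trace points at `acquirePollingConsumer`: calling `consumer.receive("direct:validateFile")` tries to attach a polling consumer to a direct endpoint that already has the route itself as its only consumer. One possible way to read the header without a second consumer is to use the Exchange that `ProducerTemplate.send` returns. The sketch below assumes Camel 3.x with camel-core on the classpath; the class name `ValidateFileHeaderSketch` and the helper `isValidHeaderFor` are hypothetical, and the route is a standalone copy of validateFile (using `constant` instead of `simple`), not the original `Route` class.

```java
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class ValidateFileHeaderSketch {

    // Hypothetical helper: starts a context containing only a copy of the
    // validateFile route, sends one message, and returns the isValid header.
    static String isValidHeaderFor(Object fileSize) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:validateFile")
                    .routeId("validateFile")
                    .choice()
                        .when(exchange -> Long.parseLong(
                                exchange.getIn().getHeader("fileSize").toString()) > 0)
                            .setHeader("isValid", constant("valid"))
                        .otherwise()
                            .setHeader("isValid", constant("invalid"))
                    .end();
            }
        });
        context.start();
        try {
            ProducerTemplate template = context.createProducerTemplate();
            // send(...) hands back the Exchange after the synchronous direct
            // route has processed it, so the route stays the only consumer.
            Exchange result = template.send("direct:validateFile",
                    exchange -> exchange.getIn().setHeader("fileSize", fileSize));
            return result.getMessage().getHeader("isValid", String.class);
        } finally {
            context.stop();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isValidHeaderFor(100));
    }
}
```

Inside a `CamelTestSupport` test, the same `template.send("direct:validateFile", ...)` call could presumably replace the `consumer.receive(...)` line, with `assertEquals` applied to the header of the returned Exchange. Routing the result to a mock endpoint and asserting there would be another option.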