0

我有以下骆驼路由,它从 TCP 套接字侦听消息并将正文发送到数据库。

from("netty:tcp://localhost:5150?sync=false&keepAlive=true")
    .transform().simple("insert into mytable (DATA) values (\"${in.body}\");") 
    .to("jdbc:mydb");

以下向第一个发送消息:

from("direct:input").to("netty:tcp://localhost:5150?sync=false&keepAlive=true");

我用这样的 JUnit 对其进行了测试:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class OutputRoutesTest {

  @Autowired
  protected CamelContext camelContext;

  @EndpointInject(uri = "direct:input")
  private ProducerTemplate template;

  @Test
  @DirtiesContext
  public void testTCPSend() throws Exception {
    String body = "foo?";

    NotifyBuilder notify = new NotifyBuilder(camelContext).whenDone(1).create();
    try {
      template.sendBody(body);
    } finally {
      template.stop();
    }
    boolean matches = notify.matches(5, TimeUnit.SECONDS);
    assertTrue(matches);

  }
}

当我开始测试时,有以下堆栈跟踪:

12:05:36.502 [Camel (camel-1) thread #22 - NettyOrderedWorker] ERROR o.a.c.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID-M249-52364-1373537133834-0-3 on ExchangeId: ID-M249-52364-1373537133834-0-4). Exhausted after delivery attempt: 3 caught: java.sql.SQLException: Data source is closed
java.sql.SQLException: Data source is closed
    at org.apache.commons.dbcp.BasicDataSource.createDataSource(BasicDataSource.java:1362) ~[commons-dbcp-1.4.jar:1.4]
    at org.apache.commons.dbcp.BasicDataSource.getConnection(BasicDataSource.java:1044) ~[commons-dbcp-1.4.jar:1.4]
    at org.apache.camel.component.jdbc.JdbcProducer.processingSqlBySettingAutoCommit(JdbcProducer.java:76) ~[camel-jdbc-2.11.0.jar:2.11.0]
    at org.apache.camel.component.jdbc.JdbcProducer.process(JdbcProducer.java:63) ~[camel-jdbc-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) ~[camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:164) ~[camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122) ~[camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298) ~[camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117) ~[camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) ~[camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) ~[camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) [camel-core-2.11.0.jar:2.11.0]
    at org.apache.camel.component.netty.handlers.ServerChannelHandler.processAsynchronously(ServerChannelHandler.java:118) [camel-netty-2.11.0.jar:2.11.0]
    at org.apache.camel.component.netty.handlers.ServerChannelHandler.messageReceived(ServerChannelHandler.java:102) [camel-netty-2.11.0.jar:2.11.0]
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.6.5.Final.jar:na]
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.6.5.Final.jar:na]
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.6.5.Final.jar:na]
    at org.jboss.netty.handler.execution.ChannelUpstreamEventRunnable.doRun(ChannelUpstreamEventRunnable.java:43) [netty-3.6.5.Final.jar:na]
    at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:67) [netty-3.6.5.Final.jar:na]
    at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:314) [netty-3.6.5.Final.jar:na]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885) [na:1.6.0_05]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) [na:1.6.0_05]
    at java.lang.Thread.run(Thread.java:619) [na:1.6.0_05]
12:05:36.503 [Camel (camel-1) thread #22 - NettyOrderedWorker] DEBUG org.apache.camel.processor.Pipeline - Message exchange has failed: so breaking out of pipeline for exchange: Exchange[Message: insert into mytable (DATA) values ("foo?");] Exception: java.sql.SQLException: Data source is closed
12:05:37.473 [Camel (camel-1) thread #23 - ShutdownTask] DEBUG o.a.c.impl.DefaultShutdownStrategy - Route: route2 preparing to shutdown complete.

消息被骆驼路由接收,但无法发送到数据库。这似乎是数据库的连接问题,但我也尝试将 TCP 端点替换为“direct:output”,并且数据已插入 base.

我哪里错了?我想我可能误解了使用 netty:tcp 组件的方式。谢谢你的帮助。

我正在使用 Camel 2.11 和 Spring 3.1.2

4

1 回答 1

1

你可以尝试设置

  .to("jdbc:mydb?resetAutoCommit=false");
于 2013-07-12T09:24:33.383 回答