我正在使用 Camel 从 Solace 队列中下载消息,并且很难理解事务管理。
流程是这样的,消息从 Solace 队列中下载,然后根据某些逻辑推送到多个 Solace 队列之一(称为暂存)。从这些暂存队列中,消息使用 SEDA 组件路由到处理器(bean),最后发送到另一个 Solace 队列。
我已将事务配置为
<bean id="propagationReqd" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jmsTransactionManager" />
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
</bean>
如果sequencer引发异常,则消息将保留在in.solace.queue上,但如果消息已从暂存队列传递到 SEDA 到 msgProcessor bean 并引发以下异常,则消息将丢失。
无法创建 JMS 事务;嵌套异常是 com.solacesystems.jms.ConfigurationException:直接传输不支持事务会话或 XA 会话
我的理解是它正在发生,因为 SEDA 不是物理队列,因此在引发异常时消息会丢失,但我认为暂存 Solace 队列会保留它。
路由配置:
<camel:route id="msg.router">
<camel:from uri="{{in.solace.queue}}" />
<camel:transacted ref="propagationReqd" />
<camel:to uri="direct:msgSequencer" />
</camel:route>
<camel:route id="msg.processor">
<camel:from uri="direct:msgSequencer" />
<camel:transacted ref="propagationReqd" />
<camel:process ref="sequencer" />
<camel:choice>
<camel:when>
<camel:simple>${headers.MsgId} == '0'</camel:simple>
<camel:to uri="{{stage.solace.queue.0}}" />
</camel:when>
<camel:when>
<camel:simple>${headers.MsgId} == '1'</camel:simple>
<camel:to uri="{{stage.solace.queue.1}}" />
</camel:when>
...
...
...
</camel:choice>
</camel:route>
<camel:route id="msg.seda.0">
<camel:from uri="{{stage.solace.queue.0}}" />
<camel:transacted ref="propagationReqd" />
<camel:to uri="seda:processor.0" />
</camel:route>
<camel:route id="msg.seda.1">
<camel:from uri="{{stage.solace.queue.1}}" />
<camel:transacted ref="propagationReqd" />
<camel:to uri="seda:processor.1" />
</camel:route>
<camel:route id="msg.process.0">
<camel:from uri="seda:processor.0?concurrentConsumers=4&waitForTaskToComplete=Never&purgeWhenStopping=true" />
<camel:transacted ref="propagationReqd" />
<camel:process ref="msgProcessor" />
<camel:to uri="{{final.queue}}" />
</camel:route>
<camel:route id="msg.process.1">
<camel:from uri="seda:processor.1?concurrentConsumers=4&waitForTaskToComplete=Never&purgeWhenStopping=true" />
<camel:transacted ref="propagationReqd" />
<camel:process ref="msgProcessor" />
<camel:to uri="{{final.queue}}" />
</camel:route>
感谢有人能指出我做错了什么?
提前致谢。