0

我正在使用 Camel 从 Solace 队列中下载消息,并且很难理解事务管理。

流程是这样的,消息从 Solace 队列中下载,然后根据某些逻辑推送到多个 Solace 队列之一(称为暂存)。从这些暂存队列中,消息使用 SEDA 组件路由到处理器(bean),最后发送到另一个 Solace 队列。

我已将事务配置为

<bean id="propagationReqd" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <property name="transactionManager" ref="jmsTransactionManager" />
        <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
</bean>

如果sequencer引发异常,则消息将保留在in.solace.queue上,但如果消息已从暂存队列传递到 SEDA 到 msgProcessor bean 并引发以下异常,则消息将丢失。

无法创建 JMS 事务;嵌套异常是 com.solacesystems.jms.ConfigurationException:直接传输不支持事务会话或 XA 会话

我的理解是它正在发生,因为 SEDA 不是物理队列,因此在引发异常时消息会丢失,但我认为暂存 Solace 队列会保留它。

路由配置

<camel:route id="msg.router">
    <camel:from uri="{{in.solace.queue}}" />
    <camel:transacted ref="propagationReqd" />
    <camel:to uri="direct:msgSequencer" />
</camel:route>

<camel:route id="msg.processor">
    <camel:from uri="direct:msgSequencer" />
    <camel:transacted ref="propagationReqd" />
    <camel:process ref="sequencer" />
    <camel:choice>
        <camel:when>
            <camel:simple>${headers.MsgId} == '0'</camel:simple>
            <camel:to uri="{{stage.solace.queue.0}}" />
        </camel:when>
        <camel:when>
            <camel:simple>${headers.MsgId} == '1'</camel:simple>
            <camel:to uri="{{stage.solace.queue.1}}" />
        </camel:when>
        ...
        ...
        ...
    </camel:choice>
</camel:route>

<camel:route id="msg.seda.0">
    <camel:from uri="{{stage.solace.queue.0}}" />
    <camel:transacted ref="propagationReqd" />
    <camel:to uri="seda:processor.0" />
</camel:route>

<camel:route id="msg.seda.1">
    <camel:from uri="{{stage.solace.queue.1}}" />
    <camel:transacted ref="propagationReqd" />
    <camel:to uri="seda:processor.1" />
</camel:route> 

<camel:route id="msg.process.0">
    <camel:from uri="seda:processor.0?concurrentConsumers=4&amp;waitForTaskToComplete=Never&amp;purgeWhenStopping=true" />
    <camel:transacted ref="propagationReqd" />
    <camel:process ref="msgProcessor" />
    <camel:to uri="{{final.queue}}" />
</camel:route>

<camel:route id="msg.process.1">
    <camel:from uri="seda:processor.1?concurrentConsumers=4&amp;waitForTaskToComplete=Never&amp;purgeWhenStopping=true" />
    <camel:transacted ref="propagationReqd" />
    <camel:process ref="msgProcessor" />
    <camel:to uri="{{final.queue}}" />
</camel:route>

感谢有人能指出我做错了什么?

提前致谢。

4

2 回答 2

1
  1. 该错误听起来像是与 Solace 设置连接工厂有关的配置问题。

  2. 仅供参考:在事务 b/c 中不能指望 seda 端点,它们是异步的。

  3. 如果您真的需要所有步骤的事务,我会将消息放回另一个安慰队列而不是 seda 端点

于 2016-10-06T16:43:44.320 回答
1

无法创建 JMS 事务;嵌套异常是 com.solacesystems.jms.ConfigurationException:直接传输不支持事务处理会话或 XA会话

Solace 不允许使用启用了直接传输的 JMS 连接工厂的事务。这是您的异常的原因。

此处的解决方案是禁用 bean 使用的连接工厂上的直接传输,msgProcessor以使事务能够摆脱此异常。

如果 sequencer 引发异常,则消息将保留在 in.solace.queue 上,但如果消息已从暂存队列传递到 SEDA 到 msgProcessor bean 并引发以下异常,则消息将丢失。

http://camel.apache.org/seda.html看来,SEDA 不支持恢复/事务。

如果 VM 在消息尚未处理时终止,则此组件不会实现任何类型的持久性或恢复。如果您需要持久性、可靠性或分布式 SEDA,请尝试使用 JMS 或 ActiveMQ。

于 2016-10-07T01:11:05.403 回答