1

使用 Stomp WebSockets 连接到 ActiveMQ 服务器,我的 extjs5 浏览器 web 应用程序有许多主题订阅范围为面板。

下面是一个带有标题的主题订阅示例:

var sub = this.mqClient.subscribe('/topic/Status', this.onStatusMsg, 
{"ack":"auto","persistent":true,"activemq.retroactive":true,"selector":"TaskId = 1531","activemq.subscriptionName":"status-1531"} );

当用户完成主题订阅后,她会关闭触发 this.mqClient.unsubscribe( sub ) 调用的面板。

重新打开该面板会创建一个具有相同 activemq.subscriptionName 的订阅,这会触发此错误响应:

javax.jms.JMSException: Durable consumer is in use for client: 2015-05-28 15:29:32-0700.0.9276173142716289 and subscriptionName: status-1531
    at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:127)
    at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:427)
    at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:244)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
    at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:104)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
    at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:107)
    at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:667)
    at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:348)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:335)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:189)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.ws.jetty8.StompSocket.sendToActiveMQ(StompSocket.java:125)
    at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:199)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSubscribe(ProtocolConverter.java:663)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:257)
    at org.apache.activemq.transport.ws.jetty8.StompSocket.onMessage(StompSocket.java:82)
    at org.eclipse.jetty.websocket.WebSocketConnectionRFC6455$WSFrameHandler.onFrame(WebSocketConnectionRFC6455.java:850)
    at org.eclipse.jetty.websocket.WebSocketParserRFC6455.parseNext(WebSocketParserRFC6455.java:349)
    at org.eclipse.jetty.websocket.WebSocketConnectionRFC6455.handle(WebSocketConnectionRFC6455.java:225)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:696)
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:53)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
    at java.lang.Thread.run(Thread.java:745)

如何关闭持久主题订阅?

4

1 回答 1

2

如果订阅当前处于活动状态(意味着订阅者当前已连接并订阅),则在 STOMP 中删除持久订阅是一个两步过程。

首先,客户端必须取消订阅,以便订阅变为非活动状态:

String frame = "UNSUBSCRIBE\n" + "destination:/topic/MyTopic" + "\n\n" + Stomp.NULL;

现在订阅处于非活动状态,可以通过使用最初用于创建它的订阅名称发出取消订阅来永久删除它,如下所示:

String frame = "UNSUBSCRIBE\n" + "destination:/topic/MyTopic" + "\n" + "activemq.subscriptionName:MySubscriptionName\n\n" + Stomp.NULL;

这将从 ActiveMQ 消息存储中删除订阅和它保存的所有存储消息。

于 2015-05-29T19:49:45.970 回答