0

在每条消息成功发送到 JMS 或失败后如何进行回调?

val jmsSink = JmsSink.textSink(
    JmsSinkSettings(connectionFactory).withQueue("My_Queue")
)
Source(Stream.from(1))
    .map(_.toString)
    .runWith(jmsSink)

更具体的例子

// creating a sourceQueue which is bound to jmsSink
val sourceQueue: SourceQueueWithComplete[String] =
    Source.queue[String](bufferSize, OverflowStrategy.backpressure)
        .to(jmsSink)
        .run()

客户将项目发送到sourceQueue

val result: Future[QueueOfferResult] = sourceQueue offer "my-item"

val result是插入项目的结果,sourceQueue并不意味着它已经发送到JMS。当项目通过接收器过程并插入到 JMS 队列时,我需要触发一个事件。

4

1 回答 1

1

为每个成功的消息调用回调(如果“回调”是指返回的函数Unit)的一种方法是创建一个Source订阅相同 JMS 队列的对应,并使用runForeach

val jmsSink = JmsSink.textSink(
  JmsSinkSettings(connectionFactory).withQueue("My_Queue")
)

Source(...)
  .map(_.toString)
  .runWith(jmsSink)

val jmsSource = JmsSource(
  JmsSourceSettings(connectionFactory).withQueue("My_Queue")
)

jmsSource.runForeach(println)

上面的示例打印通过接收器发布到队列的每条消息。

至于错误,目前如果抛出异常,您的流将关闭。例如,如果在抛出异常的情况下您想要打印异常并恢复流而不是终止它,您可以将监督策略附加到您的原始Source

val decider: Supervision.Decider = {
  case e: Exception =>
    println(s"Exception thrown: ${e.getMessage}")
    Supervision.Resume
}

val flow = Flow[String]
  .withAttributes(ActorAttributes.supervisionStrategy(decider))

Source(...)
  .map(_.toString)
  .via(flow)
  .runWith(jmsSink)

val jmsSource = ...
于 2017-12-18T15:05:31.863 回答