0

我将 Spring Cloud Stream 的 DLQ 功能与 Kafka 活页夹一起使用。当消息处理失败时,消息会按预期发送到 DLQ,但是,我希望能够修改发送到 DLQ 的消息以包含一些额外的诊断信息。问题是发送到DLQ的消息是原始消息;我所做的任何突变都会被忽略。到目前为止,我解决这个问题的方法是在消息发送到 DLQ 之前拦截消息,并添加存储在另一个 bean 中的额外信息。具体来说,我尝试了这两种方法:

  1. 解决方案:为 DLQ 实现一个普通的 Kafka ProducerInterceptor问题:实现是在 Spring 上下文之外实例化的,所以我不能注入我需要的其他 bean。Spring Kafka 已经记录了这个解决方案,但是,它需要创建一个新的ProducerFactorybean,这意味着我不能使用底层 Spring Cloud Stream 中的 bean。
  2. 解决方案:实现一个Spring ChannelInterceptor问题:我无法获得对 DLQ 消息通道的引用,也无法获得底层通道名称,因此我无法仅为 DLQ 消息配置拦截器。

关于如何解决这个问题的任何想法?

4

1 回答 1

1

问题:我无法获得对 DLQ 消息通道的引用,也无法获得底层通道名称,因此我无法仅为 DLQ 消息配置拦截器。

错误通道名称是

destinationName.group.errors

AbstractMessageChannel您可以从应用程序上下文中获取它...

context.getBean("destinationName.group.errors", AbstractMessageChannel.class)

...并添加拦截器。

或者,根本不使用 binder 的 DLQ 机制并添加您自己的错误处理程序:

@ServiceActivator(inputChannel = "destinationName.group.errors")
void errors(Message<?> error) {
    ...
}
于 2020-06-09T15:35:43.413 回答