我将 Spring Cloud Stream 的 DLQ 功能与 Kafka 活页夹一起使用。当消息处理失败时,消息会按预期发送到 DLQ,但是,我希望能够修改发送到 DLQ 的消息以包含一些额外的诊断信息。问题是发送到DLQ的消息是原始消息;我所做的任何突变都会被忽略。到目前为止,我解决这个问题的方法是在消息发送到 DLQ 之前拦截消息,并添加存储在另一个 bean 中的额外信息。具体来说,我尝试了这两种方法:
- 解决方案:为 DLQ 实现一个普通的 Kafka
ProducerInterceptor
。问题:实现是在 Spring 上下文之外实例化的,所以我不能注入我需要的其他 bean。Spring Kafka 已经记录了这个解决方案,但是,它需要创建一个新的ProducerFactory
bean,这意味着我不能使用底层 Spring Cloud Stream 中的 bean。 - 解决方案:实现一个Spring
ChannelInterceptor
。问题:我无法获得对 DLQ 消息通道的引用,也无法获得底层通道名称,因此我无法仅为 DLQ 消息配置拦截器。
关于如何解决这个问题的任何想法?