0

我正在使用 Spring 云流 kafka 流活页夹编写一个 Kafka 流应用程序。

当消费者将消息发布到输出主题时,可能会出现类似Serialization errorNetwork error的错误。

在这段代码中 -

@Bean
public Function<KStream<Object, String>, KStream<Object, String>> process() {
    return (input) -> {
        KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
        return kt;
    };
}

这里在产生错误的同时将消息返回到输出主题,如果发生错误,如何处理它。除了RetryTemplate之外,Kafka 流绑定器中是否有任何机制?

4

1 回答 1

1

请参阅Spring for Apache Kafka 文档

当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,2.2 版本引入了 ErrorHandlingDeserializer2。这个反序列化器委托给一个真正的反序列化器(键或值)。如果委托未能反序列化记录内容,则 ErrorHandlingDeserializer2 在包含原因和原始字节的标头中返回空值和 DeserializationException。当您使用记录级 MessageListener 时,如果 ConsumerRecord 包含键或值的 DeserializationException 标头,则使用失败的 ConsumerRecord 调用容器的 ErrorHandler。记录不会传递给侦听器。

于 2020-06-08T12:39:24.643 回答