2

我想要实现的场景是使用来自 Kafka 的消息,处理它,如果某些条件失败,我不希望确认消息。为此,我在 spring cloud stream 参考文档中找到,

autoCommitOffset 是否在处理消息时自动提交偏移量。如果设置为 false,则消息头中将提供一个确认头以用于延迟确认。

默认值:真。

我的问题是将 autoCommitOffset 设置为 false 后,我如何确认消息?一个代码示例将不胜感激。

4

1 回答 1

3

我在这里提供了问题的答案 https://github.com/spring-cloud/spring-cloud-stream/issues/575

本质上它归结为设置 spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

然后处理确认头:

@SpringBootApplication
@EnableBinding(Sink.class)
   public class ManuallyAcknowdledgingConsumer {

      public static void main(String[] args) {
         SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
      }

      @StreamListener(Sink.INPUT)
      public void process(Message<?> message) {
         System.out.println(message.getPayload());
         Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
           System.out.println("Acknowledgment provided");
           acknowledgment.acknowledge();
        }
    }
}
于 2016-06-15T18:47:23.217 回答