7

我们目前基本上使用以下简化机制来确认消息:

  @KafkaListener(topics = "someTopic")
  public void listen(final String message, final Acknowledgment ack) {
    try {
        processMessage(message);
        ack.acknowledge();
    } catch (final IOException e) {
        // do not acknowledge here since we can temporarily not process the message
    } 

基本上,只要我们暂时无法处理消息(在 IOExceptions 的情况下),我们希望稍后再接收它。

但这不起作用,因为确认假定同一分区中的所有先前消息都已成功处理。在我们的 IOException 案例中,失败的消息将被跳过,但可能会被同一分区上具有更高索引的不同消息确认。

我们有一些解决这个问题的想法,但这意味着一些讨厌的解决方法,以避免直接在 KafkaListener 方法中调用确认。我们的用例是一个非常具体的用例,还是更像是 spring kafka 用户会假设的“默认”行为?

有针对此类问题的 spring-kafka 解决方案吗?或者你有一个“正确”解决这个问题的想法吗?

4

1 回答 1

3

这就是卡夫卡的工作方式;在继续下一条消息之前,您可以启用重试以尝试通过您的 IOException。

您可以配置错误句柄以将失败消息发布到另一个主题以稍后重播。或者,错误处理程序可以停止容器以防止任何新的交付。

当容器重新启动时,消息将被重播。

于 2017-01-04T14:12:25.133 回答