1

我遵循了以下文档,并且我有一个生产者和消费者使用 Kinesis Stream 工作得很好。我想了解在发生任何异常时如何处理生产者(源)中的错误。

根据 Spring Stream 错误处理文档,我尝试了以下方法:

@StreamListener("errorChannel")
@ServiceActivator(inputChannel = "errorChannel")

两者都不起作用。我在生产者方法中显式抛出 RuntimeException 并期望它将在“errorChannel”中但无法接收到相同的。

如果有人成功做到这一点,请帮助我弄清楚这一点或与我分享方法。

4

1 回答 1

0

注意:几个月前我遇到了这个确切的问题。我们使用 Kafka 代替 Kinesis。因为,您已经标记了 spring-cloud-stream-binder-kafka,所以我在同一基础上提供输入。

您可以为您的生产者编写自定义回调,此回调可以告诉您消息是失败还是成功发布。失败时,记录消息的元数据。

下面是一个小代码片段,可以更好地解释我所说的回调。

请注意,这是针对 Kafka 的。相应地更改 Kinesis 的代码。

public class ProduceToKafka {

  private ProducerRecord<String, String> message = null;

 // TracerBulletProducer class has producer properties
  private KafkaProducer<String, String> myProducer = TracerBulletProducer
      .createProducer();

  public void publishMessage(String string) {

    ProducerRecord<String, String> message = new ProducerRecord<>(
        "topicName", string);

    myProducer.send(message, new MyCallback(message.key(), message.value()));
  }

  class MyCallback implements Callback {

    private final String key;
    private final String value;

    public MyCallback(String key, String value) {
      this.key = key;
      this.value = value;
    }


    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception == null) {
        log.info("--------> All good !!");
      } else {
        log.info("--------> not so good  !!");
        log.info(metadata.toString());
        log.info("" + metadata.serializedValueSize());
        log.info(exception.getMessage());

      }
    }
  }

}

我写了一篇关于在 Spring 抽象的不同级别处理生产者故障的中型文章。这可以帮助你。看看: https ://medium.com/@akhil.ghatiki/kafka-producer-failure-handling-at-multiple-levels-of-spring-abstractions-e530edb02a6c

于 2020-08-23T10:38:37.403 回答