注意:几个月前我遇到了这个确切的问题。我们使用 Kafka 代替 Kinesis。因为,您已经标记了 spring-cloud-stream-binder-kafka,所以我在同一基础上提供输入。
您可以为您的生产者编写自定义回调,此回调可以告诉您消息是失败还是成功发布。失败时,记录消息的元数据。
下面是一个小代码片段,可以更好地解释我所说的回调。
请注意,这是针对 Kafka 的。相应地更改 Kinesis 的代码。
public class ProduceToKafka {
private ProducerRecord<String, String> message = null;
// TracerBulletProducer class has producer properties
private KafkaProducer<String, String> myProducer = TracerBulletProducer
.createProducer();
public void publishMessage(String string) {
ProducerRecord<String, String> message = new ProducerRecord<>(
"topicName", string);
myProducer.send(message, new MyCallback(message.key(), message.value()));
}
class MyCallback implements Callback {
private final String key;
private final String value;
public MyCallback(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
log.info("--------> All good !!");
} else {
log.info("--------> not so good !!");
log.info(metadata.toString());
log.info("" + metadata.serializedValueSize());
log.info(exception.getMessage());
}
}
}
}
我写了一篇关于在 Spring 抽象的不同级别处理生产者故障的中型文章。这可以帮助你。看看:
https ://medium.com/@akhil.ghatiki/kafka-producer-failure-handling-at-multiple-levels-of-spring-abstractions-e530edb02a6c