我正在使用 Spring Cloud Stream 开发 Kafka Streams。在消息处理应用程序中,可能会产生错误。因此,不应再次提交并重试该消息。
我的申请方法——
@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
return (input) -> {
KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
KGroupedStream<String, String> kgt =kt.map((k, v) -> new KeyValue<>(v, v)).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(500)).count();
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
wc.setWord(k.key());
wc.setCount(v);
wc.setStart(new Date(k.window().start()));
wc.setEnd(new Date(k.window().end()));
dao.insert(wc);
return new KeyValue<>(k.key(),wc);
});
return kst.map((k,v) -> new KeyValue<>(k, v.getCount()));
};
}
这里如果DAO插入方法失败,消息不应该发布到输出主题,并且应该重试相同消息的处理。
我们如何配置 kafka 流绑定器来做到这一点?非常感谢您对此的任何帮助。