我已经实现了一个从 Kafka 队列读取消息的苍鹭拓扑。因此,我的拓扑有一个 kafka spout 和一个 bolt,它计算从队列中读取的消息数量。
当我将 say10000
消息发送到 kafka 队列时,我可以看到在 heron 拓扑中的 kafka spout 中接收到的所有消息,但是在螺栓处丢失的消息很少。
以下是苍鹭的拓扑设置
Config config = Config.newBuilder()
.setUserConfig("topology.max.spout.pending", 100000)
.setUserConfig("topology.message.timeout.secs", 100000)
.setNumContainers(1)
.setPerContainerCpu(3)
.setPerContainerRamInGigabytes(4)
.setDeliverySemantics("ATLEAST_ONCE")
.build();
任何指针都会有所帮助。
编辑:我正在使用苍鹭的流 API。我用螺栓替换了计数螺栓,但在螺栓log
的日志中看到了相同的消息丢失问题log
processingGraphBuilder.newSource(kafkaSource)
.log();
编辑 2:我通过完全删除 streamlet API 解决了这个问题。我使用基本的 spout 和 bolt API 重新实现了所有内容,并对 spout 进行了确认。这解决了这个问题。我猜这是因为流 API 中的 spout 没有发生确认