0

我已经实现了一个从 Kafka 队列读取消息的苍鹭拓扑。因此,我的拓扑有一个 kafka spout 和一个 bolt,它计算从队列中读取的消息数量。

当我将 say10000消息发送到 kafka 队列时,我可以看到在 heron 拓扑中的 kafka spout 中接收到的所有消息,但是在螺栓处丢失的消息很少。

以下是苍鹭的拓扑设置

 Config config = Config.newBuilder()
                    .setUserConfig("topology.max.spout.pending", 100000)

                    .setUserConfig("topology.message.timeout.secs", 100000)
                    .setNumContainers(1)
                    .setPerContainerCpu(3)
                    .setPerContainerRamInGigabytes(4)
                    .setDeliverySemantics("ATLEAST_ONCE")
                    .build();

任何指针都会有所帮助。

编辑:我正在使用苍鹭的流 API。我用螺栓替换了计数螺栓,但在螺栓log的日志中看到了相同的消息丢失问题log

processingGraphBuilder.newSource(kafkaSource)
                      .log();

编辑 2:我通过完全删除 streamlet API 解决了这个问题。我使用基本的 spout 和 bolt API 重新实现了所有内容,并对 spout 进行了确认。这解决了这个问题。我猜这是因为流 API 中的 spout 没有发生确认

4

2 回答 2

1

简单的答案:不应该放弃。

几个问题: - 在 heronui 中,你的 spout 的所有时间发出和确认计数是多少?- 在heronui中,你的bolt的所有时间执行、确认和失败计数是多少?

于 2018-11-07T18:12:47.710 回答
1

当您说消息被丢弃时,您是否看到失败计数指标中记录了失败,或者只是您在 bolt 中的执行计数与 spout 的发出计数不相符?

在 Storm 兼容模式下,指标是根据样本计算的(我认为默认为 5%)。因此,计数可能会超出该范围。例如,根据流的采样时间,您可以发送 100 个元组,执行计数可以是 80 或 120。

于 2018-11-07T13:08:33.420 回答