11

我知道必须有办法做到这一点,但我无法弄清楚这一点。读取队列中的所有消息后,我需要停止 kafka 消费者。

有人可以提供这方面的任何信息吗?

4

4 回答 4

8

您可以在启动消费者时传递参数:-consumer-timeout-ms 和一个值,如果在此期间没有读取任何消息,它将引发异常。例如,如果最近 2 秒内没有新消息到达,则停止消费者: kafka.consumer.ConsoleConsumer -consumer-timeout-ms 2000

您可以在此处查看此选项和所有其他输入选项

于 2014-12-18T20:29:09.760 回答
5

目前,Kafka 版本 2.11-2.1.1 有一个名为kafka-console-consumer.sh.

它有一个新标志:--timeout-ms.

基本上,这个标志是在没有新日志等待时退出前等待的最长时间。它以毫秒为单位。

您可以在阅读所有消息后使用此属性来结束您的控制台使用者。

于 2019-09-30T19:37:39.797 回答
5

您可以将 SimpleConsumerShell 与 no-wait-at-logend 选项一起使用。见SystemTools-SimpleConsumerShell

例如:

./kafka-run-class.bat kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic kafkademo --partition 0 --no-wait-at-logend
于 2016-03-07T14:54:39.577 回答
1

如果您对使用 Scala 客户端没有死心,请尝试使用kafkacat选项-e告诉它在达到分区结束时退出。

例如消费来自 mytopic 分区 2 的所有消息,然后退出:

$ kafkacat -b mybroker -t mytopic -p 2 -o beginning -e

或者消费最后 3000 条消息然后退出:

$ kafkacat -b mybroker -t mytopic -p 2 -o -3000 -e
于 2014-12-18T21:36:58.780 回答