1

我需要从 Kafka 主题中读取所有消息,然后处理并退出(无需永远像守护进程一样运行)。我已经编写了如下代码,如果主题中的消息可用,则它可以达到目的,如果主题为空(或提到的 Group_id 没有新消息),它将等到下一条消息到达,如果没有可用的消息,我需要立即退出过程。请查看我的代码并建议是否有更好的方法来实现这一点。我正在使用ruby​​-kafka 1.3.0 gem

require 'kafka'
khost = 'xxx.xxx.xxx.xxx'
kport = 'xxxx'

kafka = Kafka.new(["#{khost}:#{kport}"] )
consumer = kafka.consumer(group_id: "my-consumer")
consumer.subscribe("my-topic")

consumer.each_batch do |batch|
    $msg = batch
    consumer.stop  # stop after reading first batch
end 

# Process messages here

$msg.messages.each do |message|
  puts message.value
end 

我还找到了一种方法 kafka.fetch_messages,但是我没有找到一个选项来维护 group_id和跟踪已处理的消息而不添加额外的代码。

4

0 回答 0