我正在使用ruby-kafka
gem,我有一个简单的生产者和消费者:
制片人
require 'ruby-kafka'
class Producer
def initialize(kafka_client: Kafka)
@kafka_client = kafka_client.new(['localhost:9092'], client_id: 'my-application')
end
def produce
# Instantiate a new producer.
producer = @kafka_client.async_producer
# Add a message to the producer buffer.
3.times { producer.produce("hello1", topic: "twitter_tweets") }
# Deliver the messages to Kafka.
producer.deliver_messages
producer.shutdown
end
end
消费者
require 'ruby-kafka'
class Consumer
def initialize(kafka_client: Kafka)
@kafka_client = kafka_client.new(['localhost:9092'], client_id: 'my-application')
end
def consume
consumer = @kafka_client.consumer(group_id: "my-consumer")
consumer.subscribe("twitter_tweets")
consumer.each_message do |message|
puts message.topic, message.partition
puts message.offset, message.key, message.value
end
end
end
当我运行消费者时,我得到一个错误
Using snappy compression requires adding a dependency on the `snappy` gem to your Gemfile. (LoadError)
cannot load such file -- snappy (LoadError)
在我的config/producer-config.sh
我有这个(没碰它)
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
当我使用 CLI 运行生产者和消费者时,它工作正常并且我没有收到任何错误。这是一个错误还是我错过了什么?