在文档中提到生产者重试将消息发送到基于max_retries
.
所以我关闭了 Kafka,然后尝试了我的制作人。我收到这个错误
Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)
这是有道理的,但是在retries
那之后永远不会发生。我已经从内到外阅读了文档,但我无法弄清楚这retries
实际上是如何触发的?
这是我的代码:
def self.deliver_message(kafka, message, topic, transactional_id)
producer = kafka.producer(idempotent: true,
transactional_id: transactional_id,
required_acks: :all,
max_retries: 5,
retry_backoff: 5)
producer.produce(message, topic: topic)
producer.deliver_messages
end
文档链接:
https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method
先感谢您。