目前,我打算在 rails 应用程序中使用 sidekiq 进行后台作业。现在我想使用消费者-生产者机制来使用 Kafka。为此,我正在异步ruby-kafka
使用和生成消息。
我的方法:在 Job 的perform
方法中,MyNewJob < ApplicationJob
我正在调用produce
Kafka 的方法,当consumer
收到消息时,我在MyNewJob
.
它按预期工作。我想确认,这是正确的方法吗?如果不是,那么正确的方法应该是什么?
伪代码:
我将使用任何MyNewJob.perform_now
或MyNewJob.perform_later
等来调用我的工作。
my_new_job.rb
class MyNewJob < ApplicationJob
queue_as TestMixin::QUEUE_NAME
rescue_from(App::ServerError) do
retry_job wait: Rails.application.config.retry_interval
end
def perform(paylaod)
#producing message asynchronously
producer = kafka.async_producer
producer.produce(payload, topic: "topic_name")
end
def self.run_job(payload)
#code to executed previously written inside perform method above.
end
end
consumer.rb(伪代码)
#client consumer topic subscribe logic and then start consuming.
consumer.each do |message|
MyNewJob.run_job(message.payload)
end
我也在考虑另一种方法,我跳过引入ApplicationJob
类并直接在 Kafka 中生成消息而不是调用 Job。
请确认。
编辑 1:我正在使用ruby-kafka
异步生产者(kafka.async_producer
)。