1

目前,我打算在 rails 应用程序中使用 sidekiq 进行后台作业。现在我想使用消费者-生产者机制来使用 Kafka。为此,我正在异步ruby-kafka使用和生成消息。

我的方法:在 Job 的perform方法中,MyNewJob < ApplicationJob我正在调用produceKafka 的方法,当consumer收到消息时,我在MyNewJob.

它按预期工作。我想确认,这是正确的方法吗?如果不是,那么正确的方法应该是什么?

伪代码:

我将使用任何MyNewJob.perform_nowMyNewJob.perform_later等来调用我的工作。

my_new_job.rb

class MyNewJob < ApplicationJob

  queue_as TestMixin::QUEUE_NAME

  rescue_from(App::ServerError) do
    retry_job wait: Rails.application.config.retry_interval
  end

  def perform(paylaod)
    #producing message asynchronously
    producer = kafka.async_producer
    producer.produce(payload, topic: "topic_name")
  end

  def self.run_job(payload)
    #code to executed previously written inside perform method above.
  end

end

consumer.rb(伪代码)

  #client consumer topic subscribe logic and then start consuming.
  consumer.each do |message|
    MyNewJob.run_job(message.payload)
  end

我也在考虑另一种方法,我跳过引入ApplicationJob类并直接在 Kafka 中生成消息而不是调用 Job。

请确认。

编辑 1:我正在使用ruby-kafka异步生产者(kafka.async_producer)。

4

0 回答 0