问题标签 [ruby-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2915 浏览

ruby - Ruby Kafka 未捕获异常:找不到组协调员

我使用 Apache Kafka 作为 Docker 容器https://hub.docker.com/r/wurstmeister/kafka/

我能够使用 Spring Kafka 从我的 Java 应用程序成功连接到 Kafka。

但是当我尝试通过 Ruby Kafka 从 Ruby 应用程序连接到 Kafka 时,我收到以下错误:

Java 和 Ruby 应用程序之间的唯一区别是 Ruby 应用程序位于我本地网络中的另一台机器上,但我可以从 Ruby 机器上看到 Kafka 机器以及那里的所有端口。

如何发现问题并解决?

更新

0 投票
1 回答
3985 浏览

apache-kafka - 无法连接到任何种子经纪人

我正在尝试使用ruby-kafkagem 发送消息,但出现错误Could not connect to any of the seed brokers

https://github.com/zendesk/ruby-kafka

我创建了krb5.keytab用于身份验证的文件。

有谁知道我为什么会收到这个错误?

0 投票
2 回答
18386 浏览

docker - 卡夫卡生产者说“unknown_topic_or_partition”

几天来,我一直试图让kafka-docker工作,但我不知道自己做错了什么。现在,我无法使用我的 ruby​​-kafka 客户端访问任何主题,因为节点“不存在”。这是我的 docker-compose.yml 文件:

我指定“KAFKA_AUTO_CREATE_TOPICS_ENABLE:'false'”,因为我想手动创建主题,所以我进入我的第一个代理容器并输入:

./kafka-topics.sh --create --zookeeper 172.19.0.2:2181 --topic test1 --partitions 4 --replication-factor 3

一切似乎都很好:

./kafka-topics.sh --list --zookeeper 172.19.0.2:2181 -> test1

但是,当我尝试这样做时:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test1

它说:

WARN 获取相关 ID 为 24 的元数据时出错:{test1=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

如果我再次创建主题,它说它已经存在,所以我不知道发生了什么。

0 投票
1 回答
80 浏览

ruby - ruby-kafka: is it possible to publish to two kafka instances at the same time

Current flow of the project that I'm working on involves pushing to a local kafka using ruby-kafka gem.
Now the need arose to add producer for the remote kafka, and duplicate also messages there.
And I'm looking for a better way, than calling Kafka.new(...) twice...
Could you please help me, and do you happen to have any ideas?

0 投票
1 回答
482 浏览

ruby-on-rails - 如何检查 Ruby-Kafka 重试是否有效?

在文档中提到生产者重试将消息发送到基于max_retries.

所以我关闭了 Kafka,然后尝试了我的制作人。我收到这个错误

这是有道理的,但是在retries那之后永远不会发生。我已经从内到外阅读了文档,但我无法弄清楚这retries实际上是如何触发的?

这是我的代码:

文档链接:

https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method

先感谢您。

0 投票
0 回答
976 浏览

ruby-on-rails - 如何有效地使用 Kafka 代替 Sidekiq 在 Rails 中进行后台作业?

目前,我打算在 rails 应用程序中使用 sidekiq 进行后台作业。现在我想使用消费者-生产者机制来使用 Kafka。为此,我正在异步ruby-kafka使用和生成消息。

我的方法:在 Job 的perform方法中,MyNewJob < ApplicationJob我正在调用produceKafka 的方法,当consumer收到消息时,我在MyNewJob.

它按预期工作。我想确认,这是正确的方法吗?如果不是,那么正确的方法应该是什么?

伪代码:

我将使用任何MyNewJob.perform_nowMyNewJob.perform_later等来调用我的工作。

my_new_job.rb

consumer.rb(伪代码)

我也在考虑另一种方法,我跳过引入ApplicationJob类并直接在 Kafka 中生成消息而不是调用 Job。

请确认。

编辑 1:我正在使用ruby-kafka异步生产者(kafka.async_producer)。

0 投票
1 回答
320 浏览

ruby-on-rails - Racecar 不使用 DeliveryBoy (kafka-ruby) 编写的消息

我有一个问题,旨在同时使用 zendesk 的送货员/赛车包装器来处理 Kafka 事件。

我使用 Racecar 使用事件,但使用 Sidekiq 计划作业通过 DeliveryBoy 以延迟的时间间隔将事件发回以重新处理到同一主题。

它工作了很多次,但从那以后,Racecar 已经停止使用这些重新处理事件。

DeliveryBoy 仍然确认它已将消息附加到主题:

但消费者不接受该事件。

我知道通过其他方式写入主题的事件正在被 Racecar 接收,似乎唯一不消耗的事件是 DeliveryBoy 编写的事件。

我已经改变了client_idDeliveryBoy 与client_idRacecar 的不同(用一个写,另一个听),但这并没有帮助。

任何建议表示赞赏。

0 投票
0 回答
173 浏览

ruby - ruby-kafka 读取所有消息主题并退出

我需要从 Kafka 主题中读取所有消息,然后处理并退出(无需永远像守护进程一样运行)。我已经编写了如下代码,如果主题中的消息可用,则它可以达到目的,如果主题为空(或提到的 Group_id 没有新消息),它将等到下一条消息到达,如果没有可用的消息,我需要立即退出过程。请查看我的代码并建议是否有更好的方法来实现这一点。我正在使用ruby​​-kafka 1.3.0 gem

我还找到了一种方法 kafka.fetch_messages,但是我没有找到一个选项来维护 group_id和跟踪已处理的消息而不添加额外的代码。

0 投票
1 回答
57 浏览

ruby - 等到值出现在哈希中

最近,我被分配了一个任务来构建一个 REST API 请求,该请求负责将消息发送到 Kafka 的入站通道,然后等待出站通道的输出。一切都很顺利,直到我遇到与等待此特定消息相关的问题。

值得指出的是,在成功到达后,消息会被写入全局消息持有者,这只是底层的 ruby​​ 哈希。下面是监控哈希的函数,直到后者被填充一些值。

以这种方式实施它是否合适?此时我应该尝试什么?
注意:Kafka 消费者在单独的线程中运行。

更新

我刚刚前往 ruby​​ 文档并偶然发现了一些关于频道的有趣部分。据我所知,通道是 ruby​​tines 之间通信的最佳选择(只是 goroutines 的一个花哨的名称,但在 ruby​​ 生态系统中:))
0 投票
0 回答
155 浏览

ruby-on-rails - 在 Rails 中使用 RSpec 测试互连进程(特别是 Kafka 进程)

我有一个 Rails 应用程序,它与工头一起运行多个进程

流程 1:使用 Racecar Gem ( https://github.com/zendesk/racecar )的 Kafka 消费者

进程 2:根据进程 1 使用的消息发送通知的服务

我想使用 Fake Kafka gem ( https://github.com/catawiki/fake-kafka ) 在 RSpec 功能规范中发布一条消息,并让我的 Racecar 消费者使用这条消息,然后测试该服务做了什么它的意思是。

如何在我的测试环境中运行 Kafka 进程,以便我可以测试消费消息是否会导致服务中的正确最终行为?