问题标签 [karafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
ruby - Karafka start_from_beginning not working as expected
I've created a project to help with my understanding of Kafka. It's set up as three identical Rails apps all inside Docker with Karafka configured to consume the messages - if you create a record in one, it's replicated across to the other two. I assumed that the start_from_beginning
setting would mean that every time the Karafka server was restarted it would start from offset 0, but that does seem to be the case. Can someone please explain what I've done wrong or correct my understanding.
Here are the two significant sections from karafka.rb
...
I have already tried putting config.kafka.start_from_beginning = true
in the config section of karafka.rb
but no joy.
When I create a record in one of the apps, it's sync over to the other two. This is what I was trying to do:
- empty the database in app 3
- restart karafka server in app 3 (
with start_from_beginning = true
)
At this point, I was expecting the database to be re-created from Kafka by rewinding to offset 0 and replaying all the messages. What have I missed?
The full project is here: https://github.com/jcleary/kafka-demo
ruby-on-rails - Kafka/Ruby on Rails - 我不理解运行“bundle exec karafka server”后生成的日志
我是微服务领域的新手。我正在将一个整体 Web 应用程序迁移到一个微服务应用程序。我在将负责身份验证的 Ruby on Rails Web 应用程序与 KAFKA 连接时遇到问题。
Ruby on Rails 本身没有任何问题。POC 可在此处获得https://github.com/maelfosso/authentication-kafka。HTTP 请求工作正常。
现在我想将它与 Apache Kafka 连接起来。我正在使用Karafka。感谢他的作者,我终于成功设置了它。但是在启动它时,我得到了一个我不明白的日志。你可以在这里阅读https://gist.github.com/maelfosso/3be57de6bede85f67b36b0143fd6ae58
这是我对 Apache Kafka 的第一次体验,我尽可能简单地开始它,如快速入门部分所述
从一个 NodeJs 微服务,我可以产生一条消息。我可以使用bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic business
.
这是一些多次出现的部分
我不明白它为什么要关闭。
等待响应 2(如果我不停止响应,最多 9 个)?它在等什么?有没有我错过的配置?
我不明白这个问题。它只是从NodeJS应用程序发送的 json 消息。就是这里
还有那个消息显示了很多次
我不明白。
我检查了安装的 Kafka192.168.8.101:9092
并且它正在工作。我可以从 NodeJS 应用程序或控制台生产者生成消息,并使用控制台消费者使用它们。
请问,有什么问题吗?完整的日志文件在这里https://gist.github.com/maelfosso/3be57de6bede85f67b36b0143fd6ae58和 Ruby on Rails POC 在这里https://github.com/maelfosso/authentication-kafka
谢谢
ruby - 处理两个连续 kafka 批次之间的长时间延迟(使用 ruby/karafka 消费者)
我正在使用 karafka 从主题中读取,并调用外部服务。每次调用外部服务大约需要 300 毫秒。并且在消费者组中运行 3 个消费者(k8s 中的 3 个 pod),我预计每秒可以实现 10 个事件。我看到了这些日志,它们也证实了处理每个单独事件的 300 毫秒预期。但是,总吞吐量并没有增加。每个 karafka 进程似乎在处理两批事件之间停留了很长时间。
围绕该consume
方法进行检测意味着消费者代码本身不需要时间。
https://github.com/karafka/karafka/blob/master/lib/karafka/backends/inline.rb#L12
但是,我注意到两件事:
当我在 3 个 pod 上跟踪日志时,3 个 pod 中只有一个似乎一次发出日志。这对我来说没有意义。由于所有分区都有足够的事件,每个消费者应该能够并行消费。
虽然,上述消息大致显示每个事件 321 毫秒 (2571/8),但实际上我看到日志在处理两个批次之间长时间停滞。我很好奇,那个时间去哪儿了?
====== 编辑:
跨代理的数据分布存在一些偏差——因为我们最近将代理从 3 个扩展到总共 6 个。但是,没有一个代理处于 CPU 或磁盘压力之下。这是一个新集群,在高峰期几乎不使用 4-5% 的 cpu。
我们的数据均匀分布在 3 个分区中——我说这是因为每个分区的最后一个偏移量大致相同。
分割 | 第一次 偏移 |
最后 偏移量 |
尺寸 | 领导 节点 |
副本 节点 |
同步 副本 节点 |
离线 副本 节点 |
首选 领导者 |
复制不足 |
---|---|---|---|---|---|---|---|---|---|
[0] | 2174152 | 3567554 | 1393402 | 5 | 5,4,3 | 3,4,5 | 是的 | 不 | |
1 | 2172222 | 3566886 | 1394664 | 4 | 4,5,6 | 4,5,6 | 是的 | 不 | |
[2] | 2172110 | 3564992 | 1392882 | 1 | 1,6,4 | 1,4,6 | 是的 | 不 |
然而,我确实看到一个消费者永远落后于另外两个消费者。下表显示了我的消费者的滞后。每个分区有一个消费者进程:
分割 | 第一次偏移 | 最后偏移量 | 消费者抵消 | 落后 |
---|---|---|---|---|
0 | 2174152 | 3566320 | 2676120 | 890200 |
1 | 2172222 | 3565605 | 3124649 | 440956 |
2 | 2172110 | 3563762 | 3185587 | 378175 |
综合滞后 | 1709331 |
这是来自所有 3 个消费者的日志的屏幕截图。consume
您可以注意到每次函数调用所花费的时间和两次相邻调用之间的间隔之间的巨大差异。基本上,我想解释和/或减少等待时间。该主题中有 100k+ 事件,我的虚拟 karafka 应用程序能够快速检索它们,因此 kafka 代理不是问题。
将 max_wait_time 设置为 1 秒(之前为 5 秒)后更新
减少等待配置后,问题似乎得到了解决。现在两个连续日志的差值大致等于consume花费的时间
ruby-on-rails - How do you process a sliding window using karafka?
I have a stream of messages in a Kafka topic. For each message, I need to examine previous messages that occur within 10 seconds of the current message.
I assume that I can keep a buffer with the current window in my consumer. How do I reconstruct the window for the current offset if my consumer dies or gets replaced?
java - kafka 被设计成有很多生产者
我们一直在测试kafka(基于云)实现一个多生产者(大约27000台linux机器)、一个消费者(spring kafka listener)和一个10个partition的topic的数据传输系统,问题是当9500个生产者同时传输所有节点的 CPU 消耗高达 100%,集群出现故障并停止响应。这个 kafka 是为这种类型的架构设计的,或者你应该寻找其他选择。
这是我的设置:
Kafka 集群:每个节点 4 个基于云 karafka 的节点(4GB ram + 900GB 磁盘)
生产者:kafka-clients-1.1.1.jar + JDK 1.7
春季启动消费者配置:
任何帮助将不胜感激。谢谢