问题标签 [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
5 回答
16632 浏览

python - Kafka Consumer:如何从 Python 中的最后一条消息开始消费

我正在使用 Kafka 0.8.1 和 Kafka python-0.9.0。在我的设置中,我设置了 2 个 kafka 代理。当我运行我的 kafka 消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都很好!

我的问题是,当我重新启动消费者时,它会从头开始消费消息。我所期待的是,在重新启动时,消费者会从它死前停止的地方开始消费消息。

我确实尝试跟踪 Redis 中的消息偏移量,然后在从队列中读取消息之前调用 consumer.seek 以确保我只收到以前未见过的消息。虽然这行得通,但在部署此解决方案之前,我想与大家核实一下……也许我对 Kafka 或 python-Kafka 客户端有一些误解。似乎消费者能够从中断的地方重新开始阅读是非常基本的功能。

谢谢!

0 投票
1 回答
5317 浏览

apache-spark - 如何让 Spark Streaming (Spark 1.0.0) 从 Kafka (Kafka Broker 0.8.1) 读取最新数据

我的 Spark 流应用程序从 Kafka 获取数据并对其进行处理。

如果应用程序发生故障,Kafka 中会存储大量数据,并且在下次启动 Spark Streaming 应用程序时,它会因为一次消耗的数据过多而崩溃。由于我的应用程序不关心过去的数据,因此只使用当前(最新)数据是完全可以的。

我找到了“auto.reset.offest”选项,它在 Spark 中的行为几乎没有什么不同。如果已配置,它将删除存储在 zookeeper 中的偏移量。然而,尽管它的行为出人意料,但它应该在删除后从最新的数据中获取数据。

但我发现不是。我看到在使用数据之前清除了所有偏移量。然后,由于默认行为,它应该按预期获取数据。但是由于数据过多,它仍然会崩溃。

当我使用“Kafka-Console-Consumer”清理偏移量并使用最新的数据并运行我的应用程序时,它按预期工作。

所以看起来“auto.reset.offset”不起作用,并且火花流中的kafka消费者默认从“最小”偏移量获取数据。

您对如何使用最新的 Spark 流中的 Kafka 数据有任何想法吗?

我正在使用 spark-1.0.0 和 Kafka-2.10-0.8.1。

提前致谢。

0 投票
1 回答
384 浏览

message-queue - Kafka 消费者工作队列

我们在 SOA 应用程序中有以下场景。ServiceA 产生一些需要由 ServiceB 的实例异步处理的作业。本质上,这转化为一个工作队列问题,其中每个工作人员都是 ServiceB 的一个实例。我们使用 Kafka 作为消息代理并具有以下设置。

5 经纪人 B1、B2、B3、B4 和 B5。有一个 topic(A) 有 10 个分区 (P1,P2,....P10),每个分区的复制因子为 3。假设分区分配如下 P(i) 有 B(i) 作为领导者,并且B(i+1) 和 B(i+2) 作为副本。

有 3 个 ServiceB 实例正在运行。有了这个设置,我们应该如何使用 High Level Consumer API 来实现 C1 从 3 个分区消费的消费模型;C2 来自 3 个分区;来自剩余 4 个分区的 C3

0 投票
2 回答
11675 浏览

message-queue - 消费者再平衡如何在 Kafka 中工作?

当添加或关闭新的消费者/代理时,Kafka 会触发重新平衡操作。Kafka Rebalancing 是阻塞操作吗?重新平衡操作正在进行时,Kafka 消费者是否被阻塞?

0 投票
1 回答
3244 浏览

apache-kafka - 使用 Simple Consumer 读取 Apache Kafka 中未处理的消息

我厌倦了链接

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

使用 SimpleConsumer 来消费消息,但是在使用它时我发现了一些突然的行为,如下所示:

消费者正在消费来自特定分区的消息。但问题是,当我的消费者正在运行并且我使用生产者将消息推送到主题时,它会使用来自该分区的消息。但是,如果我的消费者目前没有运行并且我将一些消息推送到主题并再次启动消费者,它不会消费生产者推送的消息,但它再次准备好消费现在将被推送的消息。我正在使用 LatestTime() 代替 od EarliestTime() 因为我只想使用未处理的消息。

例如

情况1

消费者正在运行:

Producer将M1、M2、M3消息推送到topic 1的partition 1

结果:消费者将消费所有三个消息。

案例 - 2

消费者没有运行

producer 现在将 m4、m5 m6 messgae 推送到主题 1 的分区 1

现在调用消费者

结果:消费者不使用消息 m4、m5、m6,但如果我检查偏移量,则它设置为 7。这意味着生产者在生成消息时已将偏移量提前到 7,因此消费者现在将使用来自偏移量 7 的消息

理想情况下,当消费者再次出现时,它应该从 m4 读取消息,请提供帮助。

0 投票
1 回答
315 浏览

apache-kafka - 直到我在调试器中暂停执行,消费者才发现任何消息

我正在使用 kafka 高级消费者。当我启动消费者时,它会找到所有新消息。当我使用 Java kafka 生产者生成新消息时,它会发现它们。但是一分钟后,它继续循环,但没有找到新消息。当我在调试器中暂停执行时,消费者突然开始寻找要消费的消息。我在 Java 中使用 0.8.0 版。请注意,在发生错误时,使用消息的进程将在单独的“错误”主题中生成消息。当我停止产生这些错误消息时,我就不再遇到这个问题了。

0 投票
3 回答
38410 浏览

java - Kafka - 关闭(kafka.server.KafkaServer),启动 Kafka-Server-Start 出现问题

我将从头开始。我有 openSuse 13.2,还有 jdk_1.7.0_51、scala-2.11.4 和 gradle-2.2.1。我已经下载了 kafka-0.8.2-bet-src 的源代码并运行 ./gradlew 命令,因为它们在自述文件中写入。所有命令都成功运行,除了测试(93% 成功,19 个失败)顺便说一句,我得到了我的 releaseTarGz(kafka_2.11-0.8.2-beta.tgz)。所以一切顺利,我运行 zookeeper 并且它开始正确,但是当我运行 kafka-server-start 时,我得到:

这是我的 server.properties 文件:

还有我的 /etc/hosts 文件:

有人可以帮我吗?

0 投票
2 回答
5515 浏览

java - 基准卡夫卡 - 平庸的表现

我通过在 EC2 服务器上流式传输 1k 大小的消息来对 Kafka 0.8.1.1 进行基准测试。

我在两台 m3.xlarge 服务器上安装了 zookeeper,配置如下:

其次,我在具有 32Gb RAM 和额外 6 个 SSD 驱动器的 i2.2xlarge 机器上安装了单个 Kafka 服务器,其中每个磁盘分区为/mnt/a , mnt/b, etc..... 在服务器上,我有一个代理,端口 9092 上的单个主题和 8 个复制因子为 1 的分区:

我所有的测试都是从另一个实例完成的,实例之间的延迟小于 1 毫秒。当分区键是从 0 到 7 的随机数时,我使用一个线程生产者和 8 个线程消费者编写了生产者/消费者 Java 客户端。我通过提供自定义编码器使用 Json 序列化每条消息。

我的消费者生产者属性如下:

现在,当我发送 100k 条消息时,我每秒收到 10k 条消息和大约 1 毫秒的延迟。

这意味着我每秒有 10 兆字节,相当于 80Mb/s,这还不错,但我希望位于同一区域的那些实例具有更好的性能。

我在配置中遗漏了什么吗?

0 投票
4 回答
14392 浏览

apache-kafka - 读取所有消息后终止 Kafka 控制台使用者

我知道必须有办法做到这一点,但我无法弄清楚这一点。读取队列中的所有消息后,我需要停止 kafka 消费者。

有人可以提供这方面的任何信息吗?

0 投票
2 回答
13916 浏览

apache-kafka - kafka 消费者会话超时

我们有一个应用程序,消费者读取消息,线程执行许多操作,包括在向另一个主题生成消息之前访问数据库。在线程上消费和产生消息之间的时间可能需要几分钟。一旦为新主题生成消息,就会完成提交以指示我们已完成对消费者队列消息的工作。由于这个原因,自动提交被禁用。

我正在使用高级消费者,我注意到 Zookeeper 和 kafka 会话超时,因为我们在消费者队列上做任何事情需要很长时间,所以每次线程返回以从消费者那里读取更多内容时,kafka 最终都会重新平衡队列,并且消费者在一段时间后读取新消息之前开始需要很长时间。

我可以将 Zookeeper 会话超时设置得非常高,以免造成问题,但随后我必须相应地调整重新平衡参数,并且 kafka 在一段时间内不会接收新的消费者以及其他副作用。

我有什么办法来解决这个问题?有没有办法让 kafka 和 zookeeper 心跳,让双方都开心?如果我使用一个简单的消费者,我还会遇到同样的问题吗?