问题标签 [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
119 浏览

apache-kafka - 将消息从 Kafka 拉到两个目的地

我能够获取 kafka 消息并将其插入 hdfs。我希望能够使用 BI 工具提取同一组消息。

有没有办法做到这一点?我需要创建 2 个消费者吗?还是2个消费群体?

好心提醒。

谢谢

0 投票
1 回答
2653 浏览

apache-kafka - Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

我们的用例是从 kafka 中删除过时/未使用的主题,即如果一个主题(在所有分区上)在过去 7 天内没有任何新消息,那么我们会将其视为过时/未使用并删除它。

许多谷歌结果建议在消息中添加时间戳,然后对其进行解析。对于灵魂可以工作的新主题和消息,但我们现有的主题和消息中没有任何时间戳。

我怎样才能得到这个工作?

0 投票
5 回答
36979 浏览

java - 简单的 Kafka 消费者示例不起作用

我有一个简单的类来使用来自 kafka 服务器的消息。大部分代码抄自org.apache.kafka.clients.consumer.KafkaConsumer.java的注释。

我正在使用“org.apache.kafka:kafka-clients:0.8.2.0”。它抛出异常

我应该如何配置 key.deserializer?

0 投票
5 回答
34097 浏览

java - Kafka - 使用高级消费者的延迟队列实现

想要使用高级消费者 api 实现延迟消费者

大意:

  • 按键生成消息(每个消息都包含创建时间戳)这确保每个分区都按生成时间排序消息。
  • auto.commit.enable=false(将在每个消息处理后显式提交)
  • 消费一条消息
  • 检查消息时间戳并检查是否已经过了足够的时间
  • 处理消息(此操作永远不会失败)
  • 提交 1 偏移量

    /li>

关于这个实现的一些担忧:

  1. 提交每个偏移量可能会减慢 ZK
  2. consumer.commitOffsets 可以抛出异常吗?如果是,我将使用相同的消息两次(可以用幂等消息解决)
  3. 等待很长时间而不提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个,休眠 24 小时,处理并提交(ZK 会话超时?)
  4. ZK 会话如何在不提交新偏移量的情况下保持活动状态?(设置一个 hive zookeeper.session.timeout.ms 可以解决死消费者而不识别它)
  5. 我还缺少其他问题吗?

谢谢!

0 投票
4 回答
12515 浏览

python - 如何在程序中停止 Python Kafka Consumer?

我正在做 Python Kafka 消费者(尝试在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用 kafka.consumer.SimpleConsumer 或 kafka.consumer.simple.SimpleConsumer )。当我运行以下代码时,它会一直运行,即使所有消息都已消耗。我希望消费者在消费完所有消息后会停止。怎么做?我也不知道如何使用 stop() 函数(它在基类 kafka.consumer.base.Consumer 中)。

更新

我使用信号处理程序来调用 consumer.stop()。一些错误信息被打印到屏幕上。但是程序仍然卡在for循环中。当新消息进来时,消费者消费它们并打印它们。我也试过client.close()。但同样的结果。

我需要一些方法来优雅地停止 for 循环。

欢迎任何帮助。谢谢。

0 投票
1 回答
1307 浏览

python - 如何使用 kafka.consumer.SimpleConsumer,seek()

API 文档在这里: http: //kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html

但是当我运行以下代码时,异常是%d 格式:需要一个数字,而不是 NoneType

当我使用以下代码时,异常是seek() got an unexpected keyword argument 'partition'

任何想法?谢谢。

0 投票
0 回答
178 浏览

apache-kafka - kafka.consumer.simple.SimpleConsumer.offsets 和 kafka.consumer.simple.SimpleConsumer.fetch_offsets 有什么区别

API 文档在这里: http: //kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html

kafka.consumer.simple.SimpleConsumer.offsets 和 kafka.consumer.simple.SimpleConsumer.fetch_offsets 有什么区别(https://github.com/mumrah/kafka-python/blob/adbd4ac052e4a5b40cfc2a3589b7adbcb656afe5/kafka/consumer/simple.py )?

如何获取一个主题的某个分区的所有消息的偏移量?如何获取未消费消息的偏移量?如何获取已消费消息的偏移量?似乎 offsets 和 fetch_offsets 都是消费消息的偏移量。

0 投票
4 回答
15801 浏览

apache-spark - 如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以读取

Kafka 0.8.2用于从 AdExchange 接收数据,然后Spark Streaming 1.4.1将数据存储到MongoDB.

我的问题是当我重新启动我的Spark Streaming工作时,例如更新新版本、修复错误、添加新功能。它将继续读取当时最新offsetkafka数据,然后在重新启动作业期间我将丢失 AdX 推送到 kafka 的数据。

我尝试了类似的auto.offset.reset -> smallest方法,但它会从 0 -> last 然后数据很大并且在 db 中重复。

我也尝试设置特定的group.idconsumer.idSpark但它是一样的。

如何将offset消耗的最新火花保存到zookeeper或者kafka然后可以从该火花读取到最新offset

0 投票
1 回答
669 浏览

apache-kafka - kafka 主题分区再平衡通知

我正在使用 Kafka 0.8.1.1

是否有任何 API(回调等)可以用来找到消费者lost partitions或消费者?newly added partitions

0 投票
0 回答
698 浏览

apache-kafka - 如何使用 Unix 时间戳通过 SimpleConsumer API 获取偏移量?

我正在尝试使用SimpleConsumer 示例

我修改代码中的偏移量:

当我使用kafka.api.OffsetRequest.EarliestTime()or时效果很好kafka.api.OffsetRequest.LatestTime()。但是当我将它设置为 UNIX TIMESTAMP 时,它此时不会返回消息。

例如

我将时间戳设置为 1439196000000L,即 2015/8/10 16:40:0。但是,它会在该时间前一小时左右返回一条消息。

  1. 这是分配时间戳的正确方法吗?时间戳应该是 13 位,而不是 10 位,对吧?
  2. 我在中国,使用北京时间。有关系吗?
  3. Kafka是否有可能有任何参数来设置集群的时间?