问题标签 [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
7679 浏览

mysql - 将 kafka 消费者消息保存到数据库

我正在为流分析制作流式数据库。谁能给我一步一步的代码来更新数据库(如 mySQL 或 Hive)中的 Kafka 消息以将其用于分析?

我设置了 Zookeeper 和 Kafka,但无法将消息保存到任何持久性数据库以将其用于报告。

0 投票
1 回答
1638 浏览

apache-kafka - 元数据响应 - 代理主机名错误

我使用 python-kafka 的 SimpleConsumer 在 kafka 代理中收听一个主题。Kafka 代理运行在主机名为 BROKER_HOST 的机器上。现在,SimpleConsumer 从代理 BROKER_HOST 请求主题元数据以获取主题 TOPIC 并获得一个元组

经纪人元数据来作为,

理想情况下,主机值必须是 BROKER_HOST(hostnameshell cmd 确认)但它是 localhost...

``主题的代理元数据如何进入 kafka 系统?显然,这会破坏系统,因为我的消费者尝试连接到其本地主机上的 9092。

0 投票
0 回答
3352 浏览

apache-kafka - java.io.IOException:连接由 kafka 中的对等方重置

我正在使用安装在 ubuntu 框中的 kafka_2.10-0.8.1.1 的 java api,并在运行高级消费者时收到java.io.IOException: Connection reset by peer 。它适用于 kafka-console-consumer.sh 命令,但不适用于 java api

0 投票
2 回答
752 浏览

apache-kafka - Kafka 0.8.2 ConsumerMetadataRequest 总是返回 ConsumerCoordinatorNotAvailableCode

我正在使用kafka 0.8.2,我想使用 fetch 和 commit offset API,如文档所述

给定消费者组的偏移量由称为偏移量协调器的特定代理维护。即,消费者需要向该特定代理发出其偏移提交和获取请求。它可以通过发出消费者元数据请求来发现当前的偏移协调器

所以我发送一个ConsumerMetadataRequest,而不是得到正确的响应,我总是得到ConsumerCoordinatorNotAvailableCode

如果尚未创建偏移量主题,代理会为消费者元数据请求或偏移量提交请求返回此错误代码。

如果我使用kafka 0.8.2beta版本,则没有问题。

我也使用 go client sarama,我__consumer_offsets在获取元数据之前创建了主题。这是我的配置:

0 投票
2 回答
9014 浏览

java - Kafka 设置从主题中读取的最大消息数

我是 Apache Kafka 的新手,正在探索 SimpleConsumer 以读取来自该主题的消息。

我使用下面的代码来做同样的事情,

这会读取特定分区中的所有可用消息;我想设置要阅读的最大消息数。在这个阶段有没有办法做到这一点?当队列中有大量消息时,我不希望所有消息都落在 JVM 堆中。

另一个问题,

以下代码返回一个 ByteBufferMessageSet。

这是否意味着,并非所有可用消息都实际进入内存?

0 投票
11 回答
117005 浏览

apache-kafka - 如何从一开始就使用 Kafka Consumer API 读取数据?

每次我运行消费者时,谁能告诉我如何从一开始就使用 Kafka Consumer API 读取消息。

0 投票
2 回答
11653 浏览

apache-kafka - Kafka消费者再平衡算法

有人可以告诉我 Kafka 消费者的再平衡算法是什么吗?我想了解分区计数和消费者线程如何影响这一点。

谢谢,

0 投票
5 回答
50726 浏览

apache-kafka - 在 KAFKA 中消费后删除消息

我正在使用 apache kafka 生成和使用大小为 5GB 的文件。我想知道是否有一种方法可以在使用主题后自动删除来自主题的消息。我有什么方法可以跟踪消费的消息吗?我不想手动删除它。

0 投票
5 回答
2068 浏览

apache-storm - Kafka Spout 多次读取相同的消息

如果我在我的 Storm 拓扑中增加 Kafka Spout 的并行度,我怎样才能阻止它多次读取同一主题中的同一消息?

0 投票
1 回答
2765 浏览

java - 在 kafka.apache.org 上运行示例时,Kafka 消费者未收到消息

我对 Kafka 很陌生,并试图在https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example上运行消费者示例,但它没有收到任何消息。

这是 Eclipse 控制台中的输出:

以下是我的消费者代码

这是 ConsumerMsgTask

这是我的 ProducerDemo

}

我通过以下命令创建了主题“test1”

这是使用在 CentOS 版本 6.5(最终版)上运行的 Kafka 0.8.2,使用 OpenJDK“1.7.0_45”。