问题标签 [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1115 浏览

java - 自动重新连接在kafka中不起作用

我使用的是 kafka-0.8.1.1 版本,在这个版本中,自动重新连接不能通过 java 代码工作。我有属性文件

当我启动消费者线程时,它已连接到 kafka 服务器,在运行服务器的线程之间失去与 kafka 服务器的连接 2 小时。如果它重新连接 kafka 消费者没有收到任何消息,并且它也不会在流动。

我们如何检测它丢失连接并自动重新连接消费者线程。

提前致谢

0 投票
1 回答
3716 浏览

java - 发送字节数组到storm kafka bolt

我写了一个风暴拓扑。我基本上想以字节数组的形式将 avro 模式中的元组发送到 kafka 主题。

这就是我设置螺栓的方式:

这就是我转换为字节数组的方式

当我以以下方式发出元组时,我在 kafka 主题中看不到任何内容(向 kafka 发送字节流):

但相反,如果我将字节数组转换为字符串,然后转换为 kafka 主题,它可以工作:

如下所示:

我究竟做错了什么?如何使用 Storm kafka bolt 将字节流发送到 kafka 主题?

0 投票
1 回答
2101 浏览

spring - Spring整数kafka是否支持动态主题创建

我是弹簧整合 kafka 的新手,我了解 kafka-oubound-channel 适配器。但是有没有一种方法可以让我无需在上下文 xml 中设置而以编程方式创建主题?

即:根据我给转换器的消息,我想将消息发布到为此消息类型创建的 kafka 主题。

更新:

下面是我最终做的事情。将欢迎任何更好的解决方案。

-->

0 投票
1 回答
4189 浏览

c# - 配置 kafka-net 停止发送最新消息

我在带有 kafka-net 插件的 Red Hat VM 上使用 kafka 0.8.1.1。如何配置我的消费者以停止接收来自 kafka 的早期消息?

我的消费者代码:

0 投票
1 回答
1837 浏览

java - Kafka 简单消费者和消息大小 - 它是否读取部分消息?

我正在使用简单的消费者从 kafka 主题中读取数据,并且对获取大小与主题中消息大小的关系有疑问。

可以说,我在主题中的每条消息都是 10 kb。

当我从偏移量 0 开始消费时,提取大小为 16 kb(只是为了这个问题),它是否读取了 1 条完整消息和另一条 6kb 的部分消息?

链接指出以下内容,但我没有看到它发生

作为一种优化,允许服务器在消息集的末尾返回部分消息。客户应该处理这种情况。

0 投票
1 回答
5351 浏览

java - 无法使用 kafka Producer API 与 kafka 服务器通信

我已经在单个节点上设置了 kafka 并启动了 zookeeper 和 kafka 服务器。我在控制台上为内部生产者和消费者测试了它,它运行良好。但是当我在控制台上运行内部 kafka 消费者和我的自定义生产者时不起作用。

下面是我的生产者类

当控件到达 producer.send() 时,它会在 3 次尝试后停止,但出现以下异常

0 投票
1 回答
1807 浏览

java - 卡夫卡高级消费者

我正在尝试使用高级消费者批量读取 Kafka 主题中的消息。在这批读取期间,我的线程必须在某个时候停止。

要么,一旦主题中的所有消息都用完。或在即将读取消息时获取最大偏移量并停止直到达到最大偏移量。

我尝试在高级消费者中使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息进来。

所以3个问题,

  1. 我怎么知道没有更多消息要从该主题中读取?

  2. 如果我对上述问题有答案,我该如何阻止它再听这个话题?

  3. 有没有办法在批量读取开始时找到最大偏移量(我认为简单的消费者可以做到这一点)并使高级消费者在那个时候停止?

0 投票
1 回答
4673 浏览

hadoop - Apache Kafka 中的魔术字节

有人知道 Kafka 中的魔术字节吗?它用于什么以及如何使用?

生产者可以将任何魔术字节值发送到 Kafka 分区吗?

请分享有关此的任何文档。

0 投票
1 回答
6472 浏览

java - Kafka 简单消费者间歇性丢失消息

我有一个 Kafka 应用程序,我一直在使用 kafka-console-consumer.sh 来消费消息,如下所示:

它提供了我通过 Kafka 消费者写入 Kafka 代理的所有消息,没有任何遗漏。

最近,我将应用程序部署在无法访问 zookeeperhost 的不同环境中(由于某种原因)。所以我使用 kafka-simple-consumer-shell.sh 代替如下:

但是有了这个,我看到很少有消息(大约 5000 条中的 2-4 条)丢失。有人可以解释一下 kafka-simple-consumer-shell.sh 如何读取消息。

我怀疑有些消息可能会发送到某个不同的分区,因为我只是从分区 0 读取,所以我不会每次都收到所有消息。但是我不知道如何检查有多少个分区?其他分区的 id 是什么?我试过 1 但它不起作用。

有人可以帮忙吗。

0 投票
1 回答
353 浏览

apache-kafka - kafka 消费者代码没有完全运行

我正在尝试一个简单的 kafka 生产者消费者客户端 api,我的生产者类工作正常,因为我可以从控制台看到消费者中的消息但是当我运行消费者代码时没有显示任何内容,我不知道问题是什么或我在哪里我做错了

这是生产者代码是 -

消费阶层是——

为了检查,我在 testConsumer() 方法中使用 sysout 应用了 2 次检查,所以在运行时只显示 check1 即代码没有到达 check2,我认为有一些问题consumer.createMessageStreams(topicCount);,所以是什么原因以及如何解决它?