问题标签 [apache-kafka-connect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1415 浏览

apache-kafka - 在分布式模式下运行 kafka connect 时的问题

我们正在测试分布式模式下的 kafka connect,以将主题记录从 kafka 拉到 HDFS。我们有两个盒子。kafka 和 zookeeper 守护进程正在其中运行的一种。我们在此框中保留了一个 kafka connect 实例。我们有另一个盒子,其中存在 HDFS 名称节点。我们在这里保留了另一个 kafka connect 实例。

我们在第一个盒子中启动了 kafka、zookeeper 和 kafka connect。我们也在第二个盒子里开始了 kafka connect。现在根据 confluent 文档,我们必须使用 REST API 启动 HDFS 连接器(或任何其他连接器)。因此,在这两个框中启动 kafka connect 后,我​​们尝试通过 REST API 启动连接器。我们尝试了以下命令:-

只要我们在这里按回车,我们就会得到以下响应:

etc/kafka/ 的 connect-distributed.properties 文件位于两个 kafka 连接节点的下方。我们也创建了上述三个主题(connect-offsets、connect-configs、connect-status)

这里有什么问题?我们是否缺少一些东西来以分布式模式启动 kafka 连接以使用 HDFS 连接器。独立模式下的 kafka connect 工作正常。

0 投票
2 回答
12244 浏览

amazon-s3 - Kafka Connect Distributed tasks.max 配置设置的理想值?

我希望生产和部署我的 Kafka Connect 应用程序。但是,我有两个关于 tasks.max 设置的问题,这是必需的并且非常重要,但是对于实际将此值设置为什么细节是模糊的。

如果我有一个包含 n 个分区的主题,我希望从中消费数据并写入某个接收器(在我的情况下,我正在写入 S3),我应该将 tasks.max 设置为什么?我应该将其设置为 n 吗?我应该将其设置为 2n 吗?直觉上,我似乎想将值设置为 n ,这就是我一直在做的事情。

如果我更改我的 Kafka 主题并增加该主题的分区怎么办?如果我将其设置为 n,我将不得不暂停我的 Kafka 连接器并增加 tasks.max?如果我设置了 2n 的值,那么我的连接器应该自动增加它运行的并行度吗?

0 投票
4 回答
60946 浏览

java - Kafka Connect 堆空间不足

启动 Kafka Connect ( connect-standalone) 后,我的任务在启动后立即失败:

在一些 Kafka 文档中提到了堆空间,告诉你用“默认”尝试它,只有在有问题时才修改它,但没有修改堆空间的说明。

0 投票
1 回答
584 浏览

apache-flink - 如何从 flink 访问/读取 kafka 主题数据?

我正在尝试从 flink 读取 kafka 数据,因为我是 kafka 和 flink 的新手,我不知道如何连接它们。

0 投票
1 回答
231 浏览

apache-kafka - kafka connect是否支持分布式模式下的主动-主动模式?

在分布式模式下测试 kafka connect 时,我们发现它在待机模式下工作,而不是在活动模式下工作。

如果我们有两个成员集群,则一次只有一个为 HDFS 接收器连接器工作,另一个在待机状态下等待。如果第一个死掉或被中断,那么第二个等待的将接手操作。

它是否支持在一个集群组 ID 下通过 HDFS 的不同接收器名称一起工作的方式,如果一个发生故障,刷新操作根本不会中断,因为第二个正在运行并将数据推送到 HDFS?

0 投票
1 回答
1517 浏览

apache-kafka - Kafka 消费者组 ID 和消费者重新平衡问题

我在我的生产服务器中使用 Kafka0.10.0和 zookeeper 3.4.6。我有 20 个主题,每个主题大约有 50 个分区。我共有 100 个消费者,每个消费者订阅了不同的主题和分区。所有消费者都拥有相同的 groupId。那么,如果为特定主题添加或删除消费者,那么附加到不同主题的消费者也会经历重新平衡吗?

我的消费者代码是:

任何帮助都会得到帮助。

0 投票
2 回答
3913 浏览

java - Kafka-connect sink 任务忽略文件偏移存储属性

我在使用 Confluent JDBC 连接器时遇到了非常奇怪的行为。我很确定它与 Confluent 堆栈无关,而是与 Kafka-connect 框架本身有关。

因此,我将offset.storage.file.filename属性定义为默认值/tmp/connect.offsets并运行我的接收器连接器。显然,我希望连接器在给定文件中保留偏移量(它在文件系统上不存在,但应该自动创建,对吧?)。文档说:

offset.storage.file.filename 用于存储连接器偏移量的文件。通过将偏移量存储在磁盘上,可以在单个节点上停止和启动独立进程,并从之前停止的位置恢复。

但是卡夫卡的行为方式完全不同。

  1. 它检查给定文件是否存在。
  2. 它不是,Kafka 只是忽略它并在 Kafka 主题中保留偏移量。
  3. 如果我手动创建给定文件,则无论如何读取都会失败(EOFException)并且偏移量会再次保留在主题中。

这是一个错误,还是更有可能是我不明白如何使用这种配置?我了解两种持久偏移量的方法之间的区别,并且文件存储更适合我的需求。

0 投票
1 回答
171 浏览

apache-kafka - 如何删除在多节点多集群 HA kafka 中运行的主题

我已经在多节点多集群环境中设置了 kafka 版本 2.11-0.10.1.0。在 kafka server.properties 中,我已经在所有 3 台机器中添加了delete.enable.topic=true 。

我正在使用删除主题的命令是 ./bin/kafka-topic.sh --zookeeper ip1:2181,ip2:2181,ip3:2181 --delete --topic topicname

但它没有删除,显示主题名称 - 删除标记

所以每次我清除 kafka-logs 和 zookeeper 日志以删除主题。

任何有任何想法使用命令提示符删除的人。

0 投票
1 回答
1506 浏览

apache-kafka - 如何将 mqtt 的有效负载数据转换为 kafka 字符串类型

现在我正在尝试从 mqtt 消息连接到 kafka(实际上是 kafka 中的火花流)

我使用了这个连接器https://github.com/evokly/kafka-connect-mqtt

和 Spark-2.1.0,Kafka - 0.10.1.1

像这样的 Spark 流输出

和生产者代码

和火花流卡夫卡消费者代码

我怎么能得到字符串而不是字节?请帮助伙计们

0 投票
1 回答
318 浏览

amazon-s3 - 在 AWS EMR 上使用 qubole/streamx 时的 AWS S3 访问问题

我使用 qubole/streamx 作为 kafka 接收器连接器来使用 kafka 中的数据并将它们存储在 AWS S3 中。我在 AIM 中创建了一个用户,权限是AmazonS3FullAccess. 然后在 hdfs-site.xml 中设置密钥 ID 和密钥,其中 dir 分配在quickstart-s3.properties.

如下配置:

快速启动-s3.properties:

hdfs-site.xml:

connect-standalone ./connect-standalone.properties ./quickstart-s3.properties并在用于启动接收器连接器时出现此错误。