问题标签 [apache-kafka-connect]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
856 浏览

elasticsearch - Kafka Connect 到 ElasticSearch 是否可以进行升级

我收到了最终出现在 Kafka 中的事件。从这些事件中,我使用 Kafka Streams 应用程序获取 id,并将其作为另一个主题中的一对 (id, 1) 发布回 Kafka。然后我想看看 id 是否已经存在于 ElasticSearch 中,如果存在则更新其计数器,否则在 ElasticSearch 中创建一条新记录,其 id 来自 Kafka,计数器设置为 1,即记录的更新 (id, 1)到 ES。

我希望为此使用 Kafka Connect to ElasticSearch,但如果可能的话,它似乎并不那么简单。我可以看到向 ​​ES 添加记录是可行的,但是与现有记录合并似乎是我还没有发现的事情。这是否已经可行,如果可以,如何实现,如果不可以,是否计划在附近的版本中实现?

0 投票
2 回答
4471 浏览

docker - 第二个和第三个分布式 Kafka 连接器工作人员无法正常工作

使用 3 个 Kafka 集群和一个相同的 Zookeeper 集群,我启动了一个分布式连接器节点。这个节点成功运行了一个任务。然后我提出了第二个连接器,这似乎是因为任务中的一些代码确实运行了。然而,它似乎并没有保持活动状态(尽管没有抛出任何错误,但由于缺乏预期的活动而观察到不活动状态,而第一个连接器继续正常运行)。当我在每个连接器节点上调用 URLhttp://localhost:8083/connectors/mqtt/tasks时,它告诉我连接器有一个任务。我希望这是两个任务,每个节点/工作者一个。(目前工作人员配置说tasks.max = 1,但我也尝试将其设置为 3。

当我尝试调出第三个连接器时,出现错误:

尝试从 shell 再次调用连接器 POST 方法会返回错误:

我还尝试升级到今天发布的 Apache Kafka 0.10.1.1。我仍然看到问题。每个连接器都在由单个映像定义的隔离 Docker 容器上运行。它们应该是相同的。

问题可能是我试图http://localhost:8083/connectors在每个工作人员上运行 POST 请求,而我只需要在单个工作人员上运行一次,然后该连接器的任务将自动分配给其他工作人员。如果是这种情况,我该如何分配任务?我目前将最大值设置为三个,但似乎只有一个在单个工作人员上运行。

更新

我最终使用 Yuri 建议的基本相同的方法让事情运行起来。我给每个工作人员一个唯一的组 ID,然后给每个连接器任务赋予相同的名称。这允许三个连接器及其单个任务共享一个偏移量,因此在接收器连接器的情况下,它们从 Kafka 消费的消息不会重复。它们基本上作为独立连接器运行,因为工作人员具有不同的组 ID,因此不会相互通信。

如果连接器工作人员具有相同的组 ID,则您不能添加多个具有相同名称的连接器。如果您为连接器指定不同的名称,它们将具有不同的偏移量并使用重复的消息。如果您在同一个组中有三个工作人员,一个连接器和三个任务,理论上您将有一个理想的情况,其中任务共享一个偏移量并且工作人员确保任务始终运行且分布良好(每个任务消耗一个唯一的集合的分区)。在实践中,连接器框架不会创建多个任务,即使 tasks.max 设置为 3 并且主题任务正在使用时有 25 个分区。

如果有人知道我为什么会看到这种行为,请告诉我。

0 投票
0 回答
109 浏览

elasticsearch - 卡夫卡将插入到字段与分析器连接起来

我有一个简单的弹性分析仪领域。我还有一个 Kafka 主题,其中包含相同类型的 AVRO 记录(AKA 字符串/文本)。首先,我在包括此分析器在内的所有字段的弹性上创建一个索引,

指数:

Avro 文件由 kafka-connect (with schema registry) 创建。从包含一个字段 mUrl 的 sqlserver 可以为空 nvarchar(500)

错误:

[test] 的映射器与其他类型中的现有映射冲突:
[mapper [test] 具有不同的 [analyzer]]

0 投票
0 回答
2015 浏览

json - Kafka 连接与 MongoDB 连接器

我尝试将 Apache Kafka Connect 与 MongoDB 接收器连接器(连接器)一起使用

当我使用 Avro 格式时,除了一个问题外,它都可以工作,我必须用一个分区创建主题,因为连接器使用 record.kafkaOffset() 作为新 Mongo 记录的 _id(所以对于多个分区,我得到不同的相同 id记录)。

我该如何解决?

我想用 Json 测试它,所以我为此创建了一个新主题。并将转换器配置更改为 JsonConverter。当我运行它时,我收到以下错误:

java.lang.ClassCastException:java.util.HashMap 无法在 org.apache.kafka.connect.mongodb.MongodbSinkTask.put(MongodbSinkTask.java:106) 处转换​​为 org.apache.kafka.connect.data.Struct。 org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176) 在 org.apache.kafka.connect.runtime 的 apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)。 WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90) 在 org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58) 在 org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java: 82)

此连接器是否适用于 Json?

0 投票
2 回答
3872 浏览

apache-kafka - Kafka 连接器 - 无法停止重新平衡

我正在使用kafka connector confluent 3.0.1版本。我创建了一个名为new-group的新组,上面大约有​​20个主题。这些主题大部分都很忙。但是很遗憾,当我启动连接器框架时,系统无法停止重新平衡,所有主题的重新平衡大约需要 2 分钟。我不知道原因。一些错误信息是:

我不知道这是否与持续再平衡有关。

我知道如果 KafkaConsumer.poll() 比配置的 timeout 长,kafka 将撤销分区并因此触发重新平衡,但我很确定每次轮询不会那么长。任何人都可以给我一些线索?

0 投票
0 回答
333 浏览

java - 如何以编程方式/动态创建包含使用 Kafka 和 Java 的工作人员的数据管道?

在一个非常高的层次上,我将提供一个多租户平台界面,以允许用户创建“项目”——为各种生产者类型转换为 Kafka 主题——RSS 提要、SQL Server 查询和文件位置。

例如,要检索 RSS 数据,我相信我们需要创建 Kafka 作品以按计划获取 RSS 数据 - 因此需要动态添加新的多线程工作人员(或工作人员) - 而不是通过配置文件。

应用程序抽象层将用户与 Kafka 和 ElasticSearch 分开。我不能允许所有用户通过控制中心进行访问。

在我可能提供某种类型的托管服务的另一个用例中,我可以使用 CC。

鉴于这种用途,我想了解一些有关实现此目的的适当方法的详细信息——因为似乎仍然存在一些差距——即我们不能使用配置文件,因为需要动态创建主题。

如何使用 Kafka 和 Java 以编程方式/动态创建数据管道和工作程序?

0 投票
3 回答
3583 浏览

apache-kafka - Kafka Connect offset.storage.topic 未收到消息(即如何访问 Kafka Connect 偏移元数据?)

我正在设置一个 Kafka Connect 分布式模式应用程序,该应用程序将是一个 Kafka 到 S3 管道。我正在使用 Kafka 0.10.1.0-1 和 Kafka Connect 3.1.1-1。到目前为止,事情进展顺利,但对于我正在使用的大型系统而言,重要的一个方面需要了解 Kafka -> FileSystem 管道的偏移信息。根据文档,offset.storage.topic配置将是分布式模式应用程序用于存储偏移信息的位置。考虑到 Kafka 如何在“新”Kafka 中存储消费者偏移量,这是有道理的。但是,在使用 FileStreamSinkConnector 进行一些测试后,没有任何内容被写入 myoffset.storage.topic默认值:connect-offsets.

具体来说,我使用 Python Kafka 生产者将数据推送到主题,并使用 Kafka Connect 和 FileStreamSinkConnect 将数据从主题输出到文件。这可以正常工作并按照我期望连接器的行为表现。此外,当我停止连接器并启动连接器时,应用程序会记住主题中的状态并且没有数据重复。但是,当我去offset.storage.topic查看存储了哪些偏移元数据时,主题中没有任何内容。

这是我使用的命令:

kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning

让此命令运行一分钟左右后,我收到此消息:

Processed a total of 0 messages

总结一下,我有两个问题:

  1. 为什么即使我的分布式应用程序正确保持状态,偏移元数据也没有写入应该存储它的主题?
  2. 如何访问 Kafka Connect 分布式模式应用程序的偏移元数据信息?这对于我的团队对我们系统的 Lambda 架构实施来说是 100% 必要的。

谢谢您的帮助。

0 投票
1 回答
1868 浏览

streaming - Zeppelin 6.5 + 用于结构化流 2.0.2 的 Apache Kafka 连接器

我正在尝试运行一个 zeppelin 笔记本,其中包含带有 Kafka 连接器的 spark 结构化流示例。

这是我的环境:

这是我的飞艇笔记本中的代码:

这是我运行笔记本时遇到的错误:

import org.apache.spark.sql.functions.{explode, split} java.lang.ClassNotFoundException:找不到数据源:kafka。请在https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects找到软件包 在 org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148) 在 org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79) 在 org .apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79) 在 org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218) 在 org.apache.spark .sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80) at org.apache.spark.sql .execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124) ... 86 被忽略 原因:java.lang.ClassNotFoundException : 卡夫卡。DefaultSource 在 scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357)在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) 在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun $5$$anonfun$apply$1.apply(DataSource.scala:132) 在 scala.util.Try$.apply(Try.scala:192)132)在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) 在 scala.util.Try$.apply(Try.scala:192 )132)在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) 在 scala.util.Try$.apply(Try.scala:192 )

任何帮助建议将不胜感激。

谢谢

0 投票
0 回答
231 浏览

hdfs - 如何将火花对 rdd 作为文件存储到 HDFS 中?

嗨,我创建了一个包含 3 个分区和 2 个副本的 kafka 主题。我尝试从 kafka 发布消息/记录以触发流式传输(对于某些进程),然后将数据存储到 HDFS 中。我试图将 RDD 对存储为文本文件,但不工作。

此代码不起作用,

控制台输出:

事实上我的 pom.xml

0 投票
0 回答
1328 浏览

elasticsearch - logstash kafka 输入性能/配置调优

我使用 logstash 将数据从 Kafka 传输到 Elasticsearch,但出现以下错误:

我尝试调整会话超时(至 30000)和最大轮询记录(至 250)。

该主题每秒以 avro 格式生成 1000 个事件。有 10 个分区(2 个服务器)和两个 logstash 实例,每个实例有 5 个使用者线程。

我对每秒约 100-300 个事件的其他主题没有任何问题。

我认为这应该是一个配置问题,因为我在 Kafka 和 Elasticsearch 之间也有第二个连接器,它工作正常(confluent 的 kafka-connect-elasticsearch)

主要目的是比较 kafka connect 和 logstash 作为连接器。也许任何人也有一些一般的经验?