问题标签 [apache-kafka-mirrormaker]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
455 浏览

apache-kafka - 设置多数据中心 Kafka 集群

我正在使用多 DC 集群设置 Kafka 集群。目的是确保如果一个 DC 出现故障,生产者和消费者仍然能够继续运营而不会出现任何问题。我遇到了两个选项,但不确定有什么区别以及它是如何工作的。

选项1:设置多个zookeeper集群(每个DC一个集群)

设置多个 Zookeeper,每个 Zookeeper 将在 DC 中拥有一组 broker。在这种情况下,我真的会同时获得主动-主动和灾难恢复吗?如果 1 DC 下降,消费者会发生什么。

选项 2:使用源和目标设置 Mirror maker

我知道这是一个集群到另一个集群的复制。但是,我如何从消费者或生产者的角度指出这两个集群?它会自动处理还是我应该手动处理?

对这些选项的任何解释表示赞赏。

0 投票
1 回答
500 浏览

apache-kafka - 运行两个 MirrorMaker 2.0 实例,停止更新主题的数据复制

我们使用 mirror-maker 2.0 尝试了以下场景,并想知道是否预期第二个场景的输出。

场景 1.) 我们使用以下属性和启动命令运行单个 mirror-maker 2.0 实例。

启动命令:/usr/bin/connect-mirror-maker.sh connect-mirror-maker.properties &

验证:在源集群(a)上创建新主题“test”,为源集群上的主题生成数据并在目标集群(b)上运行消费者,主题“a.test”以验证数据复制。

观察:按预期工作得很好。

场景 2.) 使用与上述相同的属性再运行一个 MirrorMaker 2.0 实例。

启动命令:/usr/bin/connect-mirror-maker.sh connect-mirror-maker.properties &

验证:在源集群上再创建一个“test2”主题,在源集群上生成数据到主题并在目标集群(b)上运行消费者,主题“a.test2”以验证数据复制。

观察:MM2 能够在目标集群上复制主题,a.test2 存在于目标集群 b 上,但消费者没有得到任何记录来消费。

在较新的 mirror-maker 2.0 实例日志上,主题复制后,mirror-sourceconnector 任务没有重新启动,这在主题复制后在单个实例中重新启动。

注意:没有看到错误日志。

0 投票
0 回答
570 浏览

apache-kafka - 使用 MTLS 配置连接镜像制造商,仅用于目标,纯文本用于源

我正在运行mirror maker 2.0(来自kafka 2.6的二进制文件)并且我的源集群配置为纯文本,而我的目标集群启用了ssl

我已经创建了正确的密钥,并确保第一个连接可以使用以下命令正常工作,

ssl_properties 文件如下,

security.protocol=SSL
ssl.truststore.location=/opt/keys/client.truststore
ssl.truststore.password=123456
ssl.keystore.location=/opt/keys/jacek.keystore
ssl.keystore.password=123456 ssl.key .password=密码

现在,当我尝试运行镜像制造商时,它失败并出现以下错误,镜像制造商配置文件中有类似的配置,如下所示,

错误:

org.apache.kafka.common.errors.TimeoutException:调用(callName=fetchMetadata,deadlineMs=1605011994642,trys=1,nextAllowedTryMs=1605011994743)在 1605011994643 1 次尝试后超时原因:org.apache.kafka.common .errors.TimeoutException:等待节点分配超时。调用:fetchMetadata [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 未注册(org.apache.kafka.common.utils.AppInfoParser:83)[2020-11-10 12 :40:24,643] INFO [AdminClient clientId=adminclient-8] 元数据更新失败 (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235) org.apache.kafka.common.errors.TimeoutException: Call(callName= fetchMetadata,deadlineMs=1605012024643,trys=1,nextAllowedTryMs=-9223372036854775709)在 9223372036854775807 次尝试后超时 原因:org.apache.kafka.common.errors。TimeoutException:AdminClient 线程已退出。调用:fetchMetadata [2020-11-10 12:40:24,644] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668) [2020-11-10 12:40:24,644] INFO Closing Reporter org .apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672) [2020-11-10 12:40:24,644] INFO Metrics 记者关闭 (org.apache.kafka.common. metrics.Metrics:678)[2020-11-10 12:40:24,645] ERROR 由于错误而停止(org.apache.kafka.connect.mirror.MirrorMaker:304)org.apache.kafka.connect.errors.ConnectException:无法连接和描述 Kafka 集群。检查工作人员的代理连接和安全属性。在 org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70) 在 org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70) kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) 在 org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) 在 org.apache.kafka.connect.util。 ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64) ... 7 更多原因:org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1605012024641, Trys=1, nextAllowedTryMs=1605012024742) 超时在 1605012024642 尝试 1 次后原因:org.apache.kafka.common.errors.TimeoutException:等待节点分配超时。调用:listNodes apache.kafka.common.errors.TimeoutException:调用(callName=listNodes,deadlineMs=1605012024641,trys=1,nextAllowedTryMs=1605012024742)在 1605012024642 1 次尝试后超时原因:org.apache.kafka.common.errors .TimeoutException:等待节点分配超时。调用:listNodes apache.kafka.common.errors.TimeoutException:调用(callName=listNodes,deadlineMs=1605012024641,trys=1,nextAllowedTryMs=1605012024742)在 1605012024642 1 次尝试后超时原因:org.apache.kafka.common.errors .TimeoutException:等待节点分配超时。调用:listNodes

0 投票
1 回答
640 浏览

apache-kafka - 如何部署 Strimzi KafkaMirrorMaker

我正在使用 srimzi 运算符并在 k8s 上运行 kafka 集群。我想使用 Kafka Mirror Maker,我使用 CRD yml 部署了 Kafka Mirror Maker,但是我的 KMM pod 处于 crashLoopBack 状态。我没有得到问题是什么?这是我的 Kafka MirrorMaker yml

还有我的 kafka-cluster yml :

我的第二个 Kafka 集群:

我的 pod 列表及其状态:

Pod 日志:

这是我的 kafka 集群的 svc:

0 投票
1 回答
690 浏览

apache-kafka - 在源 kafka 集群上禁用 mirrormaker2 偏移同步主题

我们正在使用MirrorMaker2将一些主题从一个 kerberized kafka 集群复制到另一个 kafka 集群(严格单向)。我们不控制源 kafka 集群,我们只能访问描述和阅读要使用的特定主题。

MirrorMaker2mm2-offset-syncs在源集群中创建并维护一个主题(AdminClient因为 MM2 需要授权才能在源集群中创建和写入这些主题,或者通过 执行操作AdminClient,所以我试图理解为什么/如果我们在我们的场景中需要这些机制。

我的问题是:

  1. 在严格的单向场景中,这个源集群偏移同步主题对 Mirrormaker 有什么用处?
  2. 如果确实是多余的,是否可以禁用它或操作 mm2 而无需访问创建/生成该主题?
  3. 如果 ACL 和 Config 传播被禁用,是否可以安全地假设它AdminClient不用于其他任何事情?

在 MirrorMaker 代码中,它在启动时很容易创建偏移同步主题,然后由. 同样的情况也发生在.MirrorSourceConnectorMirrorSourceTaskMirrorSourceConnector

我没有找到关闭这些功能的方法,但老实说,我的思路中可能遗漏了一些东西。

0 投票
1 回答
256 浏览

apache-kafka - 从 Confluent Replicator 迁移到 Apache Mirror Maker 2.0 时保持相同的偏移量

我们目前正在尝试将 Confluent 复制器迁移到 Apache Open Source Mirror Maker v2.0。我们面临一个问题,当镜像制造商在同一主题上启动时,已经被复制器复制的消息再次被复制。这不应该发生,因为消息在目标集群中被复制。以下是更多详细信息:

  1. RCA:replicator 为复制消息分配一个消费者组。这个消费者组维护了源主题的偏移量。但是我们无法将相同的消费者组分配给镜像制造商 2 中的消费者配置。
  2. Mirror Maker 1.0:可以在 consumer.properties 文件中分配作为同一个消费者组工作的消息,并且在复制器停止后立即选择消息。
  3. 尝试source.cluster.consumer.group.id在 mirror maker 2.0 中以所有可用选项(在集群模式、连接独立和连接分布式模式下)运行和配置,但 mirror maker 2.0 在复制消息时将使用者组 ID 分配为 null。

如果有人做过同样的事情并试图与 mirror maker 2.0 保持相同的偏移量,那么任何指针。

0 投票
2 回答
753 浏览

apache-kafka - Kafka:将主题 A 复制到主题 B,同时对记录应用转换

我需要将集群 A 上的主题的记录镜像到集群 B 上的主题,同时在记录中添加一个字段,因为它们被代理(例如InsertField

我没有控制集群 A(但可能需要更改)并且完全控制集群 B。

我知道集群 A 正在发送序列化的 JSON。

我正在使用MirrorMaker API和 Kafka 连接来进行镜像,并且我正在尝试使用InsertField转换在记录中添加数据,因为它们被代理。

我的配置如下所示:

此代码将失败并显示错误说明:

org.apache.kafka.connect.errors.DataException:[字段插入]仅支持结构对象

有没有办法在不编写自定义转换的情况下实现这一点(我使用的是 Python,我不熟悉 Java)

非常感谢

0 投票
1 回答
209 浏览

apache-kafka - 从 Mirror maker 1.0 迁移到 Mirror maker 2.0 时保持相同的消费者组

试图将mirror maker 1.0升级到mirror maker 2.0,并观察到在关闭mirror maker 1.0并启动mirror maker 2.0时,消费者从一开始就开始收到消息(重复)。发现这篇文章强调了同样的问题。
一个。如何通过consumer group namein mirror maker 2.0 以保持与consumer group name在 mirror maker 1.0 中使用的相同?
湾。auto.offset.resetmirror maker 2.0如何配置属性?

0 投票
1 回答
584 浏览

apache-kafka - kafka mirrormaker 2 自定义分区器

我正在尝试确定是否可以将自定义分区器与 mirrormaker 2 一起使用,因此在复制到目标集群时使用了我的自定义分区器。根据这里的文档https://github.com/apache/kafka/tree/trunk/connect/mirror应该可以使用配置格式 target-alias.producer.* 覆盖 mm2 生产者设置,我已经尝试了各种格式,例如(源和目标是我的集群别名)

我可以在 mirrormaker 2 日志中看到分区器已成功加载,并且在生产者配置被转储的地方,似乎设置了自定义分区器类。但是,根据我的分区程序类(和观察)中的调试日志,它仅在 mirrormarker 2 生成到内部主题(如 mm2-offsets.source.internal)时被调用,而不是在生成到被复制到的实际主题时被调用。

谁能帮我理解上面的行为?我假设有单独的生产者客户端,一些负责编写复制的消息,一些负责更新内部主题,但如果是这样,不确定为什么自定义分区器只适用于后者。

0 投票
0 回答
206 浏览

apache-kafka - Confluent Replicator 的偏移转换如何处理时钟偏差?

我一直在阅读Confluent Replicator 的文档以评估其主动-主动多数据中心复制支持,虽然文档清楚地说明了它将消费者偏移量从一个 DC 转换到另一个 DC 的机制,但我仍然不清楚的是该机制如何处理时钟偏差。

通过阅读数据恢复白皮书可以看出,用于翻译的消息时间戳实际上CreateTime是来自生产者的时间戳。然而,所有关于如何执行偏移转换的示例似乎都暗示时间戳是单调递增的,例如这个复制器博客文章

使用时间戳的消费者偏移量转换

为什么这是一个问题?因为很难(阅读:几乎不可能)保证在商用硬件上运行的分布式系统中的时间戳单调增加,即使使用专门的硬件(例如 AWS Time Sync Service),您能做的最好的事情就是尽量减少偏差(如果您不这样做)想要使用类似于 Google Spanner 的 TrueTime 的全球时间服务。

因此,让我们考虑最简单的场景,其中 DC-1 和 DC-2 已经使用单独的主题设置了主动-主动复制,例如 DC-1 中的主题 dc1-foo 复制到 DC2 中的 dc1-foo 和 dc2-foo 从DC2 被复制到 DC-1 中的 d2-foo。我们还假设数据中心从一开始就正常运行,但是 dc-1-foo 的复制经历了一个小问题,导致一些记录在 DC-2 中重复,使得 DC-2 中的偏移量大于DC-1 中的偏移量。

给定 DC-1 中当前提交的偏移量为 O1,DC-2 为 O3,并且给定 dc1-foo 的内容(偏移量:TS 格式)为:

  • DC-1:[O1:T3]、[O2:T2]、[O3:T3]、[O4:T1]、[O5:T3]
  • DC-2:[O3:T3]、[O4:T2]、[O5:T3]、[O6:T1]、[O7:T3]

当 DC-1 中的消费者提交...

  1. 偏移量 O2,DC-2 偏移量是否转换为 O4?
  2. 偏移量 O5,DC-2 偏移量是否转换为 O3、O5 或 O7?

我的猜测是转换后的偏移量是 O4 和 O3,但只是想确认一下。