问题标签 [apache-kafka-mirrormaker]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 设置多数据中心 Kafka 集群
我正在使用多 DC 集群设置 Kafka 集群。目的是确保如果一个 DC 出现故障,生产者和消费者仍然能够继续运营而不会出现任何问题。我遇到了两个选项,但不确定有什么区别以及它是如何工作的。
选项1:设置多个zookeeper集群(每个DC一个集群)
设置多个 Zookeeper,每个 Zookeeper 将在 DC 中拥有一组 broker。在这种情况下,我真的会同时获得主动-主动和灾难恢复吗?如果 1 DC 下降,消费者会发生什么。
选项 2:使用源和目标设置 Mirror maker
我知道这是一个集群到另一个集群的复制。但是,我如何从消费者或生产者的角度指出这两个集群?它会自动处理还是我应该手动处理?
对这些选项的任何解释表示赞赏。
apache-kafka - 运行两个 MirrorMaker 2.0 实例,停止更新主题的数据复制
我们使用 mirror-maker 2.0 尝试了以下场景,并想知道是否预期第二个场景的输出。
场景 1.) 我们使用以下属性和启动命令运行单个 mirror-maker 2.0 实例。
启动命令:/usr/bin/connect-mirror-maker.sh connect-mirror-maker.properties &
验证:在源集群(a)上创建新主题“test”,为源集群上的主题生成数据并在目标集群(b)上运行消费者,主题“a.test”以验证数据复制。
观察:按预期工作得很好。
场景 2.) 使用与上述相同的属性再运行一个 MirrorMaker 2.0 实例。
启动命令:/usr/bin/connect-mirror-maker.sh connect-mirror-maker.properties &
验证:在源集群上再创建一个“test2”主题,在源集群上生成数据到主题并在目标集群(b)上运行消费者,主题“a.test2”以验证数据复制。
观察:MM2 能够在目标集群上复制主题,a.test2 存在于目标集群 b 上,但消费者没有得到任何记录来消费。
在较新的 mirror-maker 2.0 实例日志上,主题复制后,mirror-sourceconnector 任务没有重新启动,这在主题复制后在单个实例中重新启动。
注意:没有看到错误日志。
apache-kafka - 使用 MTLS 配置连接镜像制造商,仅用于目标,纯文本用于源
我正在运行mirror maker 2.0(来自kafka 2.6的二进制文件)并且我的源集群配置为纯文本,而我的目标集群启用了ssl
我已经创建了正确的密钥,并确保第一个连接可以使用以下命令正常工作,
ssl_properties 文件如下,
security.protocol=SSL
ssl.truststore.location=/opt/keys/client.truststore
ssl.truststore.password=123456
ssl.keystore.location=/opt/keys/jacek.keystore
ssl.keystore.password=123456 ssl.key .password=密码
现在,当我尝试运行镜像制造商时,它失败并出现以下错误,镜像制造商配置文件中有类似的配置,如下所示,
错误:
org.apache.kafka.common.errors.TimeoutException:调用(callName=fetchMetadata,deadlineMs=1605011994642,trys=1,nextAllowedTryMs=1605011994743)在 1605011994643 1 次尝试后超时原因:org.apache.kafka.common .errors.TimeoutException:等待节点分配超时。调用:fetchMetadata [2020-11-10 12:40:24,642] INFO App info kafka.admin.client for adminclient-8 未注册(org.apache.kafka.common.utils.AppInfoParser:83)[2020-11-10 12 :40:24,643] INFO [AdminClient clientId=adminclient-8] 元数据更新失败 (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235) org.apache.kafka.common.errors.TimeoutException: Call(callName= fetchMetadata,deadlineMs=1605012024643,trys=1,nextAllowedTryMs=-9223372036854775709)在 9223372036854775807 次尝试后超时 原因:org.apache.kafka.common.errors。TimeoutException:AdminClient 线程已退出。调用:fetchMetadata [2020-11-10 12:40:24,644] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668) [2020-11-10 12:40:24,644] INFO Closing Reporter org .apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672) [2020-11-10 12:40:24,644] INFO Metrics 记者关闭 (org.apache.kafka.common. metrics.Metrics:678)[2020-11-10 12:40:24,645] ERROR 由于错误而停止(org.apache.kafka.connect.mirror.MirrorMaker:304)org.apache.kafka.connect.errors.ConnectException:无法连接和描述 Kafka 集群。检查工作人员的代理连接和安全属性。在 org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70) 在 org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70) kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) 在 org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) 在 org.apache.kafka.connect.util。 ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64) ... 7 更多原因:org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1605012024641, Trys=1, nextAllowedTryMs=1605012024742) 超时在 1605012024642 尝试 1 次后原因:org.apache.kafka.common.errors.TimeoutException:等待节点分配超时。调用:listNodes apache.kafka.common.errors.TimeoutException:调用(callName=listNodes,deadlineMs=1605012024641,trys=1,nextAllowedTryMs=1605012024742)在 1605012024642 1 次尝试后超时原因:org.apache.kafka.common.errors .TimeoutException:等待节点分配超时。调用:listNodes apache.kafka.common.errors.TimeoutException:调用(callName=listNodes,deadlineMs=1605012024641,trys=1,nextAllowedTryMs=1605012024742)在 1605012024642 1 次尝试后超时原因:org.apache.kafka.common.errors .TimeoutException:等待节点分配超时。调用:listNodes
apache-kafka - 如何部署 Strimzi KafkaMirrorMaker
我正在使用 srimzi 运算符并在 k8s 上运行 kafka 集群。我想使用 Kafka Mirror Maker,我使用 CRD yml 部署了 Kafka Mirror Maker,但是我的 KMM pod 处于 crashLoopBack 状态。我没有得到问题是什么?这是我的 Kafka MirrorMaker yml
还有我的 kafka-cluster yml :
我的第二个 Kafka 集群:
我的 pod 列表及其状态:
Pod 日志:
这是我的 kafka 集群的 svc:
apache-kafka - 在源 kafka 集群上禁用 mirrormaker2 偏移同步主题
我们正在使用MirrorMaker2将一些主题从一个 kerberized kafka 集群复制到另一个 kafka 集群(严格单向)。我们不控制源 kafka 集群,我们只能访问描述和阅读要使用的特定主题。
MirrorMaker2mm2-offset-syncs
在源集群中创建并维护一个主题(AdminClient
因为 MM2 需要授权才能在源集群中创建和写入这些主题,或者通过 执行操作AdminClient
,所以我试图理解为什么/如果我们在我们的场景中需要这些机制。
我的问题是:
- 在严格的单向场景中,这个源集群偏移同步主题对 Mirrormaker 有什么用处?
- 如果确实是多余的,是否可以禁用它或操作 mm2 而无需访问创建/生成该主题?
- 如果 ACL 和 Config 传播被禁用,是否可以安全地假设它
AdminClient
不用于其他任何事情?
在 MirrorMaker 代码中,它在启动时很容易创建偏移同步主题,然后由. 同样的情况也发生在.MirrorSourceConnector
MirrorSourceTask
MirrorSourceConnector
我没有找到关闭这些功能的方法,但老实说,我的思路中可能遗漏了一些东西。
apache-kafka - 从 Confluent Replicator 迁移到 Apache Mirror Maker 2.0 时保持相同的偏移量
我们目前正在尝试将 Confluent 复制器迁移到 Apache Open Source Mirror Maker v2.0。我们面临一个问题,当镜像制造商在同一主题上启动时,已经被复制器复制的消息再次被复制。这不应该发生,因为消息在目标集群中被复制。以下是更多详细信息:
- RCA:replicator 为复制消息分配一个消费者组。这个消费者组维护了源主题的偏移量。但是我们无法将相同的消费者组分配给镜像制造商 2 中的消费者配置。
- Mirror Maker 1.0:可以在 consumer.properties 文件中分配作为同一个消费者组工作的消息,并且在复制器停止后立即选择消息。
- 尝试
source.cluster.consumer.group.id
在 mirror maker 2.0 中以所有可用选项(在集群模式、连接独立和连接分布式模式下)运行和配置,但 mirror maker 2.0 在复制消息时将使用者组 ID 分配为 null。
如果有人做过同样的事情并试图与 mirror maker 2.0 保持相同的偏移量,那么任何指针。
apache-kafka - Kafka:将主题 A 复制到主题 B,同时对记录应用转换
我需要将集群 A 上的主题的记录镜像到集群 B 上的主题,同时在记录中添加一个字段,因为它们被代理(例如InsertField
)。
我没有控制集群 A(但可能需要更改)并且完全控制集群 B。
我知道集群 A 正在发送序列化的 JSON。
我正在使用MirrorMaker API和 Kafka 连接来进行镜像,并且我正在尝试使用InsertField
转换在记录中添加数据,因为它们被代理。
我的配置如下所示:
此代码将失败并显示错误说明:
org.apache.kafka.connect.errors.DataException:[字段插入]仅支持结构对象
有没有办法在不编写自定义转换的情况下实现这一点(我使用的是 Python,我不熟悉 Java)
非常感谢
apache-kafka - 从 Mirror maker 1.0 迁移到 Mirror maker 2.0 时保持相同的消费者组
试图将mirror maker 1.0升级到mirror maker 2.0,并观察到在关闭mirror maker 1.0并启动mirror maker 2.0时,消费者从一开始就开始收到消息(重复)。发现这篇文章强调了同样的问题。
一个。如何通过consumer group name
in mirror maker 2.0 以保持与consumer group name
在 mirror maker 1.0 中使用的相同?
湾。auto.offset.reset
mirror maker 2.0如何配置属性?
apache-kafka - kafka mirrormaker 2 自定义分区器
我正在尝试确定是否可以将自定义分区器与 mirrormaker 2 一起使用,因此在复制到目标集群时使用了我的自定义分区器。根据这里的文档https://github.com/apache/kafka/tree/trunk/connect/mirror应该可以使用配置格式 target-alias.producer.* 覆盖 mm2 生产者设置,我已经尝试了各种格式,例如(源和目标是我的集群别名)
我可以在 mirrormaker 2 日志中看到分区器已成功加载,并且在生产者配置被转储的地方,似乎设置了自定义分区器类。但是,根据我的分区程序类(和观察)中的调试日志,它仅在 mirrormarker 2 生成到内部主题(如 mm2-offsets.source.internal)时被调用,而不是在生成到被复制到的实际主题时被调用。
谁能帮我理解上面的行为?我假设有单独的生产者客户端,一些负责编写复制的消息,一些负责更新内部主题,但如果是这样,不确定为什么自定义分区器只适用于后者。
apache-kafka - Confluent Replicator 的偏移转换如何处理时钟偏差?
我一直在阅读Confluent Replicator 的文档以评估其主动-主动多数据中心复制支持,虽然文档清楚地说明了它将消费者偏移量从一个 DC 转换到另一个 DC 的机制,但我仍然不清楚的是该机制如何处理时钟偏差。
通过阅读数据恢复白皮书可以看出,用于翻译的消息时间戳实际上CreateTime
是来自生产者的时间戳。然而,所有关于如何执行偏移转换的示例似乎都暗示时间戳是单调递增的,例如这个复制器博客文章:
为什么这是一个问题?因为很难(阅读:几乎不可能)保证在商用硬件上运行的分布式系统中的时间戳单调增加,即使使用专门的硬件(例如 AWS Time Sync Service),您能做的最好的事情就是尽量减少偏差(如果您不这样做)想要使用类似于 Google Spanner 的 TrueTime 的全球时间服务。
因此,让我们考虑最简单的场景,其中 DC-1 和 DC-2 已经使用单独的主题设置了主动-主动复制,例如 DC-1 中的主题 dc1-foo 复制到 DC2 中的 dc1-foo 和 dc2-foo 从DC2 被复制到 DC-1 中的 d2-foo。我们还假设数据中心从一开始就正常运行,但是 dc-1-foo 的复制经历了一个小问题,导致一些记录在 DC-2 中重复,使得 DC-2 中的偏移量大于DC-1 中的偏移量。
给定 DC-1 中当前提交的偏移量为 O1,DC-2 为 O3,并且给定 dc1-foo 的内容(偏移量:TS 格式)为:
- DC-1:[O1:T3]、[O2:T2]、[O3:T3]、[O4:T1]、[O5:T3]
- DC-2:[O3:T3]、[O4:T2]、[O5:T3]、[O6:T1]、[O7:T3]
当 DC-1 中的消费者提交...
- 偏移量 O2,DC-2 偏移量是否转换为 O4?
- 偏移量 O5,DC-2 偏移量是否转换为 O3、O5 或 O7?
我的猜测是转换后的偏移量是 O4 和 O3,但只是想确认一下。