问题标签 [apache-kafka-mirrormaker]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
823 浏览

apache-kafka - 如何使用 MirrorMaker 2.0 迁移消费者偏移量?

对于 Kafka 2.7.0,我使用 MirroMaker 2.0 作为 Kafka 连接连接器,将所有主题从主 Kafka 集群复制到备份集群。

所有的主题都被完美地复制了,除了__consumer_offsets. 以下是连接配置:

在这里的一个类似问题中,接受的答案如下:

在你的 consumer.config 中添加这个:

exclude.internal.topics=false

并将其添加到您的 producer.config 中:

client.id=__admin_client

我在哪里添加这些在我的配置?

此处的连接器配置属性没有名为 的属性client.id,但我已将 的值设置exclude.internal.topicsfalse

我在这里缺少什么吗?

更新

我了解到 Kafka 2.7 及更高版本支持使用此处MirrorCheckpointTask提到的自动消费者偏移同步。

我为此创建了一个具有以下配置的连接器:

仍然没有帮助。这是正确的方法吗?有什么需要吗?

0 投票
0 回答
115 浏览

jmx - 将 mm jmx 指标集成到 check_mk

我可以成功地将 mm 生产者/消费者指标集成到 check_mk。但是有一种类型的指标(consumer-node-metrics),它有一个参数(node-id)变量。它在应用程序的不同执行之间变化。我认为我们正在使用的 jmx.jar 对此负责。

我们使用 jolokia.cfg 进行配置。下面是我的配置文件。

有没有办法让节点 id 保持不变?因为它一直在变化,然后 check_mk 无法识别它并说不受监控的服务。

请查看来自 check_mk 和 jconsole 的附加屏幕截图。

check_mk

mm上的Kafka消费者

0 投票
1 回答
48 浏览

apache-kafka - 是否可以在不使用 Kafka Connect 的情况下在 Kafka 中运行 MirrorMaker?

希望提出一种无需 Kafka Connect 即可镜像或复制一个 Kafka 环境的解决方案。很难想出任何可能的解决方案或变通方法。对 Kafka 来说非常陌生,如果有任何想法和/或指导,将不胜感激!

0 投票
0 回答
222 浏览

apache-kafka - 默认端口 8083 上的 Kafka Mirror Maker 2 的 REST API 调用未连接

在 GCP 上的自己的实例上启动 Kafka Mirror Maker 2 后,我正在运行 curl 命令以连接到 MM2 Rest API 端口:

我得到Failed to connect to localhost port 8083: Connection refused" 回应。

我也试着把

在 MM2 的配置文件中,但仍然是相同的响应

如何为 Kafka Mirror Maker 2 配置 REST 端点

0 投票
0 回答
132 浏览

apache-kafka - kafka mirror-client RemoteClusterUtils.translateOffsets() 方法读取所有记录生成偏移图

我将 MirrorMarker 2.0 与 Kafka 集群版本 2.4.1 一起使用。这是一个主动-主动配置,具有两个使用 MM 2.0 设置的集群,我正在尝试从复制到另一个集群的主题中读取。

我想在第二个集群中设置我的消费者组偏移量,以从第一个集群中上次提交的位置读取。为此,我使用 Kafka 镜像客户端 RemoteClusterUtils.translateOffsets() 方法。它需要三个参数,即 consumergroupid、ClusterAliasName 和 timeout。

TranslateOffsets 方法从 cluster.checkpoints.internal 主题中读取并返回一个映射 Map<TopicPartition, OffsetAndMetadata>,但该方法读取 checkpoints.internal 主题中的所有记录并尝试与消费者组匹配。这是一个非常缓慢的操作。如果有很多主题和多个消费者组,它将一一读取消息,然后填充地图。(如果给定非常高的超时时间,否则大多数时候它会返回一个空地图)

在 Kafka 2.4.1 版中使用 MM 2.0 时,还有其他方法可以设置消费者组偏移量吗?

0 投票
0 回答
63 浏览

apache-kafka - 覆盖 Kafka MirrorMaker2 中的默认属性

根据以下文档,我正在运行一个 kafka 集群并使用 Mirrormaker 2.0 将其复制到另一个集群:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0

具体来说,我正在运行以下命令来启动我的镜像制造商:

我的源集群中有大量数据,并且经常出现超时错误。为此,我想设置以下两个属性:

我试图将这些属性覆盖到我的镜像制造商属性中,但正在选择默认值。关于如何覆盖这些默认值的任何建议?

0 投票
0 回答
168 浏览

apache-kafka - Kafka Source Connector 将 JSON 值反序列化为未定义格式的输出

我将使用 Kafka MirrorMaker 2 Source Connector,因此我尝试将它部署在我的沙盒 Kafka 集群上(由 Confluent docker compose构建)。

  1. 我创建了一个my_test_topic以 JSON 格式存储值的主题。并配置MirrorSourceConnector为将其复制到同一个 Kafka 集群中。目标主题source_cluster.my_test_topic是自动创建的。这是连接器配置:
  1. 我通过kafkacat将数据放入源主题:
  1. 当我尝试从复制的主题(又名目标主题)中消费数据时,我得到以下结果,我认为它是 base64 格式:

这显然不是我所期望的。所以我也尝试了其他一些选择:

  • 将值转换器类更改为目标主题中的org.apache.kafka.connect.storage.StringConverter消费值[B@3d5e46ef
  • 将值转换器类更改为并最终消耗来自目标主题org.apache.kafka.connect.converters.ByteArrayConverter的期望值{"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]}

JSONConverterStringConverter不工作而ByteArrayConverter工作的原因是什么?

此外,我尝试了其他一些连接器配置选项:

  • "source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.StringDeserializer"使用StringConverter
  • "source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"使用StringConverter

但这些都不起作用,结果输出类似于[B@5d6b605c.

在这种情况下配置是否"source.cluster.consumer.value-deserializer"有任何影响?


同时,当我使用FileStreamSourceConnector从文件中读取 JSON 时,StringConverterJSONConverter都可以正常工作,而ByteArrayConverter会抛出错误Invalid schema type for ByteArrayConverter: STRING

0 投票
1 回答
89 浏览

apache-kafka-mirrormaker - 有没有办法在 MirrorMaker 2 中禁用偏移复制?

除了明确授予的主题之外,我无权访问“源”集群来创建任何主题。我想使用 MirrorMaker2 在源和目标之间进行复制,但未能正确配置源集群。

有没有办法在 MirrorMaker 2 中禁用偏移复制?

0 投票
1 回答
100 浏览

apache-kafka - 使用 Kafka MirrorMaker 1 复制记录时是否可以保留原始分区?

我们正在使用在 Azure 容器实例中运行的 Kafka MirrorMaker 1 将记录从 kafka 集群发送到 Microsoft Azure Eventhub。我们的主题有 10 个分区,我们在 Azure EventHub 中手动创建了一个名称和分区数相同的主题。不幸的是,这些记录在被 MirrorMaker 1 复制后失去了原来的分区。有没有办法使用 MirrorMaker 1 保留原来的分区?

0 投票
1 回答
199 浏览

apache-kafka - 在 Kafka MirrorMaker 2 上进行活动活动配置时避免主题上的集群前缀

我需要使用 MirrorMaker 2 在活动的活动设置中复制信息,但我还需要它不要像通常那样通过添加集群名称作为前缀来更改主题名称。

我能够通过修改 replication.policy.separator 以及源和目标别名(sourcetarget.cluster.alias)来抑制前缀。但这会导致无限循环,其中消息被无限复制。有什么办法可以避免这种情况,同时又没有任何前缀?

本质上,我正在寻找的是特定的复制策略,或使用与前缀不同的东西来避免无限循环的配置。