对于 Kafka 2.7.0,我使用 MirroMaker 2.0 作为 Kafka 连接连接器,将所有主题从主 Kafka 集群复制到备份集群。
所有的主题都被完美地复制了,除了__consumer_offsets
. 以下是连接配置:
{
"name": "test-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"topics.blacklist": "some-random-topic",
"replication.policy.separator": "",
"source.cluster.alias": "",
"target.cluster.alias": "",
"exclude.internal.topics":"false",
"tasks.max": "10",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
"target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
"topics": "test-topic-from-primary,primary-kafka-connect-offset,primary-kafka-connect-config,primary-kafka-connect-status,__consumer_offsets"
}
}
在这里的一个类似问题中,接受的答案如下:
在你的 consumer.config 中添加这个:
exclude.internal.topics=false
并将其添加到您的 producer.config 中:
client.id=__admin_client
我在哪里添加这些在我的配置?
此处的连接器配置属性没有名为 的属性client.id
,但我已将 的值设置exclude.internal.topics
为false
。
我在这里缺少什么吗?
更新
我了解到 Kafka 2.7 及更高版本支持使用此处MirrorCheckpointTask
提到的自动消费者偏移同步。
我为此创建了一个具有以下配置的连接器:
{
"name": "mirror-checkpoint-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"sync.group.offsets.enabled": "true",
"source.cluster.alias": "",
"target.cluster.alias": "",
"exclude.internal.topics":"false",
"tasks.max": "10",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
"target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
"topics": "__consumer_offsets"
}
}
仍然没有帮助。这是正确的方法吗?有什么需要吗?