1

对于 Kafka 2.7.0,我使用 MirroMaker 2.0 作为 Kafka 连接连接器,将所有主题从主 Kafka 集群复制到备份集群。

所有的主题都被完美地复制了,除了__consumer_offsets. 以下是连接配置:

{
    "name": "test-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "topics.blacklist": "some-random-topic",
      "replication.policy.separator": "",
      "source.cluster.alias": "",
      "target.cluster.alias": "",
      "exclude.internal.topics":"false",
      "tasks.max": "10",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
      "target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
      "topics": "test-topic-from-primary,primary-kafka-connect-offset,primary-kafka-connect-config,primary-kafka-connect-status,__consumer_offsets"
    }
}

在这里的一个类似问题中,接受的答案如下:

在你的 consumer.config 中添加这个:

exclude.internal.topics=false

并将其添加到您的 producer.config 中:

client.id=__admin_client

我在哪里添加这些在我的配置?

此处的连接器配置属性没有名为 的属性client.id,但我已将 的值设置exclude.internal.topicsfalse

我在这里缺少什么吗?

更新

我了解到 Kafka 2.7 及更高版本支持使用此处MirrorCheckpointTask提到的自动消费者偏移同步。

我为此创建了一个具有以下配置的连接器:

{
    "name": "mirror-checkpoint-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "sync.group.offsets.enabled": "true",
      "source.cluster.alias": "",
      "target.cluster.alias": "",
      "exclude.internal.topics":"false",
      "tasks.max": "10",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
      "target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
      "topics": "__consumer_offsets"
    }
}

仍然没有帮助。这是正确的方法吗?有什么需要吗?

4

2 回答 2

2

您不想复制 consumer_offsets。由于各种原因,从 src 到目标集群的偏移量不会相同。

MirrorMaker2 提供了进行偏移平移的能力。它将使用从 src 集群生成的转换偏移量填充目标集群。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0

于 2021-05-05T15:18:16.230 回答
0

__consumer_offsets 默认被忽略

topics.exclude = [.*[\-\.]internal, .*\.replica, __.*]

你需要覆盖这个配置

于 2021-10-25T12:13:44.223 回答