我使用高级驱动程序运行 MirrorMaker2,如此处所述,./bin/connect-mirror-maker.sh mm2.properties
在 k8s 部署中在 3 个 pod 中运行。
mm2.properties 文件如下所示:
clusters = source, dest
source.bootstrap.servers = ***:9092
dest.bootstrap.servers = ***:9092
source->dest.enabled = true
dest->source.enabled = false
source->dest.topics = event\.PROD\.some_id.*
replication.factor=3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
sync.topic.acls.enabled = false
这很好用,所有与event\.PROD\.some_id.*
正则表达式匹配的主题都被复制。现在,当我需要在白名单中添加其他主题时,我希望能够简单地缩小所有内容,更新正则表达式,然后再次放大所有内容。
当我将白名单正则表达式更新为 时source->dest.topics = event\.PROD\.(some_id|another_id).*
,匹配的主题"another_id"
会在 dest 集群中创建,但不会复制任何数据,并且 mirrormaker 似乎丢失了提交偏移量:
[2020-05-28 20:33:19,496] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-05-28 20:33:19,496] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-05-28 20:33:19,499] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:523)
这是高级驱动程序的限制,还是我做错了什么?据我了解,能够将主题动态添加到白名单是 MM2 的动机之一。