2

我使用高级驱动程序运行 MirrorMaker2,如此所述,./bin/connect-mirror-maker.sh mm2.properties在 k8s 部署中在 3 个 pod 中运行。

mm2.properties 文件如下所示:

clusters = source, dest

source.bootstrap.servers = ***:9092
dest.bootstrap.servers =  ***:9092

source->dest.enabled = true
dest->source.enabled = false

source->dest.topics = event\.PROD\.some_id.*

replication.factor=3

checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3

offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

sync.topic.acls.enabled = false

这很好用,所有与event\.PROD\.some_id.*正则表达式匹配的主题都被复制。现在,当我需要在白名单中添加其他主题时,我希望能够简单地缩小所有内容,更新正则表达式,然后再次放大所有内容。

当我将白名单正则表达式更新为 时source->dest.topics = event\.PROD\.(some_id|another_id).*,匹配的主题"another_id"会在 dest 集群中创建,但不会复制任何数据,并且 mirrormaker 似乎丢失了提交偏移量:

[2020-05-28 20:33:19,496] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-05-28 20:33:19,496] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-05-28 20:33:19,499] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:523)

这是高级驱动程序的限制,还是我做错了什么?据我了解,能够将主题动态添加到白名单是 MM2 的动机之一。

4

1 回答 1

1

我也在玩mmv2。您可以尝试设置这些配置吗?我必须启用该sync.topic.configs.enabled参数,这样我的 mmv2 才能检测到新主题及其数据。

refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 10

Pd.-我将我的回复添加为答案,因为我想粘贴配置。

于 2020-06-03T03:20:17.857 回答