我正在尝试使用 MirrorMaker 2.0 复制 Kafka 集群。我正在使用以下 mm2.properties:
name = mirror-site1-site2
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
plugin.path=/usr/share/java/kafka/plugin
clusters = site1, site2
# for demo, source and target clusters are the same
source.cluster.alias = site1
target.cluster.alias = site2
site1.sasl.mechanism=SCRAM-SHA-256
site1.security.protocol=SASL_PLAINTEXT
site1.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<someuser>" \
password="<somepass>";
site2.sasl.mechanism=SCRAM-SHA-256
site2.security.protocol=SASL_PLAINTEXT
site2.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="<someuser>" \
password="<somepass>";
site1.bootstrap.servers = <IP1>:9093, <IP2>:9093, <IP3>:9093, <IP4>:9093
site2.bootstrap.servers = <IP5>:9093, <IP6>:9093, <IP7>:9093, <IP8>:9093
site1->site2.enabled = true
site1->site2.topics = topic1
# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
所以这是问题所在,mm2 似乎总是复制 x3 消息:
# Manual message production:
kafkacat -P -b <IP1>:9093,<IP2>:9093,<IP3>:9093,<IP4>:9093 -t "topic1"
# Result in the source topic (site1 cluster):
% Reached end of topic topic1 [2] at offset 405
Message1
% Reached end of topic topic1 [2] at offset 406
Message2
% Reached end of topic topic1 [6] at offset 408
Message3
% Reached end of topic topic1 [2] at offset 407
kafkacat -P -b <IP5>:9093,<IP6>:9093,<IP7>:9093,<IP8>:9093 -t "site1.topic1"
# Result in the target topic (site2 cluster):
% Reached end of topic site1.titi [2] at offset 1216
Message1
Message1
Message1
% Reached end of topic site1.titi [2] at offset 1219
Message2
Message2
Message2
% Reached end of topic site1.titi [6] at offset 1229
Message3
Message3
Message3
我尝试使用来自 confluent 包的 Kafka 和直接来自 Apache 的 kafka_2.13-2.4.0,两者都使用 Debian 10.1。
我首先在 confluent 5.4 中鼓励这种行为,认为这可能是他们的包中的一个错误,因为他们有复制器并且不应该真正关心 mm2,但我直接从 Apache 复制了与 kafka_2.13-2.4.0 完全相同的问题,没有任何改变。
我知道 mm2 还不是幂等的,不能保证一次交付。在我的测试中(我尝试了很多东西,包括生产者调整或更大批量的数千条消息)。在所有这些测试中,mm2 总是复制 X3 的所有消息。
我错过了什么吗,有人鼓励同样的事情吗?作为具有相同软件包的旧版 mm1 的站点注释,我没有这个问题。
感谢任何帮助...谢谢!
即使变更日志没有让我对改进非常有信心,但这次我再次尝试从 kafka 2.4.1 运行 mm2。=> 这些奇怪的重复总是没有变化。
我在新服务器上安装了这个版本,以确保我遇到的奇怪行为与服务器无关。
当我使用 ACL 时,我需要特殊权限吗?我把“全部”认为它不能更宽容......即使 mm2 不是幂等的,我也会尝试与此相关的权利。
更让我吃惊的是,我找不到任何报告这样的问题,我肯定做错了什么,但那是什么问题......