5

我正在尝试使用 MirrorMaker 2.0 复制 Kafka 集群。我正在使用以下 mm2.properties:

name = mirror-site1-site2
topics = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
plugin.path=/usr/share/java/kafka/plugin
clusters = site1, site2

# for demo, source and target clusters are the same
source.cluster.alias = site1
target.cluster.alias = site2

site1.sasl.mechanism=SCRAM-SHA-256
site1.security.protocol=SASL_PLAINTEXT
site1.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site2.sasl.mechanism=SCRAM-SHA-256
site2.security.protocol=SASL_PLAINTEXT
site2.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="<someuser>" \
   password="<somepass>";

site1.bootstrap.servers = <IP1>:9093, <IP2>:9093, <IP3>:9093, <IP4>:9093
site2.bootstrap.servers = <IP5>:9093, <IP6>:9093, <IP7>:9093, <IP8>:9093

site1->site2.enabled = true
site1->site2.topics = topic1


# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

所以这是问题所在,mm2 似乎总是复制 x3 消息:

# Manual message production: 

 kafkacat -P -b <IP1>:9093,<IP2>:9093,<IP3>:9093,<IP4>:9093 -t "topic1"


# Result in the source topic (site1 cluster): 

% Reached end of topic topic1 [2] at offset 405
Message1
% Reached end of topic topic1 [2] at offset 406
Message2
% Reached end of topic topic1 [6] at offset 408
Message3
% Reached end of topic topic1 [2] at offset 407

 kafkacat -P -b <IP5>:9093,<IP6>:9093,<IP7>:9093,<IP8>:9093 -t "site1.topic1"

# Result in the target topic (site2 cluster): 

% Reached end of topic site1.titi [2] at offset 1216
Message1
Message1
Message1
% Reached end of topic site1.titi [2] at offset 1219
Message2
Message2
Message2
% Reached end of topic site1.titi [6] at offset 1229
Message3
Message3
Message3

我尝试使用来自 confluent 包的 Kafka 和直接来自 Apache 的 kafka_2.13-2.4.0,两者都使用 Debian 10.1。

我首先在 confluent 5.4 中鼓励这种行为,认为这可能是他们的包中的一个错误,因为他们有复制器并且不应该真正关心 mm2,但我直接从 Apache 复制了与 kafka_2.13-2.4.0 完全相同的问题,没有任何改变。

我知道 mm2 还不是幂等的,不能保证一次交付。在我的测试中(我尝试了很多东西,包括生产者调整或更大批量的数千条消息)。在所有这些测试中,mm2 总是复制 X3 的所有消息。

我错过了什么吗,有人鼓励同样的事情吗?作为具有相同软件包的旧版 mm1 的站点注释,我没有这个问题。

感谢任何帮助...谢谢!


即使变更日志没有让我对改进非常有信心,但这次我再次尝试从 kafka 2.4.1 运行 mm2。=> 这些奇怪的重复总是没有变化。

我在新服务器上安装了这个版本,以确保我遇到的奇怪行为与服务器无关。

当我使用 ACL 时,我需要特殊权限吗?我把“全部”认为它不能更宽容......即使 mm2 不是幂等的,我也会尝试与此相关的权利。

更让我吃惊的是,我找不到任何报告这样的问题,我肯定做错了什么,但那是什么问题......

4

2 回答 2

5

您需要connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector从配置中删除,因为这告诉 Mirror Maker 将此类用于它生成的 Heartbeats 和检查点连接器以及复制数据的 Source 连接器,并且此类使它们的行为与 Source 连接器完全相同,这就是为什么每次复制 3 条消息,实际上生成了 3 个源连接器。

于 2020-06-09T13:38:24.913 回答
1

对客户端配置启用幂等性将解决此问题。默认情况下,它将设置为 false。将以下内容添加到 mm2.properties 文件中

source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true
于 2020-04-14T12:46:37.457 回答