
I am using MirrorMaker 2 to migrate from one AWS MSK cluster to another. The source cluster runs Kafka 2.4.1.1 and the target cluster runs 2.7.

My MirrorMaker2 runs on an M5.large EC2 instance using the Kafka 2.7 SDK.

I want to replicate all topics and consumer offsets from $SOURCE_CLUSTER to $TARGET_CLUSTER.

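My testTopic seems to be replicated correctly (including consumer group offsets). I believe this because when I consume testTopic with kafkacat from $SOURCE_CLUSTER and afterwards from $TARGET_CLUSTER, the messages are not consumed again on the target: the offsets on $TARGET_CLUSTER have already been updated (by MirrorMaker).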

However, when I check some of the larger topics, the offsets only seem to advance by 2-3 per second, as I try to show below.

Here I am describing the group MyConsumerGroup on $SOURCE_CLUSTER:

[ec2-user@ip-x-x-x-x ~]$ k/bin/kafka-consumer-groups.sh --bootstrap-server $SOURCE_CLUSTER --describe --group MyConsumerGroup

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
MyConsumerGroup MyLargeTopic           4          772259          772259          0               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           8          821326          821326          0               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           9          786077          786077          0               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           7          844962          844964          2               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           0          784451          784451          0               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           3          845682          845682          0               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           6          827488          827488          0               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           2          843823          843823          0               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           5          818343          818343          0               <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic           1          802264          802264          0               <some consumer id> rdkafka

Here I am describing the group MyConsumerGroup on $TARGET_CLUSTER:

[ec2-user@ip-x-x-x-x ~]$ k/bin/kafka-consumer-groups.sh --bootstrap-server $TARGET_CLUSTER --describe --group MyConsumerGroup

Consumer group 'MyConsumerGroup' has no active members.

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
MyConsumerGroup MyLargeTopic           7          832171          288324          -543847         -               -               -
MyConsumerGroup MyLargeTopic           6          814062          260857          -553205         -               -               -
MyConsumerGroup MyLargeTopic           5          801912          254791          -547121         -               -               -
MyConsumerGroup MyLargeTopic           4          758982          249167          -509815         -               -               -
MyConsumerGroup MyLargeTopic           9          770665          238708          -531957         -               -               -
MyConsumerGroup MyLargeTopic           8          806443          267920          -538523         -               -               -
MyConsumerGroup MyLargeTopic           3          831331          283500          -547831         -               -               -
MyConsumerGroup MyLargeTopic           2          831028          250147          -580881         -               -               -
MyConsumerGroup MyLargeTopic           1          789425          272097          -517328         -               -               -
MyConsumerGroup MyLargeTopic           0          768326          245568          -522758         -               -               -

Subsequent runs of the above command show LOG-END-OFFSET increasing by 2-3 per second.
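To put numbers on "rising slowly", the two describe outputs can be compared directly. The sketch below is a hypothetical helper (the function names are made up for illustration and are not part of Kafka or MirrorMaker 2). It parses the LOG-END-OFFSET column of `kafka-consumer-groups.sh --describe` output, estimates the per-partition backlog as source log end minus target log end, measures the growth rate from two target snapshots taken some seconds apart, and turns a backlog and a rate into a rough time estimate. The backlog figure assumes the target topic started empty and that retention on the source has not deleted data before it was mirrored.

```python
"""Rough MirrorMaker 2 progress check built from `kafka-consumer-groups.sh --describe`
output. Hypothetical helper for illustration, not part of Kafka or MirrorMaker 2."""


def parse_log_end_offsets(describe_output):
    """Map (topic, partition) -> LOG-END-OFFSET from `--describe` output.

    Assumes the default column order GROUP TOPIC PARTITION CURRENT-OFFSET
    LOG-END-OFFSET LAG ...; header rows and message lines are skipped.
    """
    offsets = {}
    for line in describe_output.splitlines():
        fields = line.split()
        # Data rows have a numeric PARTITION (3rd) and LOG-END-OFFSET (5th) column.
        if len(fields) >= 5 and fields[2].isdigit() and fields[4].isdigit():
            offsets[(fields[1], int(fields[2]))] = int(fields[4])
    return offsets


def mirror_gap(source_ends, target_ends):
    """Source LOG-END-OFFSET minus target LOG-END-OFFSET for partitions present on both.

    Only a rough backlog estimate: assumes the target topic started empty and
    that no source data was removed by retention before being mirrored.
    """
    return {k: source_ends[k] - target_ends[k] for k in source_ends if k in target_ends}


def growth_rate(before, after, elapsed_s):
    """Messages per second per partition between two snapshots taken elapsed_s apart."""
    if elapsed_s <= 0:
        raise ValueError("elapsed_s must be positive")
    return {k: (after[k] - before[k]) / elapsed_s for k in before if k in after}


def eta_seconds(gap, rate):
    """Seconds needed to close `gap` messages at `rate` msg/s, or None if rate <= 0."""
    return gap / rate if rate > 0 else None
```

For example, for partition 7 above the source log end is 844964 and the target log end is 288324, a gap of 556640 messages. If a rate of 2.5 messages per second applied to a single partition, `eta_seconds(550_000, 2.5)` returns 220000.0 seconds, roughly 61 hours. Note that the source and target snapshots are not taken at exactly the same moment, so the gap is approximate.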

My mm2.properties file is:

[ec2-user@ip-x-x-x-x ~]$ cat k/config/mm2.properties
clusters = source, target
source.bootstrap.servers=$SOURCE_CLUSTER
target.bootstrap.servers=$TARGET_CLUSTER

# Source and target clusters configurations.
source.config.storage.replication.factor = 3
target.config.storage.replication.factor = 3

source.offset.storage.replication.factor = 3
target.offset.storage.replication.factor = 3

source.status.storage.replication.factor = 3
target.status.storage.replication.factor = 3

source->target.enabled = true
target->source.enabled = false

source->target.sync.group.offsets.enabled=true
source->target.producer.override.**compression.type=gzip
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true

source.producer.override.batch.size = 327680

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3

topics = .*
groups = .*
replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true

tasks.max = 1
replication.factor = 3
refresh.topics.enabled = true

source.producer.compression.type=gzip
target.producer.compression.type=gzip
source.producer.connections.max.idle.ms=180000

producer.enable.idempotence=true

# Enable heartbeats and checkpoints.

# customize as needed
sync.topic.acls.enabled = false

Can anyone explain why LOG-END-OFFSET on my target cluster rises so slowly? No consumers are connected to $TARGET_CLUSTER, so all updates come through MirrorMaker2.
