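I'm using MirrorMaker 2 to migrate from one AWS MSK cluster to another. The source cluster runs Kafka 2.4.1.1 and the target cluster runs 2.7.
My MirrorMaker 2 instance uses the Kafka 2.7 SDK and runs on an m5.large EC2 instance.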
I want to replicate all topics and consumer offsets from $SOURCE_CLUSTER to $TARGET_CLUSTER.
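My testTopic seems to be replicated correctly, including its consumer group offsets. I believe this because when I consume testTopic with kafkacat from $SOURCE_CLUSTER and afterwards from $TARGET_CLUSTER, the messages are not re-consumed on the target: MirrorMaker has already updated the group's offsets on $TARGET_CLUSTER.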
However, when I check some of the larger topics, their offsets appear to advance by only 2-3 per second, as I try to show below.
Here I describe the group MyConsumerGroup on $SOURCE_CLUSTER:
[ec2-user@ip-x-x-x-x ~]$ k/bin/kafka-consumer-groups.sh --bootstrap-server $SOURCE_CLUSTER --describe --group MyConsumerGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
MyConsumerGroup MyLargeTopic 4 772259 772259 0 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 8 821326 821326 0 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 9 786077 786077 0 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 7 844962 844964 2 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 0 784451 784451 0 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 3 845682 845682 0 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 6 827488 827488 0 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 2 843823 843823 0 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 5 818343 818343 0 <some consumer id> rdkafka
MyConsumerGroup MyLargeTopic 1 802264 802264 0 <some consumer id> rdkafka
Here I describe the group MyConsumerGroup on $TARGET_CLUSTER:
[ec2-user@ip-x-x-x-x ~]$ k/bin/kafka-consumer-groups.sh --bootstrap-server $TARGET_CLUSTER --describe --group MyConsumerGroup
Consumer group 'MyConsumerGroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
MyConsumerGroup MyLargeTopic 7 832171 288324 -543847 - - -
MyConsumerGroup MyLargeTopic 6 814062 260857 -553205 - - -
MyConsumerGroup MyLargeTopic 5 801912 254791 -547121 - - -
MyConsumerGroup MyLargeTopic 4 758982 249167 -509815 - - -
MyConsumerGroup MyLargeTopic 9 770665 238708 -531957 - - -
MyConsumerGroup MyLargeTopic 8 806443 267920 -538523 - - -
MyConsumerGroup MyLargeTopic 3 831331 283500 -547831 - - -
MyConsumerGroup MyLargeTopic 2 831028 250147 -580881 - - -
MyConsumerGroup MyLargeTopic 1 789425 272097 -517328 - - -
MyConsumerGroup MyLargeTopic 0 768326 245568 -522758 - - -
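To compare the two tables, recall that the LAG column is LOG-END-OFFSET minus CURRENT-OFFSET, so every negative value on $TARGET_CLUSTER means the group's committed offset is ahead of the end of the target partition's log. The minimal awk sketch below totals those columns for one --describe run. The function name summarize_offsets is hypothetical, and it assumes the default column order shown above (CURRENT-OFFSET in field 4, LOG-END-OFFSET in field 5).

```shell
#!/usr/bin/env bash
# Hypothetical helper: summarize kafka-consumer-groups.sh --describe output.
# Assumes the default column order: GROUP TOPIC PARTITION CURRENT-OFFSET
# LOG-END-OFFSET LAG ... Header, status and "-" (no committed offset) rows
# are skipped because their 4th/5th fields are not plain integers.
summarize_offsets() {
  awk '
    $4 ~ /^[0-9]+$/ && $5 ~ /^[0-9]+$/ {
      n++                       # partitions seen
      committed += $4           # sum of CURRENT-OFFSET
      log_end += $5             # sum of LOG-END-OFFSET
      if ($5 - $4 < 0) ahead++  # partitions with negative LAG
    }
    END {
      printf "partitions=%d committed=%d log_end=%d negative_lag=%d\n",
             n, committed, log_end, ahead
    }'
}
```

For example, piping k/bin/kafka-consumer-groups.sh --bootstrap-server $TARGET_CLUSTER --describe --group MyConsumerGroup into summarize_offsets should print partitions=10 committed=8004345 log_end=2611079 negative_lag=10 for the snapshot shown above.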
Subsequent runs of the command above show LOG-END-OFFSET increasing by only 2-3 per second.
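To put "2-3 per second" in perspective, here is a rough back-of-the-envelope estimate. The summed LOG-END-OFFSET values in the two tables are 8,146,677 on the source and 2,611,079 on the target, a gap of 5,535,598 offsets. The sketch below divides that gap by an assumed replication rate. The function name catchup_days is hypothetical. It also assumes that source and target offsets correspond one-to-one, which need not hold: records already removed from the source by retention are never copied, for instance, so the offset gap can overstate what remains. Treat the result as an order of magnitude only.

```shell
#!/usr/bin/env bash
# Hypothetical helper: rough number of days needed to close an offset gap.
# ASSUMPTIONS: one target record per source offset (ignores retention,
# compaction and transaction markers) and a constant replication rate.
catchup_days() {
  local source_end=$1 target_end=$2 records_per_sec=$3
  awk -v s="$source_end" -v t="$target_end" -v r="$records_per_sec" \
    'BEGIN { printf "%.1f\n", (s - t) / r / 86400 }'
}
```

catchup_days 8146677 2611079 3 prints 21.4, assuming 3 records/s is the rate for the whole topic. catchup_days 8146677 2611079 30 prints 2.1, assuming each of the 10 partitions advances by 3 records/s.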
My mm2.properties file is:
[ec2-user@ip-x-x-x-x ~]$ cat k/config/mm2.properties
clusters = source, target
source.bootstrap.servers=$SOURCE_CLUSTER
target.bootstrap.servers=$TARGET_CLUSTER
# Source and target clusters configurations.
source.config.storage.replication.factor = 3
target.config.storage.replication.factor = 3
source.offset.storage.replication.factor = 3
target.offset.storage.replication.factor = 3
source.status.storage.replication.factor = 3
target.status.storage.replication.factor = 3
source->target.enabled = true
target->source.enabled = false
source->target.sync.group.offsets.enabled=true
source->target.producer.override.compression.type=gzip
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true
source.producer.override.batch.size = 327680
# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3
topics = .*
groups = .*
replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true
tasks.max = 1
replication.factor = 3
refresh.topics.enabled = true
source.producer.compression.type=gzip
target.producer.compression.type=gzip
source.producer.connections.max.idle.ms=180000
producer.enable.idempotence=true
# Enable heartbeats and checkpoints.
# customize as needed
sync.topic.acls.enabled = false
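When reasoning about throughput, it can help to check the literal value the file sets for a given key, such as tasks.max. The minimal sketch below uses a hypothetical function name, prop_value. It reads only "key = value" lines, skips "#" comments and trims surrounding spaces. If a key appears more than once, the later occurrence wins, which matches how a repeated key behaves when a Java properties file is loaded. It does not handle ":" separators, line continuations or escapes. It also does not model how MirrorMaker 2 resolves cluster- or flow-prefixed keys such as those starting with source->target.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the value set for an exact key in a .properties
# file. Only "key = value" lines are understood; "#" comments are skipped.
prop_value() {
  local file=$1 key=$2
  awk -v k="$key" '
    /^[ \t]*#/ { next }                  # comment line
    {
      i = index($0, "=")
      if (i == 0) next                   # not a key=value line
      name = substr($0, 1, i - 1)
      val = substr($0, i + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", name)  # trim spaces around the key
      gsub(/^[ \t]+|[ \t]+$/, "", val)   # trim spaces around the value
      if (name == k) last = val          # a later line overrides an earlier one
    }
    END { print last }
  ' "$file"
}
```

Against the file above, prop_value k/config/mm2.properties tasks.max prints 1. A key that is not present produces an empty line.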
Can anyone explain why LOG-END-OFFSET rises so slowly on my target cluster? No consumers are connected to $TARGET_CLUSTER, so all updates come through MirrorMaker 2.