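We are testing a Kafka disaster recovery (DR) scenario. We have two Kafka clusters in different regions and use MirrorMaker 2 to replicate topics and messages between them. Topics and messages replicate correctly, but we observe that consumer offsets are not replicated.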
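For example:
1. A producer sends 10 messages to the Kafka cluster in region 1.
2. A consumer pointing at region 1 consumes 5 of those messages.
3. We stop the consumer pointing at region 1.
4. We start a consumer pointing at region 2.
5. That consumer consumes messages.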
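We expect the region 2 consumer to resume from the sixth message (offset 5, since offsets start at 0),
but it starts consuming from offset 0.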
The MirrorMaker 2 properties file is below:
clusters = primary, secondary
# primary cluster information
primary.bootstrap.servers = test1-primary.com:9094,test2-primary.com.apttuscloud.io:9094,test3-primary.com:9094
primary.security.protocol= SASL_SSL
primary.ssl.truststore.password= dummypassword
primary.ssl.truststore.location= /opt/bitnami/kafka/config/certs/kafka.truststore.jks
primary.ssl.keystore.password= dummypassword
primary.ssl.keystore.location= /opt/bitnami/kafka/config/certs/kafka.keystore.jks
primary.ssl.endpoint.identification.algorithm=
primary.sasl.mechanism= PLAIN
primary.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dummyuser" password="dummypassword";
# secondary cluster information
secondary.bootstrap.servers = test1-secondary.com:9094,test2-secondary.com.apttuscloud.io:9094,test3-secondary.com:9094
secondary.security.protocol= SASL_SSL
secondary.ssl.truststore.password= dummypassword
secondary.ssl.truststore.location= /opt/bitnami/kafka/config/certs/kafka.truststore.jks
secondary.ssl.keystore.password= dummypassword
secondary.ssl.keystore.location= /opt/bitnami/kafka/config/certs/kafka.keystore.jks
secondary.ssl.endpoint.identification.algorithm=
secondary.sasl.mechanism=PLAIN
secondary.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dummyuser" password="dummypassword";
# Topic Configuration
primary->secondary.enabled = true
primary->secondary.topics = .*
secondary->primary.enabled = true
secondary->primary.topics = .*
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3
checkpoints.topic.replication.factor= 3
heartbeats.topic.replication.factor= 3
offset-syncs.topic.replication.factor= 3
# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
replication.factor = 3
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 10
topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
groups.blacklist = console-consumer-.*, connect-.*, __.*
primary->secondary.emit.heartbeats.enabled = true
primary->secondary.emit.checkpoints.enabled = true
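
For reference, the offset-related settings that appear relevant to this scenario can be sketched as follows. This is a minimal sketch, assuming Kafka 2.7 or later, where MirrorMaker 2 gained `sync.group.offsets.enabled` (KIP-545). None of these keys are in the file above, and the interval values are illustrative.

```properties
# Assumption: Kafka 2.7+; these keys are NOT part of the original file.
# Periodically write translated consumer group offsets to the target cluster.
primary->secondary.sync.group.offsets.enabled = true
# Illustrative interval for writing the translated offsets.
primary->secondary.sync.group.offsets.interval.seconds = 60
# Illustrative interval for emitting checkpoints, from which offsets are translated.
primary->secondary.emit.checkpoints.interval.seconds = 60
# Consumer groups to consider; groups.blacklist above still excludes console-consumer-.*
primary->secondary.groups = .*
```

Two things are worth checking alongside this. The `groups.blacklist` entry above excludes `console-consumer-.*`, so a test run with the console consumer and no explicit group ID would not be checkpointed. Also, with the default replication policy the mirrored topic in the secondary cluster is named `primary.<topic>`, so the region 2 consumer would need to subscribe to that name and use the same group ID.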
Please note that some confidential values have been replaced with dummy values.
Regards,
Narendra Jadhav