2

我们正在测试 kafka 的 DR 场景。我们在不同的区域有 2 个 kafka 集群。我们正在使用 MirrorMaker2 来复制主题和消息。主题和消息能够复制。但是我们观察到偏移量没有复制。

例如,从生产者那里生成了 10 条指向 kafka 区域 1 的消息。

消费了来自消费者指向 kafka 区域 1 的 5 条消息

停止消费者指向 region1

开始消费者指向 region2

消费消息

这里的期望是区域 2 消费者应该从偏移量 6 消费

但它从偏移量 0 开始消耗

下面是属性文件

 clusters = primary, secondary
# primary cluster information
 primary.bootstrap.servers = test1-primary.com:9094,test2-primary.com.apttuscloud.io:9094,test3-primary.com:9094
 primary.security.protocol= SASL_SSL
 primary.ssl.truststore.password= dummypassword
 primary.ssl.truststore.location= /opt/bitnami/kafka/config/certs/kafka.truststore.jks
 primary.ssl.keystore.password= dummypassword
 primary.ssl.keystore.location= /opt/bitnami/kafka/config/certs/kafka.keystore.jks
 primary.ssl.endpoint.identification.algorithm=
 primary.sasl.mechanism= PLAIN
 primary.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dummyuser" password="dummypassword";

# secondary cluster information
 secondary.bootstrap.servers = test1-secondary.com:9094,test2-secondary.com.apttuscloud.io:9094,test3-secondary.com:9094
 secondary.security.protocol= SASL_SSL
 secondary.ssl.truststore.password= dummypassword
 secondary.ssl.truststore.location= /opt/bitnami/kafka/config/certs/kafka.truststore.jks
 secondary.ssl.keystore.password= dummypassword
 secondary.ssl.keystore.location= /opt/bitnami/kafka/config/certs/kafka.keystore.jks
 secondary.ssl.endpoint.identification.algorithm=
 secondary.sasl.mechanism=PLAIN
 secondary.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dummyuser" password="dummypassword";
# Topic Configuration
 primary->secondary.enabled = true
 primary->secondary.topics = .*

 secondary->primary.enabled = true
 secondary->primary.topics = .*

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3
 checkpoints.topic.replication.factor= 3
 heartbeats.topic.replication.factor= 3
 offset-syncs.topic.replication.factor= 3

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
 offset.storage.replication.factor=3
 status.storage.replication.factor=3
 config.storage.replication.factor=3

 replication.factor = 3
 refresh.topics.enabled = true
 sync.topic.configs.enabled = true
 refresh.topics.interval.seconds = 10
 topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
 groups.blacklist = console-consumer-.*, connect-.*, __.*
 primary->secondary.emit.heartbeats.enabled = true
 primary->secondary.emit.checkpoints.enabled = true

请注意,一些机密值与虚拟值一起放置

问候,

纳伦德拉·贾达夫

4

1 回答 1

2

使用 MirrorMaker 2.5,在集群之间移动消费者时,不会自动转换偏移量。

因此,在另一个集群上启动消费者时,消费者需要使用RemoteClusterUtils.translateOffsets()来找到他们在这个集群中的偏移量。

在 2.7(预计 2020 年 11 月)中,您可以让 MirrorMaker 2 自动翻译偏移量,请参阅https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+跨+集群+in+MM+2.0

于 2020-09-24T14:16:34.387 回答