最初,我创建了一个名为“quickstart-events”的主题,然后在其中生成了一些消息,然后使用消费者组“quickstartGroup”从 kafka-console-consumer 使用它,现在我想将该组从源复制到目标。
当我运行描述命令来描述源集群中的组时
~/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group quickstartGroup
我得到的输出是
Consumer group 'quickstartGroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
quickstartGroup quickstart-events 1 9 12 3 - - -
quickstartGroup quickstart-events 0 9 12 3 - - - -
在这里,主题正在被复制,但是当我运行命令来描述目标集群中的组时
kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe --group quickstartGroup
得到错误为
Error: Consumer group 'quickstartGroup' does not exist
My Mirror Maker 2 属性文件内容为:
# Licensed to the Apache Software Foundation (ASF) under A or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = A, B
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = localhost:9092
B.bootstrap.servers = localhost:9093
# enable and configure individual replication flows
A->B.enabled = true
# regex which defines which topics gets replicated. For eg "foo-.*"
A->B.topics = quickstart-events.*
A->B.groups = quickstartGroup.*
# Setting replication factor of newly created remote topics
replication.factor=2
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
groups.exclude = ''
replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy
PS:我使用的是 Kafka 2.8