1

最初,我创建了一个名为“quickstart-events”的主题,然后在其中生成了一些消息,然后使用消费者组“quickstartGroup”从 kafka-console-consumer 使用它,现在我想将该组从源复制到目标。

当我运行描述命令来描述源集群中的组时

~/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group quickstartGroup

我得到的输出是

Consumer group 'quickstartGroup' has no active members.


GROUP           TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
quickstartGroup quickstart-events 1          9               12              3               -               -               -
quickstartGroup quickstart-events 0          9               12              3               -               -               -               -

在这里,主题正在被复制,但是当我运行命令来描述目标集群中的组时

kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe --group quickstartGroup

得到错误为

Error: Consumer group 'quickstartGroup' does not exist

My Mirror Maker 2 属性文件内容为:

# Licensed to the Apache Software Foundation (ASF) under A or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = A, B

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = localhost:9092
B.bootstrap.servers = localhost:9093

# enable and configure individual replication flows
A->B.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
A->B.topics = quickstart-events.*

A->B.groups = quickstartGroup.*

# Setting replication factor of newly created remote topics
replication.factor=2

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5

groups.exclude = ''

replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy


PS:我使用的是 Kafka 2.8

4

0 回答 0