9

我正在尝试在 2 个集群之间设置复制,但不希望更改主题名称。例如,如果我有一个名为“some_topic”的主题,它会自动复制到“cluster1.some_topic”,我很确定这可以完成,但还没有找到正确的配置来更改它

我当前的配置“mirrormaker2.properties”

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = cluster1, cluster2

# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092

# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5

以供参考:

4

6 回答 6

13

要“禁用”主题前缀并同时正确镜像主题属性,我必须提供一个自定义的复制策略,该策略也覆盖该topicSource方法。否则非默认主题属性(例如,"cleanup.policy=compact")没有被镜像,即使在重新启动镜像制造商之后也是如此。

这是对我有用的完整程序:

  1. 将以下自定义复制策略编译并打包到 .jar 文件中(完整的源代码可以在这里找到):
public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {

  private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);

  private String sourceClusterAlias;

  @Override
  public void configure(Map<String, ?> props) {
    super.configure(props);
    sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    if (sourceClusterAlias == null) {
      String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
      log.error(logMessage);
      throw new RuntimeException(logMessage);
    }
  }

  @Override
  public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    return topic;
  }

  @Override
  public String topicSource(String topic) {
    return topic == null ? null : sourceClusterAlias;
  }

  @Override
  public String upstreamTopic(String topic) {
    return null;
  }
}
  1. 将 .jar 复制到${KAFKA_HOME/libs目录中
  2. 通过设置以下replication.policy.class属性,将 Mirror Maker 2 配置为使用该复制策略${KAFKA_HOME}/config/mm2.properties
  replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy
于 2020-03-10T14:03:47.903 回答
8

我能够使用此设置删除前缀:

"replication.policy.separator": ""
"source.cluster.alias": "",
"target.cluster.alias": "",

如果您的情况需要别名设置,我知道您应该使用其他 replicationPolicy 类。默认情况下使用 DefaultReplicationPolicy 类(https://kafka.apache.org/24/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html

于 2020-01-20T11:26:33.480 回答
5

我认为上面的答案是不恰当的。

在 Mirror Maker 2.0 中,如果要保持主题不变,则必须实现 ReplicationPolicy。

您可以参考 DefaultReplicationPolicy.class,然后覆盖formatRemoteTopic(),之后您必须删除sourceClusterAlias + separator。最后在配置replication.policy.classmm2.properties

我定义了MigrationReplicationPolicy.class

replication.policy.class = org.apache.kafka.connect.mirror.MigrationReplicationPolicy

你应该看看MirrorClientConfig,class,我知道你会明白的

于 2020-02-17T06:37:14.000 回答
3

使用 Kafka ConfluentINC 连接器映像版本 5.4.2 管理推送复制属性是:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
target.cluster.alias= 
replication.factor=3
tasks.max=3
topics=.*
source.cluster.alias= 
target.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
replication.policy.separator= 
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
source.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

1)在3个参数后面留空:source.cluster.alias、replication.policy.separator、target.cluster.alias。

2) 在目标 Kafka 上设置此镜像连接器,而不是在源上(仅执行拉取)

此外,您还可以使用 Conductor 或 Kafka Connector UI Landoop 图像 -landoop/kafka-connect-ui

这仍处于测试场景中,但看起来很有希望。

于 2020-06-11T08:14:25.633 回答
2

从 Kafka 3.0.0 开始,设置就足够了

replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

此外,marcin-wieloch 的答案https://stackoverflow.com/a/60619233/12008693中的 PrefixlessReplicationPolicy不再适用于 3.0.0 (NullPointerException)。

于 2022-02-05T00:20:50.620 回答
1

我正在尝试在 2 个集群之间设置复制,但在两个集群中都需要相同的主题名称,而无需在 connect-mirror-maker.properties 中提供别名。

默认情况下,复制的主题会根据源集群别名进行重命名。

    Source --> Target
    topic-1 --> source.topic-1

您可以通过在连接器属性文件下将以下属性设置为空白来避免重命名主题。默认情况下,replication.policy.separator 属性是一个句点,然后通过将其与 source.cluster.alias 一起设置为空白,目标主题将与源主题具有相同的名称。

replication.policy.separator=
source.cluster.alias=
target.cluster.alias=

于 2021-09-07T10:06:45.863 回答