1

我想将 MirrorMaker 作为独立连接器运行。到目前为止,我还没有找到任何有关配置的文档。据我想象,以下内容会复制myTopic.

现在在目标集群中,我需要主题有另一个名称foo(不是自动重命名)。这是直接支持MirrorSourceConnector还是我需要其他方法?

connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasksMax = 2
topics = myTopic
source.cluster.bootstrap.servers = sourceHost:9092
target.cluster.bootstrap.servers = sinkHost:9092
4

1 回答 1

4

所以Kafka Mirror Maker源代码有一个不错的readme.md

根据您是直接运行 MM2 还是在 Kafka Connect 中运行,您的配置方式会有所不同。您直接说,在链接的readme.md中。

基本上:

默认情况下,复制的主题根据“源集群别名”重命名:

topic-1 --> source.topic-1

这可以通过覆盖 replication.policy.separator 属性来定制(默认是句点)。如果您需要对远程主题的定义方式进行更多控制,您可以实现自定义 ReplicationPolicy 并覆盖 replication.policy.class(默认为 DefaultReplicationPolicy)。

不幸的是,这意味着您不能仅通过配置代码重命名主题。(DefaultReplicationPolicy 类只允许您指定分隔符,不能指定其他内容)。这可能是因为当您指定要镜像的主题时,您使用的是正则表达式,而不是单个主题名称(即使您的源集群主题配置属性只是主题的名称 - 它仍然被视为正则表达式)。

所以,回到文档: ReplicationPolicy是 Kafka 连接源代码中的一个 Java 接口,因此您需要实现一个自定义 Java 类,ReplicationPolicy然后在运行 MM2 时确保它位于类路径上。

让我们假设您确实编写了这样一个类并且您调用它com.moffatt.kafka.connect.mirror.FooReplicationPolicy。您的课程的一个好的模板是 Kafka Connect 附带的默认(显然是唯一的)复制策略类:DefaultReplicationPolicy。您可以看到构建自己的并不会太困难。您可以轻松添加一个 Map - 无论是硬编码的还是配置的 - 查找特定的配置主题名称并将其映射到目标主题名称。

您可以通过在配置中将新类指定为: replication.policy.class = com.moffatt.kafka.connect.mirror.FooReplicationPolicy

于 2020-12-02T20:20:05.803 回答