所以Kafka Mirror Maker源代码有一个不错的readme.md。
根据您是直接运行 MM2 还是在 Kafka Connect 中运行,您的配置方式会有所不同。您直接说,在链接的readme.md中。
基本上:
默认情况下,复制的主题根据“源集群别名”重命名:
topic-1 --> source.topic-1
这可以通过覆盖 replication.policy.separator 属性来定制(默认是句点)。如果您需要对远程主题的定义方式进行更多控制,您可以实现自定义 ReplicationPolicy 并覆盖 replication.policy.class(默认为 DefaultReplicationPolicy)。
不幸的是,这意味着您不能仅通过配置代码重命名主题。(DefaultReplicationPolicy 类只允许您指定分隔符,不能指定其他内容)。这可能是因为当您指定要镜像的主题时,您使用的是正则表达式,而不是单个主题名称(即使您的源集群主题配置属性只是主题的名称 - 它仍然被视为正则表达式)。
所以,回到文档: ReplicationPolicy
是 Kafka 连接源代码中的一个 Java 接口,因此您需要实现一个自定义 Java 类,ReplicationPolicy
然后在运行 MM2 时确保它位于类路径上。
让我们假设您确实编写了这样一个类并且您调用它com.moffatt.kafka.connect.mirror.FooReplicationPolicy
。您的课程的一个好的模板是 Kafka Connect 附带的默认(显然是唯一的)复制策略类:DefaultReplicationPolicy
。您可以看到构建自己的并不会太困难。您可以轻松添加一个 Map - 无论是硬编码的还是配置的 - 查找特定的配置主题名称并将其映射到目标主题名称。
您可以通过在配置中将新类指定为:
replication.policy.class = com.moffatt.kafka.connect.mirror.FooReplicationPolicy