2

我需要将集群 A 上的主题的记录镜像到集群 B 上的主题,同时在记录中添加一个字段,因为它们被代理(例如InsertField

我没有控制集群 A(但可能需要更改)并且完全控制集群 B。

我知道集群 A 正在发送序列化的 JSON。

我正在使用MirrorMaker API和 Kafka 连接来进行镜像,并且我正在尝试使用InsertField转换在记录中添加数据,因为它们被代理。

我的配置如下所示:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092

# ByteArrayConverter to avoid MirrorMaker to re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter


transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname

此代码将失败并显示错误说明:

org.apache.kafka.connect.errors.DataException:[字段插入]仅支持结构对象

有没有办法在不编写自定义转换的情况下实现这一点(我使用的是 Python,我不熟悉 Java)

非常感谢

4

2 回答 2

3

在当前版本Apache Kafka(2.6.0) 中,您无法将 InsertField单消息转换 (SMT) 应用于MirrorMaker 2.0记录。

解释

MirrorMaker 2.0基于Kafka Connect框架,并在内部由 MirrorMaker 2.0 驱动程序设置MirrorSourceConnector

源连接器在轮询记录后立即应用 SMT (在此步骤中没有转换器(例如ByteArrayConverterJsonConverter):它们在应用SMT使用)。

SourceRecord表示为带有BYTES_SCHEMA schema的字节数组。同时InsertField转换需要 Type.STRUCT带有模式的记录。

因此,由于无法将记录确定为Struct,因此不应用转换。

参考

  1. KIP-382:MirrorMaker 2.0
  2. 如何在 Kafka Connect 中使用单个消息转换

其他资源

  1. MirrorMaker 2.0 的 Docker-compose 游乐场
于 2020-12-06T14:28:00.417 回答
0

如评论所述,字节数组转换器没有结构/架构信息,因此无法使用您正在使用的转换(添加字段)。

然而,这并不意味着不能使用任何变换


如果您要发送 JSON 消息,则必须发送架构和有效负载信息。

于 2020-11-29T00:28:45.463 回答