我需要将集群 A 上的主题的记录镜像到集群 B 上的主题,同时在记录中添加一个字段,因为它们被代理(例如InsertField
)。
我没有控制集群 A(但可能需要更改)并且完全控制集群 B。
我知道集群 A 正在发送序列化的 JSON。
我正在使用MirrorMaker API和 Kafka 连接来进行镜像,并且我正在尝试使用InsertField
转换在记录中添加数据,因为它们被代理。
我的配置如下所示:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092
# ByteArrayConverter to avoid MirrorMaker to re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname
此代码将失败并显示错误说明:
org.apache.kafka.connect.errors.DataException:[字段插入]仅支持结构对象
有没有办法在不编写自定义转换的情况下实现这一点(我使用的是 Python,我不熟悉 Java)
非常感谢