我有一个 Kafka 连接设置正在运行,其中源连接器从文本文件中读取结构化记录并以 JSON 格式(带有模式)存储到主题中。有一个接收器连接器正在运行,它将这些消息插入到 Cassandra 表中。虽然此设置运行良好,但我需要引入另一个接收器连接器来将这些消息也传输到 HDFS。所以我尝试实现 HDFSSinkConnector (CP 3.0)。但是这个连接器希望消息是 AVRO 格式的,因此会抛出诸如“无法将数据反序列化到 Avro”之类的错误。
有没有办法让我可以将 JSON 消息从源主题复制并转换为 Avro 格式的另一个主题,并将 HDFS 接收器连接器指向要读取的新主题?可以使用 Kafka Streams 完成吗?
我的分布式连接配置文件包含——
...
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
...
我在主题中的信息如下 -
{"schema":{"type":"struct",
"fields":[{"type":"string","optional":false,"field":"id"},
{"type":"string","optional":false,"field":"name"},
{"type":"integer","optional":false,"field":"amount"}
],
"optional":false,
"name":"myrec",
"version":1
},
"payload":{"id":"A123","name":"Sample","amount":75}
}
谁可以帮我这个事?提前感谢...