我将使用 Kafka MirrorMaker 2 Source Connector,因此我尝试将它部署在我的沙盒 Kafka 集群上(由 Confluent docker compose构建)。
- 我创建了一个
my_test_topic
以 JSON 格式存储值的主题。并配置MirrorSourceConnector
为将其复制到同一个 Kafka 集群中。目标主题source_cluster.my_test_topic
是自动创建的。这是连接器配置:
{
"name": "source-to-target-mm2-source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"enabled": true,
"topics": "my_test_topic",
"source.cluster.alias": "source_cluster",
"target.cluster.alias": "target_cluster",
"source.cluster.bootstrap.servers": "broker:29092",
"target.cluster.bootstrap.servers": "broker:29092"
}
}
- 我通过kafkacat将数据放入源主题:
kafkacat -P -b broker:29092 -t my_test_topic
> {"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]}
- 当我尝试从复制的主题(又名目标主题)中消费数据时,我得到以下结果,我认为它是 base64 格式:
kafkacat -C -b broker:29092 -t source_cluster.my_test_topic -o beginning -e
> "eyJpdGVtIjpbeyJwcmljZSI6IjEwMCJ9LHsicHJpY2UiOiIxMjAifV0sImFub3RoZXJfaXRlbSI6W3siaGVpZ2h0IjoiNTAiLCJ3aWR0aCI6IjYwIn1dfQ=="
这显然不是我所期望的。所以我也尝试了其他一些选择:
- 将值转换器类更改为目标主题中的
org.apache.kafka.connect.storage.StringConverter
消费值[B@3d5e46ef
- 将值转换器类更改为并最终消耗来自目标主题
org.apache.kafka.connect.converters.ByteArrayConverter
的期望值{"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]}
JSONConverter和StringConverter不工作而ByteArrayConverter工作的原因是什么?
此外,我尝试了其他一些连接器配置选项:
"source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
使用StringConverter"source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
使用StringConverter
但这些都不起作用,结果输出类似于[B@5d6b605c
.
在这种情况下配置是否"source.cluster.consumer.value-deserializer"
有任何影响?
同时,当我使用FileStreamSourceConnector从文件中读取 JSON 时,StringConverter和JSONConverter都可以正常工作,而ByteArrayConverter会抛出错误Invalid schema type for ByteArrayConverter: STRING
。