0

我将使用 Kafka MirrorMaker 2 Source Connector,因此我尝试将它部署在我的沙盒 Kafka 集群上(由 Confluent docker compose构建)。

  1. 我创建了一个my_test_topic以 JSON 格式存储值的主题。并配置MirrorSourceConnector为将其复制到同一个 Kafka 集群中。目标主题source_cluster.my_test_topic是自动创建的。这是连接器配置:
{
  "name": "source-to-target-mm2-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "enabled": true,
    "topics": "my_test_topic",
    "source.cluster.alias": "source_cluster",
    "target.cluster.alias": "target_cluster",
    "source.cluster.bootstrap.servers": "broker:29092",
    "target.cluster.bootstrap.servers": "broker:29092"
  }
}
  1. 我通过kafkacat将数据放入源主题:
kafkacat -P -b broker:29092 -t my_test_topic

> {"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]}
  1. 当我尝试从复制的主题(又名目标主题)中消费数据时,我得到以下结果,我认为它是 base64 格式:
kafkacat -C -b broker:29092 -t source_cluster.my_test_topic -o beginning -e

> "eyJpdGVtIjpbeyJwcmljZSI6IjEwMCJ9LHsicHJpY2UiOiIxMjAifV0sImFub3RoZXJfaXRlbSI6W3siaGVpZ2h0IjoiNTAiLCJ3aWR0aCI6IjYwIn1dfQ=="

这显然不是我所期望的。所以我也尝试了其他一些选择:

  • 将值转换器类更改为目标主题中的org.apache.kafka.connect.storage.StringConverter消费值[B@3d5e46ef
  • 将值转换器类更改为并最终消耗来自目标主题org.apache.kafka.connect.converters.ByteArrayConverter的期望值{"item":[{"price":"100"},{"price":"120"}],"another_item":[{"height":"50","width":"60"}]}

JSONConverterStringConverter不工作而ByteArrayConverter工作的原因是什么?

此外,我尝试了其他一些连接器配置选项:

  • "source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.StringDeserializer"使用StringConverter
  • "source.cluster.consumer.value-deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"使用StringConverter

但这些都不起作用,结果输出类似于[B@5d6b605c.

在这种情况下配置是否"source.cluster.consumer.value-deserializer"有任何影响?


同时,当我使用FileStreamSourceConnector从文件中读取 JSON 时,StringConverterJSONConverter都可以正常工作,而ByteArrayConverter会抛出错误Invalid schema type for ByteArrayConverter: STRING

4

0 回答 0