0

我正在运行 kafka mirror maker 2 的一些用例。我现在让它以分布式模式运行,并且复制“似乎”正在运行,但目标集群上的结果与源不匹配。

我的分布式道具文件有以下内容。我的分布式道具文件有我的目标集群。我很确定这是正确的,因为当我将它设置为源集群时,它会在源集群上创建复制主题

bootstrap.servers=**target-cluster:9092**
name=mm2-distributor
group.id=mm2-connect-cluster
client.id=mm2-client-id
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
...
...
rest.port=8083

我的 source-connector.json 文件具有以下内容:

{
    "name": "mm2-connect-cluster",
    "config":{
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "name": "mm2-connect-cluster",
        "group.id": "mm2-connect-cluster",
        "client.id": "mm2-client-id",
        "topics": "test.*",
        "tasks.max": "8",
        "source.cluster.alias": "source",
        "target.cluster.alias": "target",
        "source.cluster.bootstrap.servers": "source-cluster:9092",
        "target.cluster.bootstrap.servers": "target-cluster:9092",
        "source->target.enabled": "true",
        "target->source.enabled": "false",
        "offset-syncs.topic.replication.factor": "4",
        "topics.blacklist": ".*[\\-\\.]internal, .*\\.replica, __consumer_offsets",
        "groups.blacklist": "console-consumer-.*, connect-.*, __.*",
        "topic.creation.enabled": "true",
        "topic.creation.default.replication.factor": "4",
        "topic.creation.default.partitions": "25"
    }
}

在开始分发器和 REST 调用之后,我开始向该test主题发送一些测试消息。源代码如下所示:

消息@来源

目标端消息与源不匹配,如下所示:

消息@目标

我很好奇这是否是预期的结果,如果是这样,我将如何让它正确显示消息?

谢谢

4

1 回答 1

0

并非您的所有消息都是 JSON,因此这表明您使用了错误的转换器

如果您希望按原样传输消息,则不需要转换,因此您应该将 BytesConverter 用于键和值

您看到“模式”字段的原因是因为您设置了schemas.enable=true. 但是,如果您展开列并查看“有效负载”,您应该会看到它们确实匹配

于 2021-04-03T15:47:53.137 回答