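I am trying to apply this schema ID translation SMT ( https://github.com/OneCricketeer/schema-registry-transfer-smt ) to a MirrorMaker cluster, but MirrorMaker does not recognize the SMT. If I configure the same SMT on a Kafka Connect worker instead, it works.

We are on Kafka 2.4 / Confluent 5.4 and use MM2 for replication.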

plugin.path=/opt/connectors
transforms=avroSchemaTransfer

transforms.avroSchemaTransfer.src.schema.registry.url=<source schema registry>
transforms.avroSchemaTransfer.dest.schema.registry.url=<Destination schema registry>

transforms.avroSchemaTransfer.avro.topics=mytopic:false
transforms.avroSchemaTransfer.type=cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer

Error:

org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Missing required configuration "transforms.avroSchemaTransfer.type" which has no default value.
Invalid value null for configuration transforms.avroSchemaTransfer.type: Not a Transformation
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:560)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$700(DistributedHerder.java:125)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

1 Answer

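The transform properties have to be configured on the MirrorSourceConnector. In the MM2 properties file this is done by prefixing each transform property with the replication flow (here `us-west->us-east`), as follows.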

us-west->us-east.transforms=avroSchemaTransfer

us-west->us-east.transforms.avroSchemaTransfer.src.schema.registry.url=<source schema registry>
us-west->us-east.transforms.avroSchemaTransfer.dest.schema.registry.url=<Destination schema registry>

us-west->us-east.transforms.avroSchemaTransfer.avro.topics=mytopic:false
us-west->us-east.transforms.avroSchemaTransfer.type=cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer
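
For context, here is a minimal sketch of how those lines might sit inside a complete MM2 properties file. The cluster aliases come from the snippet above. The bootstrap addresses are placeholders, and the `enabled` and `topics` lines are assumptions about a typical setup rather than part of the original configuration. The SMT jar must also be loadable by the MirrorMaker process. How that is arranged depends on your installation and is not shown here.

```properties
# Hypothetical mm2.properties sketch. Aliases match the answer above;
# broker addresses are placeholders, not real values.
clusters = us-west, us-east
us-west.bootstrap.servers = <source brokers>
us-east.bootstrap.servers = <destination brokers>

# Assumed: enable the us-west -> us-east flow and replicate one topic.
us-west->us-east.enabled = true
us-west->us-east.topics = mytopic

# SMT settings scoped to the flow, so they reach the MirrorSourceConnector.
us-west->us-east.transforms = avroSchemaTransfer
us-west->us-east.transforms.avroSchemaTransfer.type = cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer
us-west->us-east.transforms.avroSchemaTransfer.src.schema.registry.url = <source schema registry>
us-west->us-east.transforms.avroSchemaTransfer.dest.schema.registry.url = <Destination schema registry>
us-west->us-east.transforms.avroSchemaTransfer.avro.topics = mytopic:false
```

Every `transforms.*` key carries the same `us-west->us-east.` prefix. In the question's configuration the same keys were given without a flow prefix.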
answered 2020-05-12T06:05:15.930