我正在尝试使用教程中指定的Replicator 将模式注册表从本地集群迁移到 Confluent Cloud 。它设法复制了除少数之外的所有主题,我不知道为什么......它吐出以下错误:
ERROR Failed to translate schema registry record (io.confluent.connect.replicator.schemas.SchemaTranslator:188)
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while registering schema; error code: 500; error code: 500
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:481)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:212)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:267)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchema(SchemaTranslator.java:211)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateSchemas(SchemaTranslator.java:166)
at io.confluent.connect.replicator.schemas.SchemaTranslator.translateCollectedRecords(SchemaTranslator.java:135)
at io.confluent.connect.replicator.ReplicatorSourceTask.translateCollectedRecords(ReplicatorSourceTask.java:566)
at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:558)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
它无法复制的模式看起来与它设法复制的模式没有任何不同。
我该如何调试它?
我正在使用 Confluent 平台 6.0.0。
connect-standalone.properties:
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Store offsets on local filesystem
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=5000
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
consumer.security.protocol=SASL_SSL
consumer.offset.start=earliest
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
# x4
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="****" password="****";
producer.security.protocol=SASL_SSL
config.storage.replication.factor=1
offset.storage.replication.factor=1
istatus.storage.replication.factor=1
quickstart-replicator.properties:
# basic connector configuration
name=replicator-v28
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
tasks.max=4
# source cluster connection info
src.kafka.bootstrap.servers=localhost:9092
# destination cluster connection info
dest.kafka.ssl.endpoint.identification.algorithm=https
dest.kafka.sasl.mechanism=PLAIN
dest.kafka.request.timeout.ms=20000
dest.kafka.bootstrap.servers=<kafka-cloud-url>:9092
retry.backoff.ms=500
dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="****";
dest.kafka.security.protocol=SASL_SSL
# Schema Registry migration topics to replicate from source to destination
# topic.whitelist indicates which topics are of interest to replicator
topic.whitelist=_schemas
# schema.registry.topic indicates which of the topics in the ``whitelist`` contains schemas
schema.registry.topic=_schemas
# Connection settings for destination Confluent Cloud Schema Registry
schema.registry.url=https://<cloud-schema-registry-url>
schema.registry.client.basic.auth.credentials.source=USER_INFO
schema.registry.client.basic.auth.user.info=****:****
confluent.topic.replication.factor=3