0

我在 AWS MSK 上有两个 Kafka 集群(在相同的环境和区域中)。我在目标集群上设置了一个 KafkaConnect 集群,并设置了一个镜像制造商连接器来运行。连接器的提交很好,没有错误。

当我尝试检查连接器的状态时,它说,正在运行: {"name":"mirror-maker-test-connector","connector":{"state":"RUNNING","worker_id":"<ip>:<port>"},"tasks":[task_list],"type":"source"}

我看到以下异常:

[2022-01-12 19:46:33,772] DEBUG [Producer clientId=connector-producer-mirror-maker-test-connector-0] Connection with b-2.<broker_ip> disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:120)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2022-01-12 19:46:33,773] DEBUG [Producer clientId=connector-producer-mirror-maker-test-connector-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2022-01-12 19:46:33,773] WARN [Producer clientId=connector-producer-mirror-maker-test-connector-0] Bootstrap broker b-2.<broker_ip>:9094 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

我能够netcat从 Kafka Connect k8s pod 中连接到指定的代理。

以前有人遇到过这个问题吗?

4

1 回答 1

1

我让它工作 - 在提交 Mirror Maker 连接器时,必须为消费者和生产者添加 SSL 属性。

"target.cluster.security.protocol": "SSL", 
"target.cluster.ssl.truststore.location":"<certs_path>",
"target.cluster.ssl.truststore.password": "<password>"
"source.cluster.security.protocol": "SSL",
"source.cluster.ssl.truststore.location": "<certs_path>",
"source.cluster.ssl.truststore.password": "<password>"
于 2022-01-13T16:36:41.237 回答