使用 3 个 Kafka 集群和一个相同的 Zookeeper 集群,我启动了一个分布式连接器节点。这个节点成功运行了一个任务。然后我提出了第二个连接器,这似乎是因为任务中的一些代码确实运行了。然而,它似乎并没有保持活动状态(尽管没有抛出任何错误,但由于缺乏预期的活动而观察到不活动状态,而第一个连接器继续正常运行)。当我在每个连接器节点上调用 URLhttp://localhost:8083/connectors/mqtt/tasks
时,它告诉我连接器有一个任务。我希望这是两个任务,每个节点/工作者一个。(目前工作人员配置说tasks.max = 1
,但我也尝试将其设置为 3。
当我尝试调出第三个连接器时,出现错误:
"POST /connectors HTTP/1.1" 500 90 5
(org.apache.kafka.connect.runtime.rest.RestServer:60)
ERROR IO error forwarding REST request:
(org.apache.kafka.connect.runtime.rest.RestServer:241)
java.net.ConnectException: Connection refused
尝试从 shell 再次调用连接器 POST 方法会返回错误:
{"error_code":500,"message":"IO Error trying to forward REST request:
Connection refused"}
我还尝试升级到今天发布的 Apache Kafka 0.10.1.1。我仍然看到问题。每个连接器都在由单个映像定义的隔离 Docker 容器上运行。它们应该是相同的。
问题可能是我试图http://localhost:8083/connectors
在每个工作人员上运行 POST 请求,而我只需要在单个工作人员上运行一次,然后该连接器的任务将自动分配给其他工作人员。如果是这种情况,我该如何分配任务?我目前将最大值设置为三个,但似乎只有一个在单个工作人员上运行。
更新
我最终使用 Yuri 建议的基本相同的方法让事情运行起来。我给每个工作人员一个唯一的组 ID,然后给每个连接器任务赋予相同的名称。这允许三个连接器及其单个任务共享一个偏移量,因此在接收器连接器的情况下,它们从 Kafka 消费的消息不会重复。它们基本上作为独立连接器运行,因为工作人员具有不同的组 ID,因此不会相互通信。
如果连接器工作人员具有相同的组 ID,则您不能添加多个具有相同名称的连接器。如果您为连接器指定不同的名称,它们将具有不同的偏移量并使用重复的消息。如果您在同一个组中有三个工作人员,一个连接器和三个任务,理论上您将有一个理想的情况,其中任务共享一个偏移量并且工作人员确保任务始终运行且分布良好(每个任务消耗一个唯一的集合的分区)。在实践中,连接器框架不会创建多个任务,即使 tasks.max 设置为 3 并且主题任务正在使用时有 25 个分区。
如果有人知道我为什么会看到这种行为,请告诉我。