12

我在使用 Confluent JDBC 连接器时遇到了非常奇怪的行为。我很确定它与 Confluent 堆栈无关,而是与 Kafka-connect 框架本身有关。

因此,我将offset.storage.file.filename属性定义为默认值/tmp/connect.offsets并运行我的接收器连接器。显然,我希望连接器在给定文件中保留偏移量(它在文件系统上不存在,但应该自动创建,对吧?)。文档说:

offset.storage.file.filename 用于存储连接器偏移量的文件。通过将偏移量存储在磁盘上,可以在单个节点上停止和启动独立进程,并从之前停止的位置恢复。

但是卡夫卡的行为方式完全不同。

  1. 它检查给定文件是否存在。
  2. 它不是,Kafka 只是忽略它并在 Kafka 主题中保留偏移量。
  3. 如果我手动创建给定文件,则无论如何读取都会失败(EOFException)并且偏移量会再次保留在主题中。

这是一个错误,还是更有可能是我不明白如何使用这种配置?我了解两种持久偏移量的方法之间的区别,并且文件存储更适合我的需求。

4

2 回答 2

7

offset.storage.file.filename仅在连接器中使用。它用于在输入数据源上放置一个书签并记住它停止读取它的位置。创建的文件包含文件行号(对于文件源)或表行号(对于 jdbc 源或一般数据库)。

在分布式模式下运行 Kafka Connect 时,此文件将替换为默认命名的 Kafka 主题,该主题connect-offsets应被复制以容忍故障。

接收器连接器而言,无论使用哪种插件或模式(独立/分布式),它们都将上次停止读取输入主题的位置存储在一个名为__consumer_offsets任何 Kafka 消费者的内部主题中。这允许使用传统工具,如kafka-consumer-groups.sh命令行工具,接收器连接器的滞后程度。

Confluent Kafka 复制器尽管是源连接器,但可能是一个例外,因为它从远程 Kafka 读取,并且可能使用 Kafka 消费者。

我同意文档不清楚,无论连接器类型是什么(源或接收器)都需要此设置,但它仅由源连接器使用。这个设计决策背后的原因是单个 Kafka Connect 工作程序(我的意思是单个 JVM 进程)可以运行多个连接器,可能包括源连接器和接收器连接器。换句话说,这个设置是工人级别的设置,而不是连接器的设置。

于 2018-09-08T20:18:45.023 回答
1

该属性offset.storage.file.filename仅适用于以独立模式运行的源连接器的工作人员。如果您在某个源的 Kafka 主题中看到 Kafka 持久偏移量,则说明您在分布式模式下运行。您应该使用提供的脚本启动连接器connect-standalone这里有不同模式的描述。在不同模式下运行的说明在这里

于 2017-02-07T13:55:39.647 回答