1

librdkafka 包含rd_kafka_position获取给定主题分区的当前偏移量的函数。但评论说:

The \p offset field of each requested partition will be set to the offset
of the last consumed message + 1, or RD_KAFKA_OFFSET_INVALID in case there was
no previous message.

换句话说,如果还没有消息被消费,它不会给你任何有用的信息。

我对我刚刚订阅了一个主题的情况感兴趣,并且我已经调用rd_kafka_seek过:

  1. 寻找已知位置(在错误恢复的情况下),或
  2. 寻找到分区的最后。

在这种情况下,我想知道的是,如果要消费一条消息,则下一条消息的偏移量是多少。换句话说,在第一种情况下,它应该与传递给 的偏移量相同rd_kafka_seek,而在第二种情况下,它应该是 1 加上rd_kafka_seek调用时分区中的最后一条消息的偏移量。

不幸的是,正如评论所说,rd_kafka_position不返回此信息。如果还没有消息被消费,它给出-1001( RD_KAFKA_OFFSET_INVALID)。如果我使用一条消息然后调用rd_kafka_position,它会给出正确的偏移量。

在使用任何消息之前,我可以调用其他一些函数来获取偏移量吗?

4

1 回答 1

0

我不确定你在追求什么......“偏移”是消费者特定的东西,在大多数情况下(除了我在下面提到的两种情况)。它跟踪每个主题/分区的每个特定消费者的读取进度,如果该消费者尚未完成读取 - 该主题/分区还没有特定于消费者的偏移量。因此,在这种情况下,要求此消费者的偏移量没有任何意义 - 消费者尚未读取任何内容,因此没有与之关联的偏移量,它可以从您希望它开始的任何偏移量开始。

与消费者无关的偏移量有用的两种主要情况是:

  • 当您根据消息的时间或您在应用程序中的一些自定义错误记录/报告知道要开始处理的主题中的哪些偏移量时
  • 或者当您想从主题中的 EARLIEST 或 LATEST 可用偏移量开始时

如果您知道您希望消费者从哪个位置开始读取 - 您只需寻找该位置并让您的消费者从那时起开始消费消息。然后你可以通过询问它在任何时间点的偏移量来跟踪这个消费者的进度......

如果你想从最早或最新的位置开始 - 你可以找出那个位置是什么(使用 KAfkaAdminClient.listOffsets(),例如,在 2.5.x 版本中 - 在 Java 中,我不知道是什么Python 中的等效方法),然后再次寻找该位置并从中开始消费者。

因此,简而言之,如果消费者从主题中读取了任何内容,您只能期望获得正确的偏移量;否则 - 唯一与消费者无关的有意义信息将是您确定的最早、最新或某些特定(已知)偏移量

于 2020-07-26T00:25:02.833 回答