1

Kafka Consumer API Committed 返回空偏移量。不知道出了什么问题。

val partitions = new util.HashSet[TopicPartition]

for (partitionInfo <- consumer.partitionsFor(topic)) {
  partitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}

consumer.assign(partitions)

val latestOffset= consumer.committed(partitions)

println ("LatestOffset Object "+ latestOffset)

val map = latestOffset.map{ case(t, o) => t.partition() -> o.offset()}

打印输出:

{partition-1=null, partition-0=null}

最终结果我想得到一个分区图->偏移量{"0"->200,"1"->100}

为什么 LatestOffset 对象偏移元数据显示为空?如果我使用 consumer.position 它可以工作。

使用卡夫卡客户端:

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.4.1</version>
    </dependency>
4

1 回答 1

2

它之所以显示,是null因为您的消费者从未消费过,因此从未提交任何偏移量。

让您的消费者使用来自指定主题的消息,提交它们,然后一些值将从该committed方法中显示出来。

于 2020-10-28T20:12:58.890 回答