3

有没有办法根据偏移量使用来自 Kafka 主题的消息。我的意思是我有一个之前在主题中发布的偏移量 ID。现在我需要根据我传递的偏移量 Id 从主题中获取消息。

4

2 回答 2

0

只需使用带有所需参数的 Kafka 消费者,例如

  • 引导服务器:(逗号分隔的服务器名称端口号)
  • 主题:(主题名称)
  • 分区:(分区号)
  • 偏移量:(偏移值)
  • max-messages :(要消耗的最大消息数)

sh kafka-console-consumer.sh --bootstrap-server server1:9092,server2:9092,server3:9092 --topic test_topic --partition 0 --offset 43212345 --max-messages 1

于 2021-10-20T07:13:18.207 回答
0

使用 Java Kafka 消费者库,但是您还必须知道分区号。

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String (properties);
long desiredOffset = 10000;
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, desiredOffset);
bool found= false;
while(found != true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for(ConsumerRecord<String,String> record: records){
        if(record.offset() == desiredOffset){
            System.out.println(record)
            found= true;
            break;
        }
    }
}

consumer.close();

需要考虑的事情是,可以根据 Kafka 主题中的清理策略配置删除具有所需偏移量的记录。请记住 Kafka 是一个流平台。只有在调试时才按偏移量读取消息。

于 2021-10-20T22:54:48.357 回答