有没有办法根据偏移量使用来自 Kafka 主题的消息。我的意思是我有一个之前在主题中发布的偏移量 ID。现在我需要根据我传递的偏移量 Id 从主题中获取消息。
2 回答
0
只需使用带有所需参数的 Kafka 消费者,例如
- 引导服务器:(逗号分隔的服务器名称:端口号)
- 主题:(主题名称)
- 分区:(分区号)
- 偏移量:(偏移值)
- max-messages :(要消耗的最大消息数)
sh kafka-console-consumer.sh --bootstrap-server server1:9092,server2:9092,server3:9092 --topic test_topic --partition 0 --offset 43212345 --max-messages 1
于 2021-10-20T07:13:18.207 回答
0
使用 Java Kafka 消费者库,但是您还必须知道分区号。
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String (properties);
long desiredOffset = 10000;
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, desiredOffset);
bool found= false;
while(found != true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String,String> record: records){
if(record.offset() == desiredOffset){
System.out.println(record)
found= true;
break;
}
}
}
consumer.close();
需要考虑的事情是,可以根据 Kafka 主题中的清理策略配置删除具有所需偏移量的记录。请记住 Kafka 是一个流平台。只有在调试时才按偏移量读取消息。
于 2021-10-20T22:54:48.357 回答