我有一个类似于下面的代码:
KafkaConsumer<Long, String> kafkaConsumer = new KafkaConsumer<>(properties);
partitions = Collections.singletonList(new TopicPartition(topic, partition));
kafkaConsumer.assign(partitions);
kafkaConsumer.seekToBeginning(List.of(partition));
ConsumerRecords<Long, String> records = kafkaConsumer.poll(Duration.ofMillis(pollDuration));
运行代码时,我无法获取此特定主题中的所有消息。例如,如果我在这个分区中有 24 条记录,我只能得到 20 条记录。
这个问题并不总是发生。当我们在运行我的应用程序的同一台机器上使用 docker 制作 Kafka (wurstmeister/kafka:1.1.0) 时,我能够从该特定分区获取所有记录。
但是当我在另一台机器上制作相同的 docker-compose up 并连接到它时,就会发生这个问题。