0

我有一个类似于下面的代码:

KafkaConsumer<Long, String> kafkaConsumer = new KafkaConsumer<>(properties);
partitions = Collections.singletonList(new TopicPartition(topic, partition));
kafkaConsumer.assign(partitions);
kafkaConsumer.seekToBeginning(List.of(partition));

ConsumerRecords<Long, String> records = kafkaConsumer.poll(Duration.ofMillis(pollDuration));

运行代码时,我无法获取此特定主题中的所有消息。例如,如果我在这个分区中有 24 条记录,我只能得到 20 条记录。

这个问题并不总是发生。当我们在运行我的应用程序的同一台机器上使用 docker 制作 Kafka (wurstmeister/kafka:1.1.0) 时,我能够从该特定分区获取所有记录。

但是当我在另一台机器上制作相同的 docker-compose up 并连接到它时,就会发生这个问题。

4

1 回答 1

0

尝试增加fetch.min.bytes,并且可能fetch.max.wait.ms也增加。

fetch.min.bytes 服务器应为获取请求返回的最小数据量。如果可用数据不足,则请求将在响应请求之前等待累积那么多数据。1 字节的默认设置意味着只要有一个字节的数据可用或获取请求超时等待数据到达,就会响应获取请求。将此设置为大于 1 的值将导致服务器等待大量数据累积,这可以以一些额外的延迟为代价提高服务器吞吐量。

fetch.max.wait.ms 如果没有足够的数据立即满足 fetch.min.bytes 给出的要求,服务器将在响应 fetch 请求之前阻塞的最长时间。

于 2019-11-13T14:33:46.877 回答