我正在试验一个 Kafka 集群(3 个节点),我打算使用以下 kafka 客户端依赖项使用一个简单的 java 应用程序围绕冗余和可用性(停止集群中的节点等)运行一些测试:-
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
我将复制因子配置为 3 以确保跨所有节点复制主题,并且我只为主题使用 1 个分区。我正在努力理解我在此示例代码中看到的一些行为,特别是在寻求偏移量时(一个节点离线):-
String topic = "test-topic";
TopicPartition partition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Collections.singletonList(partition);
while (true) {
Consumer<String, String> consumer = createConsumer();
consumer.assign(partitions);
consumer.seek(partition, 0);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
if (records.isEmpty())
System.out.println("No Records Found");
else
System.out.println("Records Found: " + records.count());
consumer.close();
Thread.sleep(2000);
}
当集群中的一个节点离线时,此代码有时会返回“未找到记录”:-
未找到记录 找到记录:1 未找到记录 找到记录:1 找到记录:1 找到记录:1 未找到记录 找到记录:1 找到记录:1 找到记录:1 找到记录:1 找到记录:1 找到记录:1 否找到记录 找到记录:1 找到记录:1 找到记录:1 找到记录:1 找到记录:1 未找到记录
你会注意到我每次都在 while 循环中创建消费者。这是为了模拟不同的消费者进入和连接,因为每个消费者都有不同的消费者组 ID。将消费者创建移到while 循环之外(并删除consumer.close())会产生大部分预期的结果,即所有日志都显示“找到的记录:1”。但是,“有时”第一次民意调查将不返回任何记录,所有剩余的都显示找到 1 条记录:-
String topic = "test-topic";
TopicPartition partition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Collections.singletonList(partition);
Consumer<String, String> consumer = createConsumer();
while (true) {
consumer.assign(partitions);
consumer.seek(partition, 0);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
if (records.isEmpty())
System.out.println("No Records Found");
else
System.out.println("Records Found: " + records.count());
Thread.sleep(2000);
}
createConsumer 代码定义如下供参考:-
public static Consumer<String, String> createConsumer() {
Properties config = new Properties();
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-" + UUID.randomUUID().toString());
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092, node-2:9092, node-3:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" );
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
Consumer<String, String> consumer = new KafkaConsumer<String, String>(config);
return consumer;
}
我想了解这种行为,以便能够可靠地运行我的可用性测试。