2

我正在试验一个 Kafka 集群(3 个节点),我打算使用以下 kafka 客户端依赖项使用一个简单的 java 应用程序围绕冗余和可用性(停止集群中的节点等)运行一些测试:-

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>

我将复制因子配置为 3 以确保跨所有节点复制主题,并且我只为主题使用 1 个分区。我正在努力理解我在此示例代码中看到的一些行为,特别是在寻求偏移量时(一个节点离线):-

    String topic = "test-topic";
    TopicPartition partition = new TopicPartition(topic, 0);
    List<TopicPartition> partitions = Collections.singletonList(partition);
    
    while (true) {                  

        Consumer<String, String> consumer = createConsumer();           
        consumer.assign(partitions);
        consumer.seek(partition, 0);        
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
        
        if (records.isEmpty())
            System.out.println("No Records Found");
        else
            System.out.println("Records Found: " + records.count());
        
        consumer.close();   
        Thread.sleep(2000);
    }   

当集群中的一个节点离线时,此代码有时会返回“未找到记录”:-

未找到记录 找到记录:1 未找到记录 找到记录:1 找到记录:1 找到记录:1 未找到记录 找到记录:1 找到记录:1 找到记录:1 找到记录:1 找到记录:1 找到记录:1 否找到记录 找到记录:1 找到记录:1 找到记录:1 找到记录:1 找到记录:1 未找到记录

你会注意到我每次都在 while 循环中创建消费者。这是为了模拟不同的消费者进入和连接,因为每个消费者都有不同的消费者组 ID。将消费者创建移到while 循环之外(并删除co​​nsumer.close())会产生大部分预期的结果,即所有日志都显示“找到的记录:1”。但是,“有时”第一次民意调查将不返回任何记录,所有剩余的都显示找到 1 条记录:-

    String topic = "test-topic";
    TopicPartition partition = new TopicPartition(topic, 0);
    List<TopicPartition> partitions = Collections.singletonList(partition);
    Consumer<String, String> consumer = createConsumer();       
    
    while (true) {                  
        
        consumer.assign(partitions);
        consumer.seek(partition, 0);        
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
        
        if (records.isEmpty())
            System.out.println("No Records Found");
        else
            System.out.println("Records Found: " + records.count());
                
        Thread.sleep(2000);
    }   

createConsumer 代码定义如下供参考:-

public static Consumer<String, String> createConsumer() {
    
    Properties config = new Properties();
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-" + UUID.randomUUID().toString());
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092, node-2:9092, node-3:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" );
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    Consumer<String, String> consumer = new KafkaConsumer<String, String>(config);
    return consumer;
}

我想了解这种行为,以便能够可靠地运行我的可用性测试。

4

1 回答 1

0

我也被这个问题困住了,最后像这样解决了:

consumer.assign(partitions);
consumer.poll(0); // important! wait metadata update, and not consume message
consumer.seek(partition, 0);        

然而,poll(long)@Deprecated这篇文章提供了一些信息。

于 2021-12-30T02:44:59.570 回答