
I am using the Kafka high-level consumer. When I start the consumer, it finds all the new messages. When I produce new messages with the Java Kafka producer, it picks those up as well. But after about a minute it keeps looping without finding any new messages. When I pause execution in a debugger, the consumer suddenly starts finding messages to consume again. I am using version 0.8.0 from Java. Note that when an error occurs, the process consuming the messages produces a message to a separate "error" topic. When I stopped producing those error messages, the problem went away.
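Roughly, the consuming process does something like the following (a minimal sketch, not my exact code; process(), the broker list, and the "error" topic name stand in for the real values):

// Producer for the separate "error" topic (Kafka 0.8 javaapi producer).
Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", "localhost:9092"); // placeholder broker list
Producer<byte[], byte[]> errorProducer = new Producer<>(new ProducerConfig(producerProps));

// Consume with the high-level consumer; on failure, publish the message to the "error" topic.
while (consumerIterator.hasNext()) {
    MessageAndMetadata<byte[], byte[]> record = consumerIterator.next();
    try {
        process(record.message()); // application-specific handling
    } catch (Exception e) {
        errorProducer.send(new KeyedMessage<byte[], byte[]>("error", record.message()));
    }
}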


1 Answer


The problem appears to be a Kafka bug that I have not seen reported. If you create multiple ConsumerIterators from the same ConsumerConnector (by passing it a map containing several topics), the topics switch between the ConsumerIterators every so often. If you then try to inspect a consumerIterator by pausing in a debugger, they switch back.

Here is my old code, which created the buggy ConsumerIterators:

/**
 * @param zookeeperAddresses (includes the port number)
 * @param topics all topics to be consumed.
 * @return A list of ConsumerIterators.
 */
public List<ConsumerIterator> getConsumers(String zookeeperAddresses, List<String> topics) {
    String groupId = "client_" + topics.get(0);
    LOGGER.info("Zookeeper address = " + zookeeperAddresses + ", group id = " + groupId);
    // One ConsumerConnector is shared by every topic (this shared connector turns out to trigger the bug).
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig(zookeeperAddresses, groupId));
    consumers.add(consumer); // 'consumers' is a class-level list of connectors (not shown here)
    Map<String, Integer> topicCountMap = new HashMap<>();
    for (String topic : topics) {
        topicCountMap.put(topic, Integer.valueOf(1));
    }
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<ConsumerIterator> topicConsumers = new LinkedList<>();
    for (String topic : topics) {
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        assert(streams.size() == 1);
        ConsumerIterator<byte[], byte[]> consumerIterator = streams.get(0).iterator();
        topicConsumers.add(consumerIterator);
    }
    return topicConsumers;
}

Here is the fixed code that works around the bug:

/**
 * @param zookeeperAddresses (includes the port number)
 * @param topics all topics to be consumed.
 * @return A list of ConsumerIterators.
 */
public List<ConsumerIterator> getConsumers(String zookeeperAddresses, List<String> topics) {
    String groupId = "client_" + topics.get(0);
    LOGGER.info("Zookeeper address = " + zookeeperAddresses + ", group id = " + groupId);
    List<ConsumerIterator> topicConsumers = new LinkedList<>();
    for (String topic : topics) {
        // A separate ConsumerConnector per topic avoids the iterator-switching bug.
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(zookeeperAddresses, groupId));
        consumers.add(consumer); // 'consumers' is a class-level list of connectors (not shown here)
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        assert(streams.size() == 1);
        ConsumerIterator<byte[], byte[]> consumerIterator = streams.get(0).iterator();
        topicConsumers.add(consumerIterator);
    }
    return topicConsumers;
}
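Note that the workaround creates one ConsumerConnector per topic, so each topic gets its own ZooKeeper-backed connection instead of sharing one. The createConsumerConfig helper is not shown above; a minimal sketch of what it could look like with the Kafka 0.8 high-level consumer API (the timeout and commit-interval values are illustrative assumptions), along with an example of draining one of the returned iterators:

import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;

// Sketch of the helper referenced above; property values are illustrative.
private static ConsumerConfig createConsumerConfig(String zookeeperAddresses, String groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeperAddresses); // e.g. "localhost:2181"
    props.put("group.id", groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(props);
}

// Example of draining one of the returned iterators; hasNext() blocks until a message arrives.
private static void drain(ConsumerIterator<byte[], byte[]> it) {
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> record = it.next();
        System.out.println(record.topic() + ": " + new String(record.message()));
    }
}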
answered 2014-12-10 11:47