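The code snippet below generally works, but sometimes it does not read messages from the Kafka topic, and I do not get any errors. On the Kafka side (we are using Aiven Managed Kafka), the consumer group is associated with the topic, and the consumer script is running fine.

I need your guidance to resolve this issue.

Kafka version - 2.0.1

Node module version - "node-rdkafka": "^2.7.0"

Code reference - https://github.com/Blizzard/node-rdkafka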

const Kafka = require('node-rdkafka');
const path = require('path');
console.log(Kafka.features);

const consumer = new Kafka.KafkaConsumer({
  // Global (client-level) configuration
  'group.id': 'testxyz',
  'metadata.broker.list': 'kafka-production-url',
  'security.protocol': 'SSL',
  'ssl.key.password': 'password',
  'ssl.key.location': '/var/www/html/config/service.key',
  'ssl.certificate.location': '/var/www/html/config/service.cert',
  'ssl.ca.location': '/var/www/html/config/ca.pem',
  'socket.keepalive.enable': true,
  'enable.auto.commit': false,
  'rebalance_cb': function (err, assignment) {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      // Note: this can throw when you are disconnected. Take care and wrap it in
      // a try catch if that matters to you
      this.assign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      // Same as above
      this.unassign();
    } else {
      // We had a real error
      console.error(err);
    }
  },
  'offset_commit_cb': function (err, topicPartitions) {
    if (err) {
      // There was an error committing
      console.error('error in commit', err);
    } else {
      // Commit went through. Let's log the topic partitions
      console.log('Success in commit', topicPartitions);
    }
  }
}, {
  // Topic-level configuration is passed as the second argument
  'auto.offset.reset': 'earliest'
});

consumer.on('ready', () => {
  // The node-rdkafka README passes an array of topic names to subscribe()
  consumer.subscribe(['mytopic']);
  consumer.consume();
});

// Handle the message
consumer.on('data', (data) => {
  console.log(data);
});

consumer.on('disconnected', (args) => {
  console.log('onDisconnected', args);
});

consumer.connect();
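
Since no errors are reported, one way to surface silent problems might be to listen for the client's diagnostic events as well. The snippet below is only a minimal sketch: `attachDiagnostics` is a hypothetical helper name (not part of node-rdkafka), and it assumes the consumer emits the `event.error`, `event.log` and `disconnected` events described in the node-rdkafka README.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper (not a node-rdkafka API): attaches listeners for the
// diagnostic events that a node-rdkafka client is assumed to emit, and
// forwards each one to the given log function.
function attachDiagnostics(consumer, log = console.log) {
  // Errors reported by the underlying librdkafka client
  consumer.on('event.error', (err) => log('event.error', err));
  // Debug log lines; assumed to appear only when the 'debug' config is set
  consumer.on('event.log', (entry) => log('event.log', entry));
  // Emitted after the client disconnects
  consumer.on('disconnected', (metrics) => log('disconnected', metrics));
  return consumer;
}

module.exports = { attachDiagnostics, EventEmitter };
```

With the consumer above, this would be called as `attachDiagnostics(consumer)` before `consumer.connect()`. To the best of my understanding, `event.log` output also requires a `debug` entry in the global configuration (for example `'debug': 'cgrp,fetch'`), so treat that part as an assumption to verify against the README.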