1

我正在使用 NestJS 和 Kafka 来发布和订阅事件。当消费者抛出异常时,使用 KafkaJS 的自动重试器继续重试超出配置中指定的重试次数时,我遇到了问题。

配置

export const microserviceConfig = (
  brokers: string,
  clientID: string,
  groupID: string
): KafkaOptions => {
  return {
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: clientID,
        brokers: (process.env.KAFKA_BOOTSTRAP_SERVERS &&
          process.env.KAFKA_BOOTSTRAP_SERVERS.split(',')) || ['localhost:9092'],
        ...(process.env.KAFKA_SSL === 'true' && { ssl: true }),
        ...(process.env.KAFKA_SASL_USERNAME &&
          process.env.KAFKA_SASL_PASSWORD && {
            sasl: {
              mechanism: 'scram-sha-512',
              username: USERNAME,
              password: PASSWORD
            }
          })
      },
      consumer: {
        groupId: groupID,
        allowAutoTopicCreation: true,
        retry: {
          initialRetryTime: 1000,
          retries: 5,
        }
      },
      producer: {
        allowAutoTopicCreation: true,
        createPartitioner: Partitioners.JavaCompatiblePartitioner
      }
    }
  };
};

然后我们根据 NestJS 社区的建议使用了异常过滤。

@Catch()
export class KafkaFilter extends BaseExceptionFilter {
    catch(exception: any, host: ArgumentsHost) {
        console.log(exception);
        super.catch(exception, host);
    }
}

过滤器被击中,但我们有一个计数和一个计时器。控制台日志如下所示:

Error hit: Attempt 1
Error hit: Attempt 2
Error hit: Attempt 3
Error hit: Attempt 4
Error hit: Attempt 5
Error hit: Attempt 6
Error hit: Attempt 7
... 
Error hit: Attempt 30

任何建议或更改将不胜感激!

4

0 回答 0