我正在使用 NestJS 和 Kafka 来发布和订阅事件。当消费者抛出异常时,使用 KafkaJS 的自动重试器继续重试超出配置中指定的重试次数时,我遇到了问题。
配置
export const microserviceConfig = (
brokers: string,
clientID: string,
groupID: string
): KafkaOptions => {
return {
transport: Transport.KAFKA,
options: {
client: {
clientId: clientID,
brokers: (process.env.KAFKA_BOOTSTRAP_SERVERS &&
process.env.KAFKA_BOOTSTRAP_SERVERS.split(',')) || ['localhost:9092'],
...(process.env.KAFKA_SSL === 'true' && { ssl: true }),
...(process.env.KAFKA_SASL_USERNAME &&
process.env.KAFKA_SASL_PASSWORD && {
sasl: {
mechanism: 'scram-sha-512',
username: USERNAME,
password: PASSWORD
}
})
},
consumer: {
groupId: groupID,
allowAutoTopicCreation: true,
retry: {
initialRetryTime: 1000,
retries: 5,
}
},
producer: {
allowAutoTopicCreation: true,
createPartitioner: Partitioners.JavaCompatiblePartitioner
}
}
};
};
然后我们根据 NestJS 社区的建议使用了异常过滤。
@Catch()
export class KafkaFilter extends BaseExceptionFilter {
catch(exception: any, host: ArgumentsHost) {
console.log(exception);
super.catch(exception, host);
}
}
过滤器被击中,但我们有一个计数和一个计时器。控制台日志如下所示:
Error hit: Attempt 1
Error hit: Attempt 2
Error hit: Attempt 3
Error hit: Attempt 4
Error hit: Attempt 5
Error hit: Attempt 6
Error hit: Attempt 7
...
Error hit: Attempt 30
任何建议或更改将不胜感激!