我正在使用带有 kafka binder 的 spring-cloud-stream 来使用来自 kafka 的消息。该应用程序基本上是使用来自 kafka 的消息并更新数据库。
在某些情况下,DB 出现故障(可能会持续数小时)或其他一些临时技术问题。由于在这些情况下,在有限的时间内重试消息然后将其移动到 DLQ 是没有意义的,所以当我们遇到某些类型的异常(例如 DBHostNotAvaialableException)时,我试图实现无限次数的重试
为了实现这一点,我尝试了两种方法(两种方法都面临问题) -
在这种方法中,尝试在配置 ConcurrentKafkaListenerContainerFactory bean 时在容器属性上设置错误处理程序,但错误处理程序根本没有被触发。在调试流程时,我在创建的 KafkaMessageListenerContainer 中实现了 errorHandler 字段,因此它们使用默认的 LoggingErrorHandler。下面是我的容器工厂 bean 配置 - 这种方法的 @StreamListener 方法与第二种方法相同,除了对消费者的搜索。
@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(ConsumerFactory<String, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(kafkaConsumerFactory); factory.getContainerProperties().setAckOnError(false); ContainerProperties containerProperties = factory.getContainerProperties(); // even tried a custom implementation of RemainingRecordsErrorHandler but call never went in to the implementation factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler()); return factory; }
我在配置工厂 bean 时是否遗漏了一些东西,或者这个 bean 仅与 @KafkaListener 而不是 @StreamListener 相关?
第二种选择是尝试使用手动确认和查找来实现它,在 @StreamListener 方法中从标头获取确认和使用者,如果收到可重试的异常,我会使用 retrytemplate 进行一定次数的重试,当这些重试用尽时我会触发
consumer.seek()
. 下面的示例代码 -@StreamListener(MySink.INPUT) public void processInput(Message<String> msg) { MessageHeaders msgHeaders = msg.getHeaders(); Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); Consumer<?,?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class); Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class); String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class); Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class); try { retryTemplate.execute( context -> { // this is a sample service call to update database which might throw retryable exceptions like DBHostNotAvaialableException consumeMessage(msg.getPayload()); return null; } ); } catch (DBHostNotAvaialableException ex) { // once retries as per retrytemplate are exhausted do a seek consumer.seek(new TopicPartition(topicName, partition), offset); } catch (Exception ex) { // if some other exception just log and put in dlq based on enableDlq property logger.warn("some other business exception hence putting in dlq "); throw ex; } if (ack != null) { ack.acknowledge(); }
}
这种方法的问题- 因为我正在执行 consumer.seek() 而可能有来自上次轮询的待处理记录,如果 DB 在此期间出现(因此出现故障),这些记录可能会被处理和提交。有没有办法在执行搜索时清除这些记录?
PS - 我们目前处于 2.0.3.RELEASE 版本的 spring boot 和 Finchley.RELEASE 或 spring 云依赖项(因此也不能使用否定确认等功能,目前无法升级)。