0

在开始使用 Spring Cloud Stream 之前,我使用的是 Spring-Kafka 及其对批量消费和自定义错误处理的支持。注意这段代码的最后两行:

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(this.consumerConfigurationProperties.getConsumer().get(TOPIC_AG_TASK_EMPP).getConcurrency());
    factory.setConsumerFactory(consumerFactory);
    factory.setMessageConverter(avroMessageConverter);
    factory.getContainerProperties().setPollTimeout(this.consumerConfigurationProperties.getConsumer().get(TOPIC_AG_TASK_EMPP).getPollTimeout());
    factory.getContainerProperties().setPauseEnabled(true);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    factory.getContainerProperties().setErrorHandler(dlqAGTaskErrorHandler);

但是,使用 Spring Cloud Stream 我找不到如何配置它。我只能找到这些配置属性:

spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset,enableDlq

因此,是否可以(是否)可以在 Spring Cloud Stream 中注册自定义错误处理程序并将 AckMode 设置为 BATCH?

谢谢你的支持。

4

1 回答 1

0

目前我们在 Spring Cloud Stream 级别还不支持这些选项。实施后,以下问题应提供等效选项(可能尽快为 Chelsea.RC1):

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/70

https://github.com/spring-cloud/spring-cloud-stream/issues/538

于 2017-03-30T02:37:17.087 回答