4

我很陌生Kafka。我试图弄清楚并理解错误场景将如何工作@Listener batch consumer factory

我在做什么 ...

我正在使用进程中的记录topic并将batch它们插入DB如下...

@KafkaListener( topics = "KAFKA.TEST")
public Boolean listen(List<ConsumerRecord<String, User>> list)  throws Exception {
    Boolean result = null;
    List<User> userList = new ArrayList<>();
    for (ConsumerRecord<String, User> record : list) {
        User user = record.value();
        userList.add(user);
    }
    if(userList.size()>0) {
        result = dbService.insertBatchUser(userList);
        LOGGER.info(" users inserted " + userList.size());
    }
    else
        LOGGER.info(" No users found in the topic ");

    countDownLatch.countDown();
    return result;
}

我的问题

  1. 如果由于 DB 不可用而导致任何批处理未能插入 DB 时如何重试
  2. 如何测试 Kafka 服务器是否正在运行并能够连接到特定主题 - 为什么我问这个问题是我在本地Kafka Listener停止zookeeperKafka服务器后尝试但没有错误或异常。Kafka Producer我的意思是在停止但没有发现Template错误后发送消息抛出错误Kafka ServerListener

添加

我的配置

@Bean
public ConsumerFactory consumerFactory(){
    return new DefaultKafkaConsumerFactory(consumerConfigs(),stringKeyDeserializer(),jsonValueDeserializer());
}
@Bean
public RetryPolicy getRetryPolicy(){
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    simpleRetryPolicy.setMaxAttempts(getMaxRetryAttempts());
    return simpleRetryPolicy;
}

@Bean
public FixedBackOffPolicy getBackOffPolicy() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(getRetryInterval());
    return backOffPolicy;
}

@Bean
public RetryTemplate getRetryTemplate(){
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(getRetryPolicy());
    retryTemplate.setBackOffPolicy(getBackOffPolicy());
    return retryTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConcurrency(getConcurrency());
    factory.getContainerProperties().setPollTimeout(getPollTimeout());
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setRetryTemplate(getRetryTemplate());
    return factory;
}

我在这里使用org.springframework.retry.support.RetryTemplate

我得到的例外

java.lang.ClassCastException: org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter cannot be cast to org.springframework.kafka.listener.MessageListener
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:306) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:282) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:211) ~[spring-kafka-1.1.2.RELEASE.jar:na]
4

1 回答 1

2

请参阅重试交付

使用 @KafkaListener 时,在容器工厂上设置 RetryTemplate(以及可选的 recoveryCallback),侦听器将被包装在适当的重试适配器中。

新的 Kafka 客户端(0.9.xx 或 0.10.xx)不直接与 zookeeper 对话,只是与 kafka 服务器本身对话。

客户端内部不断尝试重新连接;打开调试日志以查看活动。

于 2017-02-02T14:08:55.943 回答