0

在启动应用程序后第一次调用在 Kafka 上发送消息后阻止创建一些 beanAbstractKafkaProducerConfiguration

  io.micronaut.context.DefaultBeanContext.getBeanInternal(BeanResolutionContext, Class, Qualifier, boolean, boolean) DefaultBeanContext.java:2289
  io.micronaut.context.DefaultBeanContext.getBean(Class) DefaultBeanContext.java:733
  io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.lambda$getProducer$13(String, AnnotationMetadata, Argument, Argument, KafkaClientIntroductionAdvice$ProducerKey) KafkaClientIntroductionAdvice.java:586
  io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice$$Lambda$956.apply(Object)
  java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Object, Function) ConcurrentHashMap.java:1705
  io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.getProducer(Argument, Argument, AnnotationMetadata) KafkaClientIntroductionAdvice.java:575
  io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.intercept(MethodInvocationContext) KafkaClientIntroductionAdvice.java:227
  io.micronaut.aop.chain.MethodInterceptorChain.proceed() MethodInterceptorChain.java:82
  io.micronaut.retry.intercept.RecoveryInterceptor.intercept(MethodInvocationContext) RecoveryInterceptor.java:92
  io.micronaut.aop.chain.MethodInterceptorChain.proceed() MethodInterceptorChain.java:82

任何想法如何处理。应用程序永远阻塞。

问候奥利弗

4

1 回答 1

0

找到了原因。我正在使用消费者线程池收听rabbitmq消息并将这些内容写入kafka主题。由于某种原因,最终会出现竞争条件。

在为 rabbitmq 使用另一个专用线程池后,我将其称为“消息传递”,一切正常。

于 2020-12-15T19:31:17.240 回答