0

我想将嵌入式 Kafka 用于 Spring Boot 应用程序。我可以使用嵌入式 Kafka 进行 Junit 测试,但是在尝试在主应用程序中使用时,嵌入式 Kafka 对象没有被识别。

尝试加载 Spring Boot 应用程序时,嵌入式 kafka 对象未自动装配。这适用于非测试流程。

@SpringBootApplication
@DirtiesContext
@EmbeddedKafka(topics = "TEST_TOPIC.P2.R2", partitions = 1, controlledShutdown = false, brokerProperties = {
        "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class MockKafkaProducerApplication {

    public static void main(String[] args) throws Exception {
        System.out.println("Starting Spring boot Application");
        SpringApplication.run(MockKafkaProducerApplication.class, args);

    }

}


@ActiveProfiles("kafka_test")
@Configuration
public class KafkaConsumerTestBase {
    private Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTestBase.class);

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;


    @Value("${spring.embedded.kafka.brokers}")
    private String brokerAddress;

    @Autowired
    protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    protected KafkaTemplate<String, String> senderTemplate;

....... ........ }

com.dell.pde.kafka.KafkaConsumerTestBase 中的字段 embeddedKafka 需要找不到类型为“org.springframework.kafka.test.EmbeddedKafkaBroker”的 bean。

注入点有以下注解: - @org.springframework.beans.factory.annotation.Autowired(required=true)

4

2 回答 2

2

嵌入式 Kafka 用于测试,而不是实际应用程序。

可以在运行基于 Spring Kafka 的测试的测试类上指定的注释。在常规 Spring TestContext Framework 之外提供以下功能:

...

一个库,提供内存中的 Kafka 实例来运行您的测试。

如果您想制作一个实际的模型应用程序,您还必须运行一个实际的 Kafka 实例。

于 2020-01-16T09:55:06.477 回答
1

@interface EmbeddedKafka用于测试目的。如果您检查 public class EmbeddedKafkaCondition,您可以看到弹簧测试如何运行它:

public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
    Optional<AnnotatedElement> element = context.getElement();
    if (element.isPresent() && !this.springTestContext((AnnotatedElement)element.get())) {
        EmbeddedKafka embedded = (EmbeddedKafka)AnnotatedElementUtils.findMergedAnnotation((AnnotatedElement)element.get(), EmbeddedKafka.class);
        if (embedded != null) {
            EmbeddedKafkaBroker broker = this.getBrokerFromStore(context);
            if (broker == null) {
                broker = this.createBroker(embedded);
                BROKERS.set(broker);
                this.getStore(context).put("embedded-kafka", broker);
            }
        }
    }

    return ConditionEvaluationResult.enabled("");
}

private boolean springTestContext(AnnotatedElement annotatedElement) {
    return AnnotatedElementUtils.findAllMergedAnnotations(annotatedElement, ExtendWith.class).stream().filter((extended) -> {
        return Arrays.asList(extended.value()).contains(SpringExtension.class);
    }).findFirst().isPresent();
}

尝试覆盖此类以在您的应用程序上运行它。

我建议你直接使用 docker 来提升 kafka 图像

于 2020-01-16T10:26:30.427 回答