0

我有一个 Spring Boot 项目,它有一个 Kafka 侦听器,我想使用 Embedded Kafka 进行测试。我让 Kafka Listener 注销消息“收到记录”。仅当我在方法的开头添加 a 时才会注销Thread.sleep(1000)

测试类:

@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {

    private static final String TOPIC = "my-topic";

    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Test
    void testSendEvent() throws ExecutionException, InterruptedException {
        // Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
        Producer<Integer, String> producer = configureProducer();
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
        producer.send(producerRecord).get();
        producer.close();
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}

我不想使用善变Thread.sleep()的测试显然是在一些设置过程完成之前执行的。我显然需要等待某些事情,但我不知道该怎么做。

使用:

  • 爪哇 11
  • 春季启动 2.5.6
  • JUnit 5
  • spring-kafka-test 2.7.8
4

2 回答 2

1

将 bean 添加到测试上下文并(例如)在收到a时@EventListener倒计时 a ;然后在测试中CountDownLatchConsumerStartedEvent

assertThat(eventListner.getLatch().await(10, TimeUnit.SECONDS)).isTrue();

https://docs.spring.io/spring-kafka/docs/current/reference/html/#events

https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumption

或者添加一个ConsumerRebalanceListener并等待分区分配。

于 2021-11-10T14:11:18.467 回答
0

我显然需要等待某些事情,但我不知道该怎么做。

您需要使用不同的方法来留出Kafka时间来处理和路由消息...

看这条线...

ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);

在测试 Kafka 侦听器时,我们总是指定轮询延迟。这是因为您的消息被提供给 kafka,然后它将在另一个线程中处理它。你需要等待它。

这是它在使用的代码上下文中的外观。

class UserKafkaProducerTest {
  @Test
  void testWriteToKafka() throws InterruptedException, JsonProcessingException {
      // Create a user and write to Kafka
      User user = new User("11111", "John", "Wick");
      producer.writeToKafka(user);

      // Read the message (John Wick user) with a test consumer from Kafka and assert its properties
      ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
      assertNotNull(message);
      assertEquals("11111", message.key());
      User result = objectMapper.readValue(message.value(), User.class);
      assertNotNull(result);
      assertEquals("John", result.getFirstName());
      assertEquals("Wick", result.getLastName());
  }
}

这是本文中的一段代码,它使事情变得清晰。

于 2021-11-10T12:05:48.660 回答