我有一个 Spring Boot 项目,它有一个 Kafka 侦听器,我想使用 Embedded Kafka 进行测试。我让 Kafka Listener 注销消息“收到记录”。仅当我在方法的开头添加 a 时才会注销Thread.sleep(1000)
。
测试类:
@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {
private static final String TOPIC = "my-topic";
@Autowired
EmbeddedKafkaBroker kafkaBroker;
@Test
void testSendEvent() throws ExecutionException, InterruptedException {
// Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
Producer<Integer, String> producer = configureProducer();
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
producer.send(producerRecord).get();
producer.close();
}
private Producer<Integer, String> configureProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
}
}
我不想使用善变Thread.sleep()
的测试显然是在一些设置过程完成之前执行的。我显然需要等待某些事情,但我不知道该怎么做。
使用:
- 爪哇 11
- 春季启动 2.5.6
- JUnit 5
- spring-kafka-test 2.7.8