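Kafka listeners are only "design time" if you specify them with annotations. Spring-kafka also lets you create them dynamically at runtime; see KafkaMessageListenerContainer.
The simplest example of a dynamically created Kafka listener is: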
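import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import com.google.common.collect.ImmutableMap; // Guava
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
// spring-kafka 2.2+; earlier versions use org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

// Minimal consumer configuration: broker address and consumer group
Map<String, Object> consumerConfig = ImmutableMap.of(
    BOOTSTRAP_SERVERS_CONFIG, "brokerAddress",
    GROUP_ID_CONFIG, "groupId"
);
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
    new DefaultKafkaConsumerFactory<>(
        consumerConfig,
        new StringDeserializer(),
        new StringDeserializer());
// The topic and the listener are both set on the container properties
ContainerProperties containerProperties = new ContainerProperties("topicName");
containerProperties.setMessageListener((MessageListener<String, String>) record -> {
    // do something with received record
});
ConcurrentMessageListenerContainer<String, String> container =
    new ConcurrentMessageListenerContainer<>(
        kafkaConsumerFactory,
        containerProperties);
container.start();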
For more explanation and code, see this blog post: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/
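
Once listeners are created at runtime, you usually also want to keep track of them so that they can be stopped again. The following is only a sketch of one way to do that, not something taken from spring-kafka: `ListenerContainer` and `DynamicListenerRegistry` are hypothetical names. `ListenerContainer` is a stand-in that mirrors the `start()`, `stop()` and `isRunning()` methods a spring-kafka container exposes, so the example compiles with the JDK alone.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the lifecycle methods of a listener container. */
interface ListenerContainer {
    void start();
    void stop();
    boolean isRunning();
}

/** Hypothetical registry that keeps at most one running container per topic. */
class DynamicListenerRegistry {
    private final Map<String, ListenerContainer> containers = new HashMap<>();
    private final Function<String, ListenerContainer> containerFactory;

    /** The factory receives a topic name and returns a not-yet-started container. */
    DynamicListenerRegistry(Function<String, ListenerContainer> containerFactory) {
        this.containerFactory = containerFactory;
    }

    /** Creates and starts a container for the topic; returns false if one already exists. */
    synchronized boolean subscribe(String topic) {
        if (containers.containsKey(topic)) {
            return false;
        }
        ListenerContainer container = containerFactory.apply(topic);
        container.start();
        containers.put(topic, container);
        return true;
    }

    /** Stops and forgets the container for the topic; returns false if none was registered. */
    synchronized boolean unsubscribe(String topic) {
        ListenerContainer container = containers.remove(topic);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    /** Number of topics that currently have a registered container. */
    synchronized int activeCount() {
        return containers.size();
    }
}
```

In a real application the factory passed to the constructor would build a `ConcurrentMessageListenerContainer` for the given topic and adapt it to the stand-in interface. The methods are `synchronized` so that two threads subscribing to the same topic cannot start two containers for it.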