我在 Spring Webflux 应用程序中使用 Spring Cloud Stream Binder Kafka 3.0.0,该应用程序公开了一个 API,该 API 接收一些数据并将其发布到 Kafka 主题,使用@Output
:
@Autowired
private lateinit var producer: Producer
@PostMapping
@ResponseStatus(CREATED)
fun create(@RequestBody metrics: SomeMetric): Mono<Void> {
producer.send(metrics)
return Mono.empty()
}
@Component
class Producer(private val producerSources: ProducerSources) {
@Transactional
fun send(metrics: SomeMetric) {
producerSources.metrics().send(MessageBuilder.withPayload(metrics) .build())
}
}
interface ProducerSources {
@Output("metrics")
fun metrics(): MessageChannel
//Other...
}
我已经将 Kafka 和 Spring Boot 应用程序都配置为使用事务生产者(请注意@Transactional
上面发送方法上的注释):
@Configuration
class KafkaProducerConfiguration {
@Bean
fun transactionManager(binders: BinderFactory): PlatformTransactionManager {
val pf = (binders.getBinder("kafka",
MessageChannel::class.java) as KafkaMessageChannelBinder).transactionalProducerFactory
return KafkaTransactionManager<ByteArray, ByteArray>(pf)
}
@EnableTransactionManagement
@EnableBinding(value = [ProducerSources::class])
@SpringBootApplication
class MyApplication
fun main(args: Array<String>) {
runApplication<MyApplication>(*args)
}
关键是,作为一个 Spring Webflux 应用程序,我不应该阻塞 http 线程,所以我应该将(阻塞)生产者包装在 fromCallable 块上并在另一个线程池上执行它,例如:
@Component
class Producer(private val producerSources: ProducerSources) {
@Transactional
fun send(metrics: SomeMetric) : Mono<Unit> {
Mono.fromCallable {
producerSources.metrics().send(MessageBuilder.withPayload(metrics) .build())
}.subscribeOn(Schedulers.elastic())
}
}
@PostMapping
@ResponseStatus(CREATED)
fun create(@RequestBody metrics: SomeMetric): Mono<Void> {
return producer.send(metrics)
}
我的问题是:
@Transactional
注释是否仍然适用于这种方法?估计不应该...- 在响应式 Spring Webflux + Cloud Stream Kafka 上下文中支持事务性的推荐方法是什么?
- 奖励:Spring Cloud Stream 是否支持Reactor Kafka ?如果是这样,我们如何在这种情况下配置它
@Output
,事务支持......?