4

我在 Spring Webflux 应用程序中使用 Spring Cloud Stream Binder Kafka 3.0.0,该应用程序公开了一个 API,该 API 接收一些数据并将其发布到 Kafka 主题,使用@Output

    @Autowired
    private lateinit var producer: Producer

    @PostMapping
    @ResponseStatus(CREATED)
    fun create(@RequestBody metrics: SomeMetric): Mono<Void> {
        producer.send(metrics)
        return Mono.empty()
    }
@Component
class Producer(private val producerSources: ProducerSources) {

    @Transactional
    fun send(metrics: SomeMetric) {
        producerSources.metrics().send(MessageBuilder.withPayload(metrics)                       .build())
    }            
}
interface ProducerSources {

    @Output("metrics")
    fun metrics(): MessageChannel

    //Other...

}

我已经将 Kafka 和 Spring Boot 应用程序都配置为使用事务生产者(请注意@Transactional上面发送方法上的注释):

@Configuration
class KafkaProducerConfiguration {

    @Bean
    fun transactionManager(binders: BinderFactory): PlatformTransactionManager {
        val pf = (binders.getBinder("kafka",
                MessageChannel::class.java) as KafkaMessageChannelBinder).transactionalProducerFactory
        return KafkaTransactionManager<ByteArray, ByteArray>(pf)
    }
@EnableTransactionManagement
@EnableBinding(value = [ProducerSources::class])
@SpringBootApplication
class MyApplication

fun main(args: Array<String>) {
    runApplication<MyApplication>(*args)
}

关键是,作为一个 Spring Webflux 应用程序,我不应该阻塞 http 线程,所以我应该将(阻塞)生产者包装在 fromCallable 块上并在另一个线程池上执行它,例如:

@Component
class Producer(private val producerSources: ProducerSources) {

    @Transactional
    fun send(metrics: SomeMetric) : Mono<Unit> {
        Mono.fromCallable {
            producerSources.metrics().send(MessageBuilder.withPayload(metrics)                       .build())           
       }.subscribeOn(Schedulers.elastic())
    }

}
    @PostMapping
    @ResponseStatus(CREATED)
    fun create(@RequestBody metrics: SomeMetric): Mono<Void> {
        return producer.send(metrics)
    }

我的问题是:

  • @Transactional注释是否仍然适用于这种方法?估计不应该...
  • 在响应式 Spring Webflux + Cloud Stream Kafka 上下文中支持事务性的推荐方法是什么?
  • 奖励:Spring Cloud Stream 是否支持Reactor Kafka ?如果是这样,我们如何在这种情况下配置它@Output,事务支持......?
4

0 回答 0