3

我对 kafka binder 进行了一些测试,似乎 spring-cloud-stream 生产者不参与 spring 管理的事务。

给定代码

@RequestMapping(method = RequestMethod.POST)
    @Transactional
    public Customer insertCustomer(@RequestBody Customer customer) {
        customerDao.insertCustomer(customer);
        source.output().send(MessageBuilder.withPayload(CustomerEventHelper.createSaveEvent(customer)).build());
        if (true) {
            throw new RuntimeException("rollback test");
        }
        return customer;
    }

customerDao.insertCustomer 调用被回滚,但仍然发送了 kafka 消息。如果我有客户事件的消费者将客户插入数据仓库,则数据仓库和记录系统将在事务回滚时不同步。有没有办法让卡夫卡活页夹在这里交易?

4

1 回答 1

4

Kafka binder 不是事务性的,Kafka 一般不支持事务。

我们确实打算解决 Spring Cloud Stream 1.1 的事务管理:https ://github.com/spring-cloud/spring-cloud-stream/issues/536 。

但是,您现在甚至可以仅在成功提交后通过注册事务同步来发送消息,如下所示:

TransactionSynchronizationManager.registerSynchronization(
   new TransactionSynchronization(){
       void afterCommit(){                     
           source.output().send(MessageBuilder.withPayload(event).build());
    if (true) {

       }
});

http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/transaction/support/TransactionSynchronization.html

于 2016-06-10T21:43:40.680 回答