0

我尝试使用 cqrs 实现应用程序,并使用 axon 框架实现事件源。我将命令端和查询部分实现为单独的微服务并复制(扩展)查询微服务。我使用消息代理作为 RabbitMq。如果命令部分发布事件不更新所有查询微服务。它以循环方式工作。如何同时更新所有微服务。

这是我的依赖文件

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-amqp</artifactId>
        <version>${axon.version}</version>
    </dependency>
    <dependency>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-spring-boot-starter</artifactId>
        <version>${axon.version}</version>
    </dependency>

这是我在命令端的配置

    @Bean
    public Exchange exchange() {
        return ExchangeBuilder.fanoutExchange("SeatReserveEvents").build();
    }

    @Bean
    public Queue queue() {
        return QueueBuilder.durable("SeatReserveEvents").build();
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
    }

    @Autowired
    public void configure(AmqpAdmin admin) {
        admin.declareExchange(exchange());
        admin.declareQueue(queue());
        admin.declareBinding(binding());
    }

这是 application.yml

axon:
  amqp:
    exchange: SeatReserveEvents 

这是命令端配置

    @Bean
public SpringAMQPMessageSource statisticsQueue(Serializer serializer) {
    return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {
        @RabbitListener(queues = "SeatReserveEvents")
        @Override
        public void onMessage(Message arg0, Channel arg1) throws Exception {
            super.onMessage(arg0, arg1);
        }

    };
}

这是处理程序

@Component
@ProcessingGroup("statistics")
public class EventLoggingHandler
{
    @EventHandler
    protected void on(SeatResurvationCreateEvent event) {
        System.err.println(event);
    }

    @EventHandler
    protected void on(SeatReservationUpdateEvent event) {
        System.err.println(event);
    }
}

这是 application.yml

axon:
  eventhandling:
    processors:
      statistics.source: statisticsQueue
4

2 回答 2

2

我想说这更像是一个 AMQP/RabbitMQ 配置设置,而不是 Axon Framework 特定的问题。也就是说,您希望将 RabbitMQ 设置为不执行循环,而是执行 Pub/Sub,如教程中所述。

然而,我确实有另一个更具体的 Axon Framework 响应。如果您也可以直接从存储中提取事件,为什么要立即将您的事件发布到队列中?因此,您将TrackingEventProcessors在应用程序的查询端拥有,当它们被应用程序的命令端附加时,它们会从事件存储中提取事件。

这就是包含 CQRS 的 Axon 框架应用程序的单体版本最初看起来像任何方式。因此,在命令和查询端拆分 CQRS 应用程序的最简单的下一步是保留接收事件的方式,而不在其间添加队列。但是,如果您有通过队列发布的特定要求,或者您只是更喜欢使用队列而不是让查询应用程序直接从事件存储中提取,请忽略此评论并返回到 RabbitMQ 教程。

于 2018-01-09T10:03:01.157 回答
0

我们需要更改 RabbitMq 配置以从命令端轴突发布事件以获取更多实例。为此,我们必须在发布者端更改配置,如下所示。

@Bean
public FanoutExchange fanoutExchange() {
    FanoutExchange exchange = new FanoutExchange("SeatReserveEvents");
    return exchange;
}

@Autowired
public void configure(AmqpAdmin admin) {
    admin.declareExchange(fanoutExchange());

}

接下来是订阅方,我们必须像下面这样更改 bean

@Bean
public SpringAMQPMessageSource statisticsQueue(Serializer serializer) {
    return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue,
                exchange = @Exchange(value ="SeatReserveEvents",type = ExchangeTypes.FANOUT),
                key = "orderRoutingKey")
          )
        @Override
        public void onMessage(Message arg0, Channel arg1) throws Exception {
            super.onMessage(arg0, arg1);
        }
    };
}

现在我们可以复制消费者的更多实例。这种模式是发布者/订阅者模式。交换类型为扇出

于 2018-01-14T18:35:27.917 回答