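I am trying to build an application with CQRS, using the Axon Framework for event sourcing. I implemented the command side and the query side as separate microservices, and I replicated (scaled out) the query microservice. I use RabbitMQ as the message broker. When the command side publishes an event, not all query microservice instances are updated: each event reaches only one instance, in round-robin fashion. How can I update all instances for every event?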
Here are my dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-amqp</artifactId>
    <version>${axon.version}</version>
</dependency>
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>${axon.version}</version>
</dependency>
Here is my configuration on the command side:
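// Fanout exchange that the command side publishes events to.
@Bean
public Exchange exchange() {
    return ExchangeBuilder.fanoutExchange("SeatReserveEvents").build();
}

// A single durable queue with a fixed name.
@Bean
public Queue queue() {
    return QueueBuilder.durable("SeatReserveEvents").build();
}

// A fanout exchange ignores routing keys, so the "*" has no effect here.
@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
}

// Declare the exchange, queue and binding on the broker at startup.
@Autowired
public void configure(AmqpAdmin admin) {
    admin.declareExchange(exchange());
    admin.declareQueue(queue());
    admin.declareBinding(binding());
}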
Here is its application.yml:
axon:
  amqp:
    exchange: SeatReserveEvents
Here is the configuration on the query side:
@Bean
public SpringAMQPMessageSource statisticsQueue(Serializer serializer) {
    return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {
        // Overridden only to attach the listener annotation; delegates to Axon.
        @RabbitListener(queues = "SeatReserveEvents")
        @Override
        public void onMessage(Message arg0, Channel arg1) throws Exception {
            super.onMessage(arg0, arg1);
        }
    };
}
Here is the event handler:
@Component
@ProcessingGroup("statistics")
public class EventLoggingHandler {

    @EventHandler
    protected void on(SeatResurvationCreateEvent event) {
        System.err.println(event);
    }

    @EventHandler
    protected void on(SeatReservationUpdateEvent event) {
        System.err.println(event);
    }
}
Here is the query-side application.yml:
axon:
  eventhandling:
    processors:
      statistics.source: statisticsQueue
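
To make the symptom concrete, here is a minimal, broker-free sketch in plain Java of how I understand fanout delivery: the exchange copies each message to every bound queue, while consumers attached to the same queue take turns. It is only a model, not RabbitMQ or Axon code. `FanoutModel`, `ModelQueue`, `ModelFanoutExchange` and the per-instance queue names are hypothetical names made up for the illustration, and whether a queue per instance is the right setup for my case is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Broker-free model of fanout delivery; none of this is RabbitMQ or Axon API. */
public class FanoutModel {

    /** A queue hands each message to exactly one of its consumers, in turn. */
    static final class ModelQueue {
        private final List<List<String>> consumers = new ArrayList<>();
        private int next = 0;

        /** Registers a consumer and returns the list recording what it receives. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            consumers.add(inbox);
            return inbox;
        }

        void deliver(String message) {
            if (consumers.isEmpty()) {
                return; // a real broker would buffer; the model simply drops
            }
            consumers.get(next).add(message);
            next = (next + 1) % consumers.size();
        }
    }

    /** A fanout exchange copies each message to every bound queue. */
    static final class ModelFanoutExchange {
        private final Map<String, ModelQueue> bound = new LinkedHashMap<>();

        /** Returns the queue bound under this name, creating it on first use. */
        ModelQueue bind(String queueName) {
            return bound.computeIfAbsent(queueName, name -> new ModelQueue());
        }

        void publish(String message) {
            for (ModelQueue queue : bound.values()) {
                queue.deliver(message);
            }
        }
    }

    /** Two replicas listening on the same queue name, as in my listener above. */
    public static List<List<String>> sharedQueue(List<String> events) {
        ModelFanoutExchange exchange = new ModelFanoutExchange();
        List<String> replicaA = exchange.bind("SeatReserveEvents").subscribe();
        List<String> replicaB = exchange.bind("SeatReserveEvents").subscribe();
        events.forEach(exchange::publish);
        return List.of(replicaA, replicaB);
    }

    /** Each replica binds its own queue (hypothetical names) to the exchange. */
    public static List<List<String>> queuePerInstance(List<String> events) {
        ModelFanoutExchange exchange = new ModelFanoutExchange();
        List<String> replicaA = exchange.bind("SeatReserveEvents.instance-1").subscribe();
        List<String> replicaB = exchange.bind("SeatReserveEvents.instance-2").subscribe();
        events.forEach(exchange::publish);
        return List.of(replicaA, replicaB);
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3", "e4");
        System.out.println("shared queue:       " + sharedQueue(events));
        System.out.println("queue per instance: " + queuePerInstance(events));
    }
}
```

With the shared queue the model prints `[[e1, e3], [e2, e4]]`, which matches the round-robin behaviour I see. With one queue per instance it prints `[[e1, e2, e3, e4], [e1, e2, e3, e4]]`, which is the behaviour I want from the real setup.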