0

我已经使用 axon 和 spring-boot 实现了 CQRS+ES 应用程序。我使用单独的查询模型和命令模型应用程序。我使用 rabbitmq 从命令模式发布事件。它工作正常。但是跟踪处理器实现在我的应用程序中不起作用。

这是我的查询模型

@SpringBootApplication
public class SeatQueryPart1Application {
    public static void main(String[] args) {
        SpringApplication.run(SeatQueryPart1Application.class, args);
    }

    @Bean
    public SpringAMQPMessageSource statisticsQueue(Serializer serializer) {
        return new SpringAMQPMessageSource(new DefaultAMQPMessageConverter(serializer)) {
            @RabbitListener(exclusive = false, bindings = @QueueBinding(value = @Queue, exchange = @Exchange(value = "ExchangeTypesTests.FanoutExchange", type = ExchangeTypes.FANOUT), key = "orderRoutingKey"))
            @Override
            public void onMessage(Message arg0, Channel arg1) throws Exception {
                super.onMessage(arg0, arg1);
            }
        };
    }

    @Autowired
    public void conf(EventHandlingConfiguration configuration) {
        configuration.registerTrackingProcessor("statistics");
    }

}

这是一个事件处理程序类

@ProcessingGroup("statistics")
@Component
public class EventLoggingHandler {


    private SeatReservationRepository seatReservationRepo;
    public EventLoggingHandler(final SeatReservationRepository e) {
        this.seatReservationRepo = e;
    }

    @EventHandler
    protected void on(SeatResurvationCreateEvent event) {
        Timestamp timestamp = new Timestamp(System.currentTimeMillis());
        Seat seat=new Seat(event.getId(), event.getSeatId(), event.getDate(),timestamp ,true);
        seatReservationRepo.save(seat);
    }

}

这是 yml 配置

axon:
  eventhandling:
    processors:
      statistics.source: statisticsQueue

我该怎么做才能正确。(任何人都可以建议教程或代码示例)

4

1 回答 1

2

SpringAMQPMessageSource 是一个 SubscribableMessageSource。这意味着您不能使用跟踪事件处理器来处理消息。它仅与可订阅事件处理器兼容。

删除configuration.registerTrackingProcessor("statistics");并将其保留为默认值(订阅)应该可以解决问题。

于 2018-04-03T07:58:51.033 回答