1

当我尝试将 CompletableFuture 与 Axon 一起使用时,我遇到了问题。例如:

CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            log.info("Start processing target: {}", target.toString());
            return new Event();

        }, threadPool);

future.thenAcceptAsync(event -> {
            log.info("Send Event");
            AggregateLifecycle.apply(event);
}, currentExecutor);

在 thenAcceptAsync - AggregateLifecycle.apply(event) 有意外行为。我的一些 @EventSourcingHandler 处理程序开始两次处理事件。有人知道如何解决吗?

我一直在阅读文档,我得到的一切都是:

在大多数情况下,DefaultUnitOfWork 将为您提供所需的功能。它期望处理发生在单个线程中。

所以,看来我应该以某种方式使用 CurrentUnitOfWork.get/set 方法,但仍然无法理解 Axon API。

4

1 回答 1

4

您不应该apply()异步事件。该apply()方法将调用聚合的内部 @EventSourcingHandler 方法,并在工作单元完成(成功)时安排发布事件。Axon 与工作单元(协调单个消息处理程序调用的活动)一起工作的方式,必须在管理该工作单元的线程中调用 apply() 方法。

如果要异步发布事件,请使用使用异步传输的事件总线,并使用跟踪处理器。

于 2018-10-23T08:27:27.900 回答