3

我从 Axon 框架开始,遇到了一些障碍。

虽然我可以使用它们的 ID 加载单个聚合,但我不知道如何获取所有聚合的列表或所有聚合 ID 的列表。

该类EventSourcingRepository只有load()返回一个聚合的方法。

有没有办法对所有聚合(ID)或者我应该在轴突之外保留所有聚合 ID 的列表?

为了简单起见,我现在只使用一个InMemoryEventStorageEngine。我正在使用 Axon 3.0.7。

4

2 回答 2

3

首先,我想知道您为什么要从Repository. 该Repository接口设置为您可以加载一个Aggregate来处理命令或创建一个新的Aggregate.

问你的问题,我几乎猜你是用它来查询而不是命令处理。然而,这不是EventSourcingRepository.

你想要这个我能想到的一个原因是你想要实现一个 API 调用来发布一个命令到Aggregates你的应用程序中的所有特定类型。考虑到这种情况,是的,您需要自己存储 aggregateId 引用。

但以我之前的问题结束:为什么要通过Repository接口检索聚合列表?

答案更新

关于您的评论,我在回答中添加了以下内容:

Axon 可以帮助您在设置应用程序时考虑到事件溯源,同时也考虑到 CQRS(命令查询责任分离)。因此,这意味着您的应用程序的命令和查询端被分开了。

聚合Repository是应用程序的命令端,您请求执行操作。因此,它不提供聚合列表,因为命令是对聚合的意图表达。因此,它只需要Repository用户检索一个聚合或创建一个聚合。

您需要的聚合列表示例是应用程序的查询端。查询端(您的视图/实体)通常根据事件(通过事件获取)进行更新。对于您的应用程序中的任何查询要求,您通常会引入一个针对您的需求量身定制的单独视图。

在您的示例中,这意味着您将引入一个事件处理组件,监听您的聚合事件,它使用聚合的查询模型更新存储库。

于 2017-11-13T08:31:32.270 回答
0

EventStore传入的EventSourcingRepository工具StreamableMessageSource<M extends Message<?>>是一种访问聚合的方法。

虽然使用事件处理组件的框架方式可能会更好地扩展(取决于它的使用方式/上下文),但我很确定事件处理组件StreamableMessageSource<M extends Message<?>>无论如何都是由驱动的。因此,如果我们想跳过框架并直接进入,我们可以这样做:

    List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
        return immediatelyAvailableStream(eventStore.openStream(
                eventStore.createTailToken() /* All events in the event store */
        ))
                .filter(e -> e instanceof DomainEventMessage)
                .map(e -> (DomainEventMessage) e)
                .map(DomainEventMessage::getAggregateIdentifier)
                .distinct()
                .collect(Collectors.toList());
    }

    /*
        Note that the stream returned by BlockingStream.asStream() will block / won't terminate
        as it waits for future elements.
     */
    static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
        Iterator<M> iterator = new Iterator<M>() {
            @Override
            public boolean hasNext() {
                return messageStream.hasNextAvailable();
            }

            @Override
            public M next() {
                try {
                    return messageStream.nextAvailable();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Didn't expect to be interrupted");
                }
            }
        };

        Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        Stream stream = StreamSupport.stream(spliterator, false);
        return (Stream)stream.onClose(messageStream::close);
    }
于 2019-11-28T00:51:03.533 回答