我从 Axon 框架开始,遇到了一些障碍。
虽然我可以使用它们的 ID 加载单个聚合,但我不知道如何获取所有聚合的列表或所有聚合 ID 的列表。
该类EventSourcingRepository
只有load()
返回一个聚合的方法。
有没有办法对所有聚合(ID)或者我应该在轴突之外保留所有聚合 ID 的列表?
为了简单起见,我现在只使用一个InMemoryEventStorageEngine
。我正在使用 Axon 3.0.7。
我从 Axon 框架开始,遇到了一些障碍。
虽然我可以使用它们的 ID 加载单个聚合,但我不知道如何获取所有聚合的列表或所有聚合 ID 的列表。
该类EventSourcingRepository
只有load()
返回一个聚合的方法。
有没有办法对所有聚合(ID)或者我应该在轴突之外保留所有聚合 ID 的列表?
为了简单起见,我现在只使用一个InMemoryEventStorageEngine
。我正在使用 Axon 3.0.7。
首先,我想知道您为什么要从Repository
. 该Repository
接口设置为您可以加载一个Aggregate
来处理命令或创建一个新的Aggregate
.
问你的问题,我几乎猜你是用它来查询而不是命令处理。然而,这不是EventSourcingRepository
.
你想要这个我能想到的一个原因是你想要实现一个 API 调用来发布一个命令到Aggregates
你的应用程序中的所有特定类型。考虑到这种情况,是的,您需要自己存储 aggregateId 引用。
但以我之前的问题结束:为什么要通过Repository
接口检索聚合列表?
答案更新
关于您的评论,我在回答中添加了以下内容:
Axon 可以帮助您在设置应用程序时考虑到事件溯源,同时也考虑到 CQRS(命令查询责任分离)。因此,这意味着您的应用程序的命令和查询端被分开了。
聚合Repository
是应用程序的命令端,您请求执行操作。因此,它不提供聚合列表,因为命令是对聚合的意图表达。因此,它只需要Repository
用户检索一个聚合或创建一个聚合。
您需要的聚合列表示例是应用程序的查询端。查询端(您的视图/实体)通常根据事件(通过事件获取)进行更新。对于您的应用程序中的任何查询要求,您通常会引入一个针对您的需求量身定制的单独视图。
在您的示例中,这意味着您将引入一个事件处理组件,监听您的聚合事件,它使用聚合的查询模型更新存储库。
EventStore
传入的EventSourcingRepository
工具StreamableMessageSource<M extends Message<?>>
是一种访问聚合的方法。
虽然使用事件处理组件的框架方式可能会更好地扩展(取决于它的使用方式/上下文),但我很确定事件处理组件StreamableMessageSource<M extends Message<?>>
无论如何都是由驱动的。因此,如果我们想跳过框架并直接进入,我们可以这样做:
List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
return immediatelyAvailableStream(eventStore.openStream(
eventStore.createTailToken() /* All events in the event store */
))
.filter(e -> e instanceof DomainEventMessage)
.map(e -> (DomainEventMessage) e)
.map(DomainEventMessage::getAggregateIdentifier)
.distinct()
.collect(Collectors.toList());
}
/*
Note that the stream returned by BlockingStream.asStream() will block / won't terminate
as it waits for future elements.
*/
static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
Iterator<M> iterator = new Iterator<M>() {
@Override
public boolean hasNext() {
return messageStream.hasNextAvailable();
}
@Override
public M next() {
try {
return messageStream.nextAvailable();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Didn't expect to be interrupted");
}
}
};
Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
Stream stream = StreamSupport.stream(spliterator, false);
return (Stream)stream.onClose(messageStream::close);
}