4

我研究 DDD 已经有一段时间了,偶然发现了 CQRS 和事件溯源 (ES) 等设计模式。这些模式可用于帮助以更少的努力实现 DDD 的某些概念。

然后我开始开发一个简单的软件来实现所有这些概念。并开始想象可能的失败路径。

只是为了澄清我的架构,下图描述了一个来自前端并到达控制器的请求,我是后端(为简单起见,我忽略了所有过滤器、活页夹)。

序列图

  1. 演员发送一份表格,其中包含他想从一个帐户中提取的金额。
  2. 控制器将视图模型传递给应用程序层,在那里它将被转换为一个命令
  3. 应用层打开一个工作单元 (UOW) 将 VM 映射到命令并将命令发送到调度程序。
  4. 调度程序找到知道如何处理命令(帐户)的相应聚合类,并向工厂请求帐户的特定实例。
  5. 工厂创建一个新的帐户实例并从事件存储中请求所有事件。
  6. 事件存储返回帐户的所有事件。
  7. 工厂将所有事件发送到聚合,以便其内部状态正确。并返回帐户的实例。
  8. 调度程序将命令发送到帐户,以便对其进行处理。
  9. 帐户检查它是否有足够的钱进行提款。如果有,它会发送一个新事件“MoneyWithdrawnEvent”。
  10. 此事件由更改其内部状态的聚合(帐户)处理。
  11. 应用程序层关闭 UOW,当它关闭时,UOW 检查所有加载的聚合以检查它们是否有新事件要保存到事件存储中。如果有,它将事件发送到存储库。
  12. 存储库将事件持久化到事件存储区。

可以添加许多层,例如:聚合的缓存、事件的缓存、快照等。

有时 ES 可以与关系数据库并行使用。因此,当 UOW 保存已发生的新事件时,它还将聚合保存到关系数据库中。

ES 的好处之一是它有一个中心数据源,即事件存储。因此,即使内存甚至关系数据库中的模型损坏,我们也可以从事件中重建模型。

有了这个事实来源,我们可以构建其他系统,以不同的方式使用事件来形成不同的模型。

但是,要使其发挥作用,我们需要真理的来源是干净的,没有被破坏的。否则所有这些好处都不会存在。

也就是说,如果我们考虑图像中描述的架构中的并发性,可能会出现一些问题:

  • 如果actor在一个排序周期内向后端发送了两次表单,并且后端启动了两个线程(每个请求一个),那么他们将调用两次应用层,并启动两个UOW,以此类推。这可能导致将两个事件存储在事件存储中。

这个问题可以在很多不同的地方处理:

  1. 前端可以控制哪些用户/参与者可以执行什么操作以及执行多少次。

  2. Dispatcher 可以拥有所有正在处理的命令的缓存,并且如果存在引用同一聚合(帐户)的命令,它会引发异常。

  3. 存储库可以在保存之前创建聚合的新实例并运行事件存储中的所有事件,以检查版本是否仍与步骤 7 中获取的版本相同。

每个解决方案的问题:

  1. 前端

    • 用户可以通过编辑一些 javascript 来绕过这个限制。
    • 如果打开了多个会话(例如不同的浏览器),则需要一些静态字段来保存对所有打开的会话的引用。并且有必要锁定一些静态变量来访问该字段。
    • 如果有多个服务器用于执行特定操作(水平缩放),则此静态字段将不起作用,因为有必要在所有服务器之间共享此字段。因此,需要一些层(例如 Redis)。
  2. 命令缓存

    • 为了使该解决方案起作用,有必要在读取和写入命令缓存时锁定命令缓存的某些静态变量。

    • 如果有多个服务器用于正在执行的应用程序层的特定用例(水平扩展),则此静态缓存将不起作用,因为有必要在所有服务器之间共享它。因此,需要一些层(例如 Redis)。

  3. 存储库版本检查

    • 为了使该解决方案起作用,有必要在进行检查(数据库版本等于步骤 7 中获取的版本)和保存之前锁定一些静态变量。

    • 如果系统是分布式的(水平规模),则有必要锁定事件存储。因为,否则,两个进程都可以通过检查(数据库的版本等于步骤 7 中获取的版本),然后一个保存,然后另一个保存。并且根据技术,不可能锁定事件存储。因此,将有另一个层来序列化对事件存储的每次访问并添加锁定存储的可能性。

这种锁定静态变量的解决方案有点好,因为它们是局部变量并且非常快。但是,依赖于 Redis 之类的东西会增加一些较大的延迟。如果我们谈论锁定对数据库的访问(事件存储),甚至更多。甚至更多,如果这必须通过另一项服务来完成。

我想知道是否有任何其他可能的解决方案来处理这个问题,因为这是一个主要问题(事件存储上的腐败),如果没有办法解决它,整个概念似乎是有缺陷的。

我对架构中的任何变化持开放态度。例如,如果一种解决方案是添加一个事件总线,以便所有内容都通过它汇集,那很好,但我看不出这能解决问题。

我不熟悉的另一点是卡夫卡。我不知道 Kafka 是否为这个问题提供了一些解决方案。

4

2 回答 2

6

尽管您提供的所有解决方案都可以在某些特定情况下工作,但我认为最后一个解决方案(3.2)适用于更一般的用例。我在我的开源框架中使用它并且效果很好。

因此,事件存储负责确保聚合不会同时被两个命令改变。

一种方法是使用乐观锁定。当聚合从事件存储中加载时,您会记住它的version. 当您持久化事件时,您尝试将它们附加到version + 1. 每个AggregateType-AggregateId-version. 如果追加失败,您应该重试整个过程(加载+句柄+追加)。

我认为这是最具可扩展性的解决方案,因为它甚至适用于分片,当分片键是 AggregateId 的子集时。

您可以轻松地将 MongoDB 用作 EventStore。在 MongoDB <= 3.6 中,您可以通过插入包含事件数组的嵌套文档的单个文档来原子地附加所有事件。

另一种解决方案是使用悲观锁定。您在加载聚合、附加事件、增加其版本和提交之前启动事务。您需要使用 2 个表/集合,一个用于聚合元数据+版本,一个用于实际事件。MongoDB >= 4.0 有事务。

在这两种解决方案中,事件存储都不会损坏

我不熟悉的另一点是卡夫卡。我不知道 Kafka 是否为这个问题提供了一些解决方案。

您可以将 Kafka 与事件溯源一起使用,但您需要更改架构。看到这个答案。

于 2018-03-26T21:53:22.003 回答
3

简短的回答:原子事务仍然是一回事。

更长的答案:要正确处理并发写入,您要么需要lock,要么需要条件写入(也称为比较和交换)。

使用日志:我们需要在第 6 步之前获取锁,并在第 12 步之后释放锁。

使用条件写入:在第 6 步,存储库将捕获并发谓词(可能是隐式的——例如,读取的事件计数)。在步骤 12 执行写入时,将检查并发谓词以确保没有并发修改。

例如,Event Store 的 HTTP API使用ES-ExpectedVersion;客户端负责计算(从它获取的事件中)它期望写入发生的位置。

Gabriel Schenker 在他 2015 年的文章Event Sourcing applied - the Repository中描述了 RDBMS 存储库和事件存储库。

当然,随着条件写入的引入,您应该考虑在写入失败时您希望模型做什么。您可能会引入重试策略(转到第 6 步),或尝试合并策略,或者干脆失败并返回给发送者。

在您的条件写入示例中,我假设有必要在步骤 11 中添加一个 Lock(以便它锁定事件存储以获取并发谓词)。并且只有在将新事件写入事件存储后才释放锁。否则,两个并发进程可以通过并发谓词检查并保存事件。

不必要。

如果您的持久性存储提供锁,但不提供条件写入,那么您的想法是正确的:在第 12 步中,存储库将获取锁,检查前提条件,提交新事件,然后释放锁。

但是理解条件写入的持久性设备可以为您实现该检查。使用事件存储,存储库不需要获取锁。它将带有有关预期状态的元数据的事件发送到存储。事件存储本身使用该信息来执行条件写入。

没有魔法 -需要有人做这项工作以确保并发写入不会相互干扰。但它不一定必须在您的代码中。

请注意,我使用的是 Eric Evans 在蓝皮书中所描述的“存储库”——它是一种抽象,它隐藏了您对如何从系统的其余部分存储事件的选择;换句话说,是适配器使您的事件存储看起来像内存中的事件集合——而不是事件存储本身。

于 2018-03-26T21:54:44.957 回答