我目前正在使用 Elixir 构建一个软实时事件处理系统,现在我试图围绕 GenStage/Flow 来了解这是否是构建的正确抽象。不幸的是,大型应用程序的示例很少,大多数是关于过时数据的并行处理。我使用无限事件流作为源。
我的问题是,在我订阅将事件推送到我的应用程序的无限外部事件流的情况下,在 GenStage/Flow 上构建是否有意义。我希望在事件到达服务器时立即处理它们。那就是我不想缓冲它们,直到我得到 500 让 Flow 开始需求。但是使用 1 的最小需求有意义吗?
我会说,对于任何接近实时的 GenStage 都行不通。核心思想是推迟评估,直到有需求(例如,工人准备好)。如果您想立即处理它们,只需为每个事件生成新的 Elixir 进程并祈祷调度程序不会阻塞 :)
即使您使用 1 的最小需求,它也会按顺序处理,并且您会缓冲溢出事件。是的,您可以并行化,但我不确定您是否必须立即开始并行阶段。同样,当您同时遇到 N 个具有 N 个并行工作人员的事件时,这些事件将被缓冲。
更新经过一番考虑,我认为在某些情况下,GenStage 或其更高阶的派生流对于近实时处理仍然有用。为了避免进程创建的开销,可以使用固定宽度的窗口来收集和分区事件,然后可以由不同的消费者、池甚至在需要时由单个进程处理。唯一的缺点或限制是这将引入量化,可能低至 1ms:https ://github.com/elixir-lang/flow/blob/v0.14.2/lib/flow/window.ex#L324但这只是理论上的推测,我没有尝试过。