1

在 Node 应用程序中,我正在尝试使用 RxJS 处理事件流。事件流是对许多文档的更改列表。我正在使用 groupBy 通过 documentId 将流划分为新流。但我想知道,一旦在客户端关闭文档并且没有新事件添加到该 documentId 的流中,groupBy 会在该文档的流为空时处理它吗?如果没有,我将如何手动执行此操作?我想避免由正在创建但从未销毁的新文档流引起的内存泄漏。

4

2 回答 2

2

我建议做什么:

不仅有一个可观察的documentChanges,还有一个可观察的documentEvents

客户端将在打开文档时发送documentOpened事件,在更改文档时发送documentChanged事件,在关闭文档时发送 documentClosed 事件。

通过通过同一个 observable 发送所有 3 种类型的事件,您建立并保证了排序。如果客户端按该顺序发送documentOpeneddocumentChangeddocumentClosed事件,那么您的服务器将按该顺序查看它们。请注意,不能保证 2 个不同客户端发送的事件顺序。这只会让您确保特定客户端发送的事件是有序的。

然后,这就是您使用的方式groupByUntil

documentEvents
    .groupByUntil(
        function (e) { return e.documentId; }, // key
        null, // element
        function (group) { // duration selector
            var documentId = group.key;
            return group.filter(function (e) { return e.eventType === 'documentClosed'; });
      })
    .flatMap(function (eventsForDocument) {
        var documentId = eventsForDocument.key;
        return eventsForDocument.whatever(...);
    })
    .subscribe(...);

另一个更简单的选项:您可以在空闲期后使组到期。根据您对事件的处理方式,这可能绰绰有余。如果文档在 5 分钟内未编辑,则此示例使组过期。如果有更多的编辑出现,那么就会产生一个新组。

var idleTime = 5 * 60 * 1000;
events
    .groupByUntil(
        function(e) { return e.documentId; },
        null,
        function(g) { return g.debounce(idleTime); })
    .flatMap...
于 2015-05-21T21:40:01.050 回答
1

由于您包含了 .NET 标记,因此我还将介绍 Rx.NET。

你的问题措辞有点不正确。当且仅当它们从未有事件时,流才是空的。所以,它们不能变成空的。不过,不发送数据的流通常不会消耗太多资源。

在 .NET 中,组在源终止之前不会终止。我们使用“GroupByUntil”,它允许您为每个组指定一个 durationSelector 流。Observable.Timer 通常适用于此。

这意味着随着时间的推移,您可能会获得多个具有相同键的非并发流,但如果(通常情况下)您的组流在某个时候被压平,那也没关系。

在 rxjs 中,我们也有 groupByUntil。

在 Rx-Java 中,行为相似的 groupByUntil 方法被滚动到 groupBy - 请参阅https://github.com/ReactiveX/RxJava/pull/1727https://github.com/benjchristensen/RxJava/commit/b9302956832e3e77579f63fd9db25aa60eb4192a更多细节。

http://reactivex.io/documentation/operators/groupby.html说:

如果您取消订阅其中一个 GroupedObservable,则该 GroupedObservable 将被终止。如果源 Observable 稍后发出一个项,其键与以这种方式终止的 GroupedObservable 匹配,则 groupBy 将创建并发出一个新的 GroupedObservable 以匹配该键。

因此,在 Rx-Java 中,您必须取消订阅分组的 observable 流才能终止它。takeUntil使用timer流可以解决此问题。

附录:

作为对您的评论的回应,在下游操作员取消订阅之前,流不会终止。groupByUntil 的持续时间选择器会导致终止。如果文档一旦关闭就不会再次打开,那么您只需将“documentclosed”事件发送到流中并使用常规 groupBy 和 takeWhile 测试“documentClosed”。

文档不再打开很重要的原因是,如果使用 groupBy(在 rx-js 和 rx.net 中),如果已经看到的密钥再次出现,则不会创建新组。

如果这是一个问题,那么您将需要使用 groupByUntil 并使用已发布的流来监视 documentClosed 事件 - 使用已发布的流将确保您不会收到订阅副作用。

于 2015-05-21T17:05:17.683 回答