1

我有消息的来源,它是一个Observable. 对于每条消息,我想进行一个 HTTP 调用,该调用将产生另一个Observable,所以我将它们与 结合在一起,flatMap然后将它们发送给某个订阅者。这里是这个场景的代码:

Rx.Observable.interval(1000)
.flatMap (tick) ->
  // returns an `Observable`
  loadMessages()
.flatMap (message) ->
  // also returns and `Observable`
  makeHttpRequest(message)
.subscribe (result) ->
  console.info "Processed: ", result

这个例子是用咖啡脚本编写的,但我认为问题陈述对于任何其他 Rx 实现都是有效的。

我对这种方法的问题是loadMessages会很快产生大量消息。这意味着,我在很短的时间内发出了很多 HTTP 请求。这在我的情况下是不可接受的,所以我想将并行 HTTP 请求的数量限制在 10 个左右。换句话说,当我发出 HTTP 请求时,我想限制管道或应用某种背压。

Rx 是否有任何标准方法或最佳实践来处理这种情况?

目前,我实现了非常简单(并且非常次优)的背压机制,如果系统处理的按摩过多,则会忽略滴答声。它看起来像这样(简化版):

Rx.Observable.interval(1000)
.filter (tick) ->
  stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
  stats.messageIn()
.flatMap (tick) ->
  // returns an `Observable`
  loadMessages()
.flatMap (message) ->
  // also returns and `Observable`
  makeHttpRequest(message)
.do (tick) ->
  stats.messageOut()
.subscribe (result) ->
  console.info "Processed: ", result

不过,我不确定这是否可以做得更好,或者 Rx 可能已经有一些机制来处理这种需求。

4

3 回答 3

2

这不是严格的背压,这只是限制并发。这是一个简单的方法(忽略我可能错误的语法,通过TextArea编码):

Rx.Observable.interval(1000)
    .flatMap (tick) ->
        // returns an `Observable`
        loadMessages()
    .map (message) ->
        // also returns and `Observable`, but only when
        // someone first subscribes to it
        Rx.Observable.defer ->
            makeHttpRequest(message)
    .merge 10 // at a time
    .subscribe (result) ->
        console.info "Processed: ", result

在 C# 中,等效的想法是,而SelectMany不是Select(Defer(x)).Merge(n). Merge(int)最多订阅nin-flight Observables,并缓冲其余的直到稍后。我们有 , 的原因是为了让我们在订阅我们Defer之前不做任何工作。Merge(n)

于 2014-03-20T20:12:19.710 回答
1

听起来您想从队列中拉出而不是推送您的 http 请求。Rx 真的是这里正确的技术选择吗?

编辑:

一般来说,我不会使用 Rx 来设计解决方案,因为我对源事件有完全的命令式控制。这不是一个被动的场景。

Rxjs 中的背压模块显然是为处理您不拥有源流的情况而编写的。在这里你做。

TPL 数据流听起来更适合这里。

如果你必须使用 RX,你可以像这样设置一个循环:如果你想限制 X 个并发事件,设置一个 Subject 作为你的消息源并强制推送 ( OnNext) X 个消息到它。在您的订阅者中,您可以在 OnNext 处理程序的每次迭代中将新消息推送到主题,直到源耗尽。这保证了最多 X 条消息在传输中。

于 2014-03-17T22:37:33.620 回答
1

在 RXJS 中,您可以使用背压子模块

http://rxjs.codeplex.com/SourceControl/latest#src/core/backpressure/

disclaimer我从未使用过 JS 的 RX 版本,但您确实要求一种实现背压的标准方法,并且核心库似乎支持它。RX c# 尚无此支持。不知道为什么。

于 2014-03-18T05:15:14.983 回答