6

我希望以下代码会异步运行:

var range = Rx.Observable.range(0, 3000000);

range.subscribe(
  function(x) {},
  function(err) {},
  function() {
    console.log('Completed');
});

console.log('Hello World');

但事实并非如此。通过大范围的数字需要一段时间,只有在完成后才能恢复执行,您可以在这里尝试代码。

我对何时期望 RxJS 同步或异步运行感到困惑。这取决于使用的方法吗?我之前的想法是,一旦我们进入 Observables/Observer 领域,其中的一切都是异步运行的,类似于 Promise 的工作方式。

4

1 回答 1

21

RxJs 遵循与 Rx.Net 相同的规则。默认情况下,每个可观察操作员使用完成其工作所需的最小异步性。在这种情况下,Range可以同步运行这些数字,它确实如此(它的文档告诉你它将Rx.Scheduler.currentThread默认使用 .

如果您想引入比操作所需更多的异步性,则需要告诉它使用不同的Scheduler

要获得您期望的行为,您需要使用Rx.Scheduler.timeout. 从本质上讲,这将导致它通过setTimeout. (实际上并不是这么简单,调度器会使用浏览器中最快的方法来调度延迟的工作)。

var range = Rx.Observable.range(0, 3000000, Rx.Scheduler.timeout);

更新了 jsFiddle

请注意,通过 300 万个数字迭代setTimeout将花费几乎永远的时间。因此,也许我们希望分批处理 1,000 个。因此,在这里我们将利用Range同步运行的默认行为,然后批处理值并observeOn通过我们的超时调度程序用于运行批处理:

var range = Rx.Observable
    .range(0, 3000000)
    .bufferWithCount(1000)
    .observeOn(Rx.Scheduler.timeout) // run each buffer via setTimeout
    .select(function (buffer, i) {
       console.log("processing buffer", i);
       return Rx.Observable.fromArray(buffer);
     })
    .concatAll(); // concat the buffers together

jsFiddlerange请注意,在遍历所有 3,000,000 个值并bufferWithCount生成 3,000 个数组时,开始时会有延迟。对于您的数据源不像Observable.range.

仅供参考的承诺在这方面没有任何不同。如果你调用then一个已经完成的promise,这个then函数可能会同步运行。所有 Promise 和 Observables 真正做的是提供一个接口,通过该接口,您可以提供保证在满足条件时运行的回调,无论条件已经满足还是稍后会满足。然后,RxJs 提供了许多机制来强制异步运行某些东西,如果你真的想要那样的话。以及介绍具体时间的方法。

于 2014-01-23T18:31:32.090 回答