41

对我来说,RxJs 5share()运算符的工作原理并不是 100% 清楚,请参阅此处的最新文档。Jsbin 的问题在这里

如果我用一系列 0 到 2 创建一个 observable,每个值间隔一秒:

var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
    console.log('some side effect');
});

如果我为这个 observable 创建两个订阅者:

source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));

我在控制台中得到了这个:

"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"

我以为每个订阅都会订阅同一个 Observable,但似乎并非如此!就像订阅的行为创建了一个完全独立的 Observable!

但是如果将share()操作符添加到源 observable 中:

var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
    console.log('some side effect ...');
})
.share();

然后我们得到这个:

"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"

如果没有share().

这是怎么回事,share()操作员是如何工作的?每个订阅是否都会创建一个新的 Observable 链?

4

2 回答 2

23

请注意,您使用的是 RxJS v5,而您的文档链接似乎是 RxJS v4。我不记得具体细节,但我认为share运营商经历了一些变化,特别是在完成和重新订阅方面,但不要相信我的话。

回到您的问题,正如您在研究中所表明的那样,您的期望与图书馆设计不符。Observables 懒惰地实例化它们的数据流,具体地在订阅者订阅时启动数据流。当第二个订阅者订阅相同的 observable 时,会启动另一个新的数据流,就像它是第一个订阅者一样(所以是的,每个订阅都会创建一个新的 observable 链,如您所说)。这就是在 RxJS 术语中创造的冷可观察对象,这是 RxJS 可观察对象的默认行为。如果你想要一个 observable,它在数据到达的那一刻将其数据发送给它拥有的订阅者,这就是一个 hot observable,而获得 hot observable 的一种方法是使用share操作符。

您可以在此处找到图示的订阅和数据流:Hot and Cold observables:是否有“热”和“冷”运算符?(这对 RxJS v4 有效,但大部分对 v5 有效)。

于 2016-02-01T22:47:54.430 回答
22

如果满足这两个条件,则 share 会使 observable 变得“热”:

  1. 订阅者数量 > 0
  2. 并且 observable 尚未完成

场景一:订阅者数量 > 0 并且 observable 在新订阅之前没有完成

var shared  = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 3000);

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds

场景二:新订阅前订阅人数为零。变“冷”

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer1.unsubscribe(); 
}, 1000);

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
}, 3000);
// observer2's onNext is called at startTime + 8 seconds
// observer2's onNext is called at startTime + 13 seconds

场景 3:当 observable 在新订阅之前完成时。变“冷”

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
        console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
    };

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 12000);

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs
于 2016-11-19T23:04:50.253 回答