24

github 上的 fromArray Rx wiki

coffee> rext = require 'rx'                                                 
coffee> arr = [1..5]                                                 
[ 1, 2, 3, 4, 5 ]                                                    
coffee> obs = rext.Observable.fromArray(arr)                         
{ _subscribe: [Function] }                                           
coffee> obs.subscribe( (x) -> console.log("added value: " + x))      
added value: 1                                                       
added value: 2                                                       
added value: 3                                                       
added value: 4                                                       
added value: 5                                                       
{ isStopped: true,                                                   
  observer:                                                          
   { isStopped: true,                                                
     _onNext: [Function],                                            
     _onError: [Function: defaultError],                             
     _onCompleted: [Function: noop] },                               
  m: { isDisposed: true, current: null } }                           
coffee> arr.push(12)    # expecting "added value: 12"                                              
6                       # instead got new length of array                                              
coffee>          

看起来该subscribe函数在创建时只会触发一次。看起来这有点用词不当,因为我实际上只是在为数组而不是观察它的变化。不过,该代码几乎与 wiki 上的代码完全相同。所以要么我做错了,要么subscribe没有按照我的预期工作。

4

4 回答 4

15

在 RxJS 中,您要查找的内容称为Subject. 您可以将数据推送到其中并从那里流式传输。

例子:

var array = [];
var arraySubject = new Rx.Subject();

var pushToArray = function (item) {
  array.push(item);
  arraySubject.next(item);
}

// Subscribe to the subject to react to changes
arraySubject.subscribe((item) => console.log(item));
于 2016-01-16T18:18:12.860 回答
2

我发现Rx.Observable.ofObjectChanges(obj)可以正常工作。

从文档页面:

使用 Object.observe 根据对对象的更改创建一个可观察的序列。

希望能帮助到你。

于 2015-12-21T10:38:44.097 回答
2

这个怎么样:

var subject = new Rx.Subject();
//scan example building an array over time
var example = subject.scan((acc, curr) => { 
    return acc.concat(curr);
}
,[]);

//log accumulated values
var subscribe = example.subscribe(val => 
    console.log('Accumulated array:', val)
);

//next values into subject
subject.next(['Joe']);
subject.next(['Jim']);
于 2018-04-17T10:48:17.223 回答
1

Observable.fromArray 创建一个 Observable,当您添加订阅者时,它会立即为每个数组项触发事件。因此,它不会“监视”该数组的更改。

如果你需要一个“可推送集合”,Bacon.js中的 Bus 类可能就是你要找的。对于 RxJs,我的MessageQueue小类具有类似的功能。

于 2013-01-23T13:26:36.193 回答