4

我有两个合并后的可观察对象,合并后进行扫描。第一个是简单范围,另一个是主题。每当主题发出一个新值时,onNext我在扫描中连接该值并将新数组作为累加器返回。如果我处理我的订阅,然后再次订阅,它会重播该范围内的值,但我已经丢失了主题中的值。在下面的代码中,我希望我的第二次订阅的最终值为[1, 2, 3, 4, 5]

最好的方法是什么?现在我有另一个主题,我在其中存储最终值并订阅它,但感觉不对。

这是一个简单的版本,演示了正在发生的事情:

var Rx = require('rx');

var source = Rx.Observable.range(1, 3);

var adder = new Rx.Subject();

var merged = source.merge(adder)
                    .scan([], function(accum, x) {
                        return accum.concat(x);
                    });

var subscription1 = merged.subscribe(function(x) {console.log(x)});
adder.onNext(4);
adder.onNext(5);

subscription1.dispose();

console.log('After Disposal');

var subscription2 = merged.subscribe(function(x) {console.log(x)});

这输出:

[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
[ 1, 2, 3, 4 ]
[ 1, 2, 3, 4, 5 ]
After Disposal
[ 1 ]
[ 1, 2 ]
[ 1, 2, 3 ]
4

2 回答 2

8

Subject 是一个热门的 Observable,这就是为什么第二个订阅不会看到来自 Subject 的事件。范围 Observable 是冷的,因此每个“执行实例”完全由每个订阅拥有。另一方面,Subject 的“执行实例”是单例且独立的,因此第二个订阅不会看到事件。

有几种方法可以解决这个问题。

  1. 使用 ReplaySubject。您必须指定缓冲区大小。如果您对该缓冲区没有很好的限制,则使用无限缓冲区可能会导致内存问题。
  2. 避免使用主题。换句话说,避免使用热的 Observable,将其替换为冷的 Observable,根据我一开始给出的问题描述,您的订阅不会有问题,他们会看到相同的事件。通常,Subject 可以被 Observable 替换,但这取决于您的整体架构。可能需要重写很多。而在最坏的情况下,比如 Observable 的循环依赖,你无法避免使用 Subject。
  3. 重新排列代码,使订阅在 Subject 开始发射之前开始,因此所有订阅都有机会看到来自热门 Observable 的“实时”发射。

但是,如果我对这个问题的解释是正确的,那么您只需要 发出的最后一个事件merged,因此您可以使用替代 (1) 的变体,其中只重播最后一个事件。这将是添加.shareReplay(1)to的问题merged,这将使其成为热门重放的 Observable。

于 2015-06-01T10:21:49.580 回答
1

从您的描述中,听起来您正在寻找BehaviorSubjector ReplaySubject(或它们相应的运算符publishValue()replay()),而不是scan.

您可以使用以下方法在每次重新访问页面时连接您的状态,(因为您提到它,我将使用 todo 应用程序示例):

var todos = buttonClicked
  .map(function(e) { return newTodoBox.text(); })
   //This will preserve the state of the todo list across subscriptions
  .replay();

var todoSubscription;

pageActivated.subscribe(function() {
  todoSubscription = new Rx.CompositeDisposable(
                          //Add the items to the list
                          todos.subscribe(addItem),
                          //This enables the replay to actually start
                          //receiving values
                          todos.connect());
  /*Do other activation work*/
});

pageDeactivated.subscribe(function() {
   todoSubscription && todoSubscription.dispose();
   /*Do other clean up work*/
});

这是一个工作代码示例,我认为它反映了您正在寻找的内容,请注意,在这种情况下,我不需要扫描,因为列表会被replay()更进一步的链完全补充。您会注意到,如果您添加一些待办事项,然后断开连接并再次连接,这些项目将重新出现。

var buttonClicked = Rx.Observable.fromEvent($('#add'), 'click');

var disconnect = Rx.Observable.fromEvent($('#disconnect'), 'click');

var connect = Rx.Observable.fromEvent($('#connect'), 'click');

var newTodoBox = $('#todos')[0];

var mylist = $('#mylist');

function addItem(e) {
  mylist.append('<li>' + e + '</li>');
}

var todos = buttonClicked
  .map(function(e) {
    return newTodoBox.value;
  })
  .replay();

var todoSubscription;

disconnect.subscribe(function() {
  mylist.empty();
  todoSubscription && todoSubscription.dispose();
});

connect.startWith(true).subscribe(function() {
  todoSubscription = new Rx.CompositeDisposable(
    todos.subscribe(addItem),
    todos.connect());
});
<head>
  <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
</head>

<body>
  <input id="todos" />
  <button id="add">Add Todo</button>
  <div>
    <ul id='mylist'></ul>
  </div>
  <div>
    <button id='disconnect'>Disconnect</button>
    <button id='connect'>Connect</button>
  </div>
</body>

于 2015-06-01T06:34:39.830 回答