0

我有一个场景,在给定一个 observable 的情况下,我想知道任何从未到达任何订阅者的元素(例如过滤掉)并据此采取行动。实现这样的目标的最佳方法是什么?

4

1 回答 1

0

选项1

结合使用发布和功能组合。

var Rx        = require('rx')
    log       = console.log.bind(console),
    source    = Rx.Observable.interval(100).take(100);
    published = source.publish(),
    accepted  = published.where(condition),
    rejected  = published.where(not(condition));

accepted.subscribe(log.bind(undefined, 'accepted: '));
rejected.subscribe(log.bind(undefined, 'rejected: '));

published.connect();

function condition (x) {
  return x % 4 === 0;
}

function not (func) {
  return function () {
    return !func.apply(this, arguments);
  }
}

选项#2

标记(变异)事件并在最后一秒过滤。

var Rx        = require('rx')
    log       = console.log.bind(console),
    source    = Rx.Observable.interval(100).take(100);

source
  .map(toAcceptable)
  .doAction(function (x) {
    x.acceptable = x.acceptable && condition1(x.value);
  })
  .doAction(function (x) {
    x.acceptable = x.acceptable && condition2(x.value);
  })
  .doAction(function (x) {
    log(x.acceptable ? 'accepted:' : 'rejected:', x);
  })
  .subscribe();

function condition1 (x) {
  return x % 4 === 0;
}

function condition2 (x) {
  return x % 3 === 0;
}

function toAcceptable (x) {
  return {
    acceptable: true,
    value: x
  };
}
于 2014-05-23T16:31:09.790 回答