1

我正在构建一个 Angular2 应用程序,所以我已经习惯了 Observables 和 Reactive Extensions 作为一个整体。我正在使用 TypeScript 和 rxjs。

现在我有一个可观察的,或者如果你愿意的话,我有一个流,它包含一些对象的数组。比方说人对象。现在我有另外两个 Person-objects 流,并且想将它们组合起来,所以我得到一个始终是最新的流:

var people$ = getPeople();                  // Observable<Person[]>
var personAdded$ = eventHub.personAdded;    // Observable<Person>;
var personRemoved$ = eventHub.personRemoved // Observable<Person>;

var allwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...);

如果 people-stream 发出一个数组,比方说 5 个人,然后 personAdded-stream 发出一个人,allPeople-stream 将发出一个 6 数组。如果 personRemoved-stream 发出一个人,allPeople-流应该发出一个 Person-objects 数组,而不是刚刚由 personRemoved-stream 发出的那个。

rxjs 中是否有内置方法来获得这种行为?

4

2 回答 2

2

我的建议是您将 an 的想法包装action到一个流中,然后可以将其合并并直接应用于Array.

第一步是定义一些描述您的操作的函数:

function add(people, person) {
  return people.concat([people]);
}

function remove(people, person) {
  const index = people.indexOf(person);
  return index < 0 ? people : people.splice(index, 1);
}

注意:我们避免就地改变数组,因为它可能会产生无法预料的副作用。纯度要求我们创建数组的副本。

现在我们可以使用这些函数并将它们提升到流中以创建一个Observable发出函数的函数:

const added$ = eventHub.personAdded.map(person => people => add(people, person));
const removed$ = eventHub.personRemoved.map(person => people => remove(people, person));

现在我们得到以下形式的事件:people => people其中输入和输出将是一个人数组(在这个例子中简化为一个字符串数组)。

现在我们将如何连接它?好吧,我们真的只关心在我们有一个数组来应用它们之后添加或删除这些事件:

const currentPeople = 

  // Resets this stream if a new set of people comes in
  people$.switchMap(peopleArray => 

    // Merge the actions together 
    Rx.Observable.merge(added$, removed$)

      // Pass in the starting Array and apply each action as it comes in
      .scan((current, op) => op(current), peopleArray)

      // Always emit the starting array first
      .startWith(people)
  )
  // This just makes sure that every new subscription doesn't restart the stream
  // and every subscriber always gets the latest value
  .shareReplay(1);

根据您的需要,这种技术有几种优化(即避免函数柯里化,或使用二分搜索),但我发现上述对于一般情况来说相对优雅。

于 2016-10-14T21:33:26.617 回答
1

您想要合并所有流(Ghostbusters 风格),然后使用扫描运算符来确定状态。扫描操作符的工作方式类似于 Javascript reduce。

这是一个演示...

const initialPeople = ['Person 1', 'Person 2', 'Person 3', 'Person 4'];

const initialPeople$ = Rx.Observable.from(initialPeople);

const addPeople = ['Person 5', 'Person 6', 'Person 7'];

const addPeople$ = Rx.Observable.from(addPeople)
            .concatMap(x => Rx.Observable.of(x).delay(1000)); // this just makes it async

const removePeople = ['Person 2x', 'Person 4x'];

const removePeople$ = Rx.Observable.from(removePeople)
                                              .delay(5000)
                                                .concatMap(x => Rx.Observable.of(x).delay(1000));

const mergedStream$ = Rx.Observable.merge(initialPeople$, addPeople$, removePeople$)

mergedStream$
  .scan((acc, stream) => {
        if (stream.includes('x') && acc.length > 0) {
            const index = acc.findIndex(person => person === stream.replace('x', ''))
            acc.splice(index, 1);
        } else {
            acc.push(stream);
        }
      return acc;
  }, [])
  .subscribe(x => console.log(x))

// In the end, ["Person 1", "Person 3", "Person 5", "Person 6", "Person 7"]

http://jsbin.com/rozetoy/edit?js,console

你没有提到你的数据结构。我使用“x”作为标志有点(很多)笨重且有问题。但我认为您了解如何修改扫描运算符以适合您的数据。

于 2016-10-14T15:51:34.250 回答