3

我试图找到某种基于另一个可观察值的值过滤可观察值的方法。例如,假设我们只想接收时间 x 和 y 之间的事件。可以根据计时器的值过滤可观察对象吗?

4

2 回答 2

4

几种方式。没有一些代码,很难知道哪个是最好的。

通过 CombineLatest(一直收听并根据最新值进行过滤):

var astream = ...;
var bstream = ...;
var filtered = Observable.CombineLatest(astream, bstream, (a, b) => new { a, b })
          .Where(v => v.b >= x && v.b <= y)
          .Select(v => v.a); // Alas sometimes you will get duplicate a's.

astream通过选择和切换(仅在bstream满足某些条件时才收听):

var astream = ...;
var bstream = ...;
var filtered = bstream
    .Select(b => (b >= x && b <= y) ? astream : Observable.Empty<T>())
    .Switch();
于 2013-12-26T22:04:05.207 回答
1

正如Brandon所提到的,有很多方法可以组合事件流。

Reactive joins withObservable.Join是一个非常通用的工具,但很大一部分 Rx 内置运算符库以支持基于另一个过滤的方式组合流。

我真的很喜欢 Brandon 的Select+Switch技术(我 +1);我已经把它归档以备将来使用!

这是一种直接解决将源流过滤到开始和结束时间的问题的方法。Select它比+有一些优势,Switch包括:

  • 它避免检查源流的每个事件的过滤条件。
  • 它避免了需要“空”流。
  • 它只订阅一次源。
  • OnCompleted()会在到达结束时间后立即发送,而不是像源流一样持续。

它实际上归结为Observable.Window运算符的特定过载,但我将逐步解释它。

Window基本思想是通过应用在开始时间打开并在结束时间关闭的a 来过滤源流。

xs首先让我们创建一个一秒脉冲的示例源流 ( ),以及开始时间和结束时间:

var xs = Observable.Interval(TimeSpan.FromSeconds(1));

var startTime = DateTime.Now + TimeSpan.FromSeconds(5);    
var endTime = DateTime.Now + TimeSpan.FromSeconds(8);

为简洁起见,我没有检查startTime之前的内容endTime。现在我们创建一个流来打开窗口,以及一个流来关闭窗口:

var start = Observable.Timer(startTime);
var end = Observable.Timer(endTime);

最后使用Observable.Window. 该操作符的输出是一个流 ( IObservable<IObservable<T>>) - 每个子流都是一个新窗口。

我们将使用的重载接受一个流,其事件标记一个新窗口的打开,以及一个工厂函数,用于在给定触发窗口打开的事件的情况下提供关闭流。

使用我们的 Timer 流,我们知道将在开始时间创建一个窗口,并在结束时间关闭。

我们使用以下方法展平流Observable.Merge

var filtered = xs.Window(start, _ => end).Merge();

如果我们这样订阅:

filtered.Subscribe(Console.WriteLine);

正如预期的那样,我们得到以下输出:

4
5
6

同样,有很多很多方法可以解决这个问题,而不仅仅是使用Window. 例如,您还可以轻松扩展此解决方案以支持多个窗口(通过使用打开时间流和关闭时间流工厂)。

于 2013-12-27T14:42:19.570 回答