我试图找到某种基于另一个可观察值的值过滤可观察值的方法。例如,假设我们只想接收时间 x 和 y 之间的事件。可以根据计时器的值过滤可观察对象吗?
2 回答
几种方式。没有一些代码,很难知道哪个是最好的。
通过 CombineLatest(一直收听并根据最新值进行过滤):
var astream = ...;
var bstream = ...;
var filtered = Observable.CombineLatest(astream, bstream, (a, b) => new { a, b })
.Where(v => v.b >= x && v.b <= y)
.Select(v => v.a); // Alas sometimes you will get duplicate a's.
astream
通过选择和切换(仅在bstream
满足某些条件时才收听):
var astream = ...;
var bstream = ...;
var filtered = bstream
.Select(b => (b >= x && b <= y) ? astream : Observable.Empty<T>())
.Switch();
正如Brandon所提到的,有很多方法可以组合事件流。
Reactive joins withObservable.Join
是一个非常通用的工具,但很大一部分 Rx 内置运算符库以支持基于另一个过滤的方式组合流。
我真的很喜欢 Brandon 的Select
+Switch
技术(我 +1);我已经把它归档以备将来使用!
这是一种直接解决将源流过滤到开始和结束时间的问题的方法。Select
它比+有一些优势,Switch
包括:
- 它避免检查源流的每个事件的过滤条件。
- 它避免了需要“空”流。
- 它只订阅一次源。
- 它
OnCompleted()
会在到达结束时间后立即发送,而不是像源流一样持续。
它实际上归结为Observable.Window
运算符的特定过载,但我将逐步解释它。
Window
基本思想是通过应用在开始时间打开并在结束时间关闭的a 来过滤源流。
xs
首先让我们创建一个一秒脉冲的示例源流 ( ),以及开始时间和结束时间:
var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var startTime = DateTime.Now + TimeSpan.FromSeconds(5);
var endTime = DateTime.Now + TimeSpan.FromSeconds(8);
为简洁起见,我没有检查startTime
之前的内容endTime
。现在我们创建一个流来打开窗口,以及一个流来关闭窗口:
var start = Observable.Timer(startTime);
var end = Observable.Timer(endTime);
最后使用Observable.Window
. 该操作符的输出是一个流 ( IObservable<IObservable<T>>
) - 每个子流都是一个新窗口。
我们将使用的重载接受一个流,其事件标记一个新窗口的打开,以及一个工厂函数,用于在给定触发窗口打开的事件的情况下提供关闭流。
使用我们的 Timer 流,我们知道将在开始时间创建一个窗口,并在结束时间关闭。
我们使用以下方法展平流Observable.Merge
:
var filtered = xs.Window(start, _ => end).Merge();
如果我们这样订阅:
filtered.Subscribe(Console.WriteLine);
正如预期的那样,我们得到以下输出:
4
5
6
同样,有很多很多方法可以解决这个问题,而不仅仅是使用Window
. 例如,您还可以轻松扩展此解决方案以支持多个窗口(通过使用打开时间流和关闭时间流工厂)。