我有一个IObservable<Item>
类内部,我想公开一个只读属性,该属性提供在给定时间推送到可观察对象的最后一项。所以它将提供一个单一的值Item
。
如果没有推送任何值,则它必须返回一个默认值。
我怎样才能做到这一点而不必订阅可观察对象并拥有“支持字段”?
我有一个IObservable<Item>
类内部,我想公开一个只读属性,该属性提供在给定时间推送到可观察对象的最后一项。所以它将提供一个单一的值Item
。
如果没有推送任何值,则它必须返回一个默认值。
我怎样才能做到这一点而不必订阅可观察对象并拥有“支持字段”?
只是在这里补充@Asti的答案,也许可以帮助您消除挫败感:
可观察对象不是物理“事物”,它更像是一个逻辑概念。Rx 经常与 LINQ 进行比较,而且大多数时候这是一个公平的比较。但是,当您开始谈论数据结构时,它就会崩溃:LINQ 的可枚举与用于学习目的的列表非常相似。
但是,在 Rx 方面,根本没有与 List 等效的方法。Observable 是一个瞬态数据结构,所有算子都处理这个瞬态。如果您正在寻找永久状态,那么您将离开 Rx。
话虽如此,将 observable 转换为某种状态是一个常见问题,并且有一些包可以帮助您:ReactiveUI 可能是最知名的。ReactiveProperty 是另一个。这两个软件包都有缺陷,但可能会对您有所帮助。
如果您只是在寻找一种更简单的方法来获得支持字段,而无需对支持字段进行样板化,这将起作用:
public static class ReactivePropertyExtensions
{
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
{
return new ReactiveProperty<T>(source);
}
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
{
return new ReactiveProperty<T>(source, defaultValue);
}
}
public class ReactiveProperty<T> : IDisposable
{
private IObservable<T> Source { get; }
private IDisposable Subscription { get; }
public T Value { get; private set; }
public ReactiveProperty(IObservable<T> source)
: this(source, default(T)) { }
public ReactiveProperty(IObservable<T> source, T defaultValue)
{
Value = defaultValue;
Source = source;
Subscription = source.Subscribe(t => Value = t);
}
public void Dispose()
{
Subscription.Dispose();
}
}
示例使用:
var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish().RefCount();
var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);
假设一个 hot observable。
为了observable = source.Replay(1); observable.Connect();
提供以下值:
public int Value =>
observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();
如果没有推送任何值,这将返回一个默认值。
你想要从 Reactive 到 state 的转换,所以支持字段不是一个糟糕的选择。您提到您不想订阅,而是要观察任何东西:某事,某处必须订阅。
Value
本着 Asti解决方案的精神,这是定义属性的另一种方式。
private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;
public SomeClass() // Constructor
{
_source = /* Initialize the source observable (hot) */
_lastValue = _source
.Catch(Observable.Never<Item>())
.Concat(Observable.Never<Item>())
.Publish(default)
.AutoConnect(0)
.FirstAsync();
}
public Item Value => _lastValue.Wait();
Publish
接受参数的运算符initialValue
...
返回一个可连接的可观察序列,该序列共享对基础序列的单个订阅,并以
initialValue
. 该运算符是Multicast
使用BehaviorSubject<T>
.
这BehaviorSubject<T>
是一个专门ISubject<T>
的...
表示随时间变化的值。观察者可以订阅主题以接收最后(或初始)值和所有后续通知。
Catch
添加and运算符是Concat
为了保留最后一个值,即使在源序列正常或异常完成的情况下也是如此。
就我个人而言,我会犹豫是否使用此解决方案,因为volatile
在运算符中更新的字段Do
会更自然地完成相同的事情。我发布它主要是为了演示 Rx 功能。