5

我有一个IObservable<Item>类内部,我想公开一个只读属性,该属性提供在给定时间推送到可观察对象的最后一项。所以它将提供一个单一的值Item

如果没有推送任何值,则它必须返回一个默认值。

我怎样才能做到这一点而不必订阅可观察对象并拥有“支持字段”?

4

3 回答 3

6

只是在这里补充@Asti的答案,也许可以帮助您消除挫败感:

可观察对象不是物理“事物”,它更像是一个逻辑概念。Rx 经常与 LINQ 进行比较,而且大多数时候这是一个公平的比较。但是,当您开始谈论数据结构时,它就会崩溃:LINQ 的可枚举与用于学习目的的列表非常相似。

但是,在 Rx 方面,根本没有与 List 等效的方法。Observable 是一个瞬态数据结构,所有算子都处理这个瞬态。如果您正在寻找永久状态,那么您将离开 Rx。

话虽如此,将 observable 转换为某种状态是一个常见问题,并且有一些包可以帮助您:ReactiveUI 可能是最知名的。ReactiveProperty 是另一个。这两个软件包都有缺陷,但可能会对您有所帮助。

如果您只是在寻找一种更简单的方法来获得支持字段,而无需对支持字段进行样板化,这将起作用:

public static class ReactivePropertyExtensions
{
    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
    {
        return new ReactiveProperty<T>(source);
    }

    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
    {
        return new ReactiveProperty<T>(source, defaultValue);
    }
}

public class ReactiveProperty<T> : IDisposable
{
    private IObservable<T> Source { get; }
    private IDisposable Subscription { get; }
    public T Value { get; private set; }

    public ReactiveProperty(IObservable<T> source)
        : this(source, default(T)) { }

    public ReactiveProperty(IObservable<T> source, T defaultValue)
    {
        Value = defaultValue;
        Source = source;
        Subscription = source.Subscribe(t => Value = t);
    }

    public void Dispose()
    {
        Subscription.Dispose();
    }
}

示例使用:

var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish().RefCount();

var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);
于 2017-02-07T14:46:42.177 回答
3

假设一个 hot observable。

为了observable = source.Replay(1); observable.Connect();

提供以下值:

public int Value => observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();

如果没有推送任何值,这将返回一个默认值。

你想要从 Reactive 到 state 的转换,所以支持字段不是一个糟糕的选择。您提到您不想订阅,而是要观察任何东西:某事,某处必须订阅

于 2017-02-04T16:54:16.353 回答
1

Value本着 Asti解决方案的精神,这是定义属性的另一种方式。

private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;

public SomeClass() // Constructor
{
    _source = /* Initialize the source observable (hot) */

    _lastValue = _source
        .Catch(Observable.Never<Item>())
        .Concat(Observable.Never<Item>())
        .Publish(default)
        .AutoConnect(0)
        .FirstAsync();
}

public Item Value => _lastValue.Wait();

Publish接受参数的运算符initialValue...

返回一个可连接的可观察序列,该序列共享对基础序列的单个订阅,并以initialValue. 该运算符是Multicast使用BehaviorSubject<T>.

BehaviorSubject<T>是一个专门ISubject<T>的...

表示随时间变化的值。观察者可以订阅主题以接收最后(或初始)值和所有后续通知。

Catch添加and运算符是Concat为了保留最后一个值,即使在源序列正常或异常完成的情况下也是如此。

就我个人而言,我会犹豫是否使用此解决方案,因为volatile在运算符中更新的字段Do会更自然地完成相同的事情。我发布它主要是为了演示 Rx 功能。

于 2020-12-04T14:41:56.627 回答