0

我有一个帮助类,可以将文本消息保存到本地文件系统。此方法返回一个Task对象,并且根据定义是异步的。

我希望能够观察何时调用此方法,以便我可以持续监控缓冲区的大小和长度并据此做出决定。

我正在尝试使用 Reactive Extension for .NET 来实现这一点。但是,我无法想出一种设计,让我可以连续收听添加到缓冲区的消息。以下是我当前的实现:

public IObservable<Unit> Receive(InternalMessage message)
        {
            var observable = FileBuffer.BufferMessage(message.MessageId.ToString(), message, DateTime.UtcNow).ToObservable(); //This returns a Task, which I convert into an Observable
            return observable;
        }

以下是我订阅 observable 的方式:

IObservable<Unit> receiverObservable = batchHandler.Receive(message);
            receiverObservable.Subscribe(
                x => Console.WriteLine("On next"),
                ex => //TODO,
                () => // Completed);

我希望每次调用该方法时Receive都调用订阅者。但是,AFAIK,一旦调用了这个方法,observable 完成并且序列终止,所以未来的调用Receive将不会被监听。

有人可以推荐一种使用 Rx.Net 库来实现我正在寻找的这种可观察模式的方法,即如何保持序列打开并为其提供异步方法的结果?

4

1 回答 1

1

Receive正如您编写的代码一样,返回IObservable<Unit>,表示单个任务的完成。您想订阅返回IObservable<IObservable<Unit>>代表任务完成流的内容。

有很多方法可以做到这一点,其中最好的方法可能取决于你的班级是如何设置的以及你如何称呼它。

这是最懒的一个:

subject您声明一个代表调用流的类级变量:

Subject<IObservable<Unit>> subject = new Subject<IObservable<Unit>>();
subject.Merge().Subscribe(
    x => Console.WriteLine("On next"),
    ex => { },  //TODO
    () => { }   // Completed
);

然后,当您有新呼叫时,只需将其添加到主题中即可。

IObservable<Unit> receiverObservable = batchHandler.Receive(message);
subject.OnNext(receiverObservable);

这真的很懒惰的原因是 Rx 的核心是函数式的,它倾向于看不起可变状态变量。Subjects基本上都是可变状态。

更好的方法是弄清楚你何时/为什么打电话Receive,并将其构建为可观察的。完成后,您可以解决此问题:

IObservable<Unit> sourceReasonsToCallReceive; // Most likely sourced from event

sourceReasonsToCallReceive.SelectMany(_ => batchHandler.Receive(message))
    .SubScribe(
    x => Console.WriteLine("On next"),
    ex => { },  //TODO
    () => { }   // Completed
);

希望有帮助。

于 2017-03-20T17:20:01.460 回答