我有一个帮助类,可以将文本消息保存到本地文件系统。此方法返回一个Task
对象,并且根据定义是异步的。
我希望能够观察何时调用此方法,以便我可以持续监控缓冲区的大小和长度并据此做出决定。
我正在尝试使用 Reactive Extension for .NET 来实现这一点。但是,我无法想出一种设计,让我可以连续收听添加到缓冲区的消息。以下是我当前的实现:
public IObservable<Unit> Receive(InternalMessage message)
{
var observable = FileBuffer.BufferMessage(message.MessageId.ToString(), message, DateTime.UtcNow).ToObservable(); //This returns a Task, which I convert into an Observable
return observable;
}
以下是我订阅 observable 的方式:
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
receiverObservable.Subscribe(
x => Console.WriteLine("On next"),
ex => //TODO,
() => // Completed);
我希望每次调用该方法时Receive
都调用订阅者。但是,AFAIK,一旦调用了这个方法,observable 完成并且序列终止,所以未来的调用Receive
将不会被监听。
有人可以推荐一种使用 Rx.Net 库来实现我正在寻找的这种可观察模式的方法,即如何保持序列打开并为其提供异步方法的结果?