1

我正在寻找一种通过特定逻辑的项目的方法。最明显的答案可能是使用 .Select ,它适用于大多数情况,但我有一个特殊情况,这个问题实际上可以改写为在所有订阅者消费项目后如何调用某个方法?

我正在考虑一个看起来像这样的扩展PassThrough(this IObservable<TSource> obj, Action<TSource, IObserver<TResult>> selector),我会以下列方式使用它

.PassThrough((source, observer) => {
   if(source != null) {
      using(var result = new Result(source)) {
         observer.OnNext(result);
      }
   }
});

其中最重要的部分是在对象传递给 OnNext 之后调用 .Dispose 对结果对象,换句话说,在它被订阅者消费之后。我没有找到这样的扩展方法。有人可以举例说明如何使用现有的 Rx.NET API 来实现它,或者假设它是可能的,如何创建一个可以做到这一点的扩展?

4

1 回答 1

0

您正在寻找的可能是 Observable.Create 通过扩展方法。在您的情况下,它可能如下所示:

public static IObservable<Result> PassThrough(this IObservable<TSource> obj, T source) {
   return Observable.Create(observer => {
      if(source != null) {
         using(var result = new Result(source)) {
            observer.OnNext(result);
            observer.OnCompleted();
         }
      } else {
         observer.OnError(Some Error);
         ...
      }
   })
}

显然,取决于您是否要让流继续进行,您将省略 OnCompleted() 调用。

有关 Observable.Create 用法的更多信息,请参见此处:http: //introtorx.com/Content/v1.0.10621.0/04_CreatingObservableSequences.html#CreationOfObservables

于 2018-04-03T01:58:10.967 回答