7

我有这个代码:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

我已经使用 OnError(new OperationCanceledException()) 解决了我的问题,但我想要一个更好的解决方案(必须有一个组合器对吗?)。

4

4 回答 4

8

或者这个,也很简洁:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

这使用了一个主题,该主题将确保在 CreateWithDisposable() 中仅将一个 OnCompleted 推送给观察者;

于 2011-02-03T15:58:54.547 回答
8

我建议不要在任一流完成时重写 Merge 以完成,而是将 onCompleted 事件转换为 onNext 事件,并var ss = s1.Merge(s2).TakeUntil(s1ors2complete)在 s1 或 s2 结束时使用 where s1ors2complete 产生一个值。您也可以只链接.TakeUntil(s1completes).TakeUntil(s2completes)而不是创建 s1ors2complete。这种方法提供了比 MergeWithCompleteOnEither 扩展更好的组合,因为它可用于将任何“两个都完成时完成”运算符修改为“任何完成时完成”运算符。

至于如何将 onNext 事件转换为 onCompleted 事件,有几种方法可以做到这一点。CompositeDisposable 方法听起来是个不错的方法,稍作搜索发现这个关于在 onNext、onError 和 onCompleted 通知之间转换的有趣线程。我可能会创建一个名为 ReturnTrueOnCompleted 的扩展方法xs.SkipWhile(_ => true).concat(Observable.Return(True)),然后您的合并变为:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

您还可以查看使用像 Zip 这样的运算符,它会在输入流之一完成时自动完成。

于 2011-02-04T08:32:03.630 回答
2

假设您不需要任何一个流的输出,您可以Amb结合使用以下魔法Materialize

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

如果您需要这些值,您可以Do在这两个主题上使用。

于 2011-02-03T16:33:17.447 回答
0

试试这个:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

这样做是连接一个 IObservable,它会在源或正确的源完成时抛出异常。然后我们可以使用 Catch 扩展方法返回一个空的 Observable 以在任一完成时自动完成流。

于 2011-02-03T15:47:44.157 回答