6

我正在尝试查看如何使用 Rx 将多个可观察事件流式传输到一组事件中。但是当我运行以下代码时出现异常。那么这是否意味着多个观察者总是因为违反 Rx 语法而容易出现异常呢?我的意思是,如果多个观察者中的两个偶然同时生成一个事件(任何两个可观察者都有一定的概率同时生成),它应该给出一个例外。

DateTimeOffset start;
        object sync = new object();
        var subject = new Subject<long>();
        var observer = Observer.Create<long>(c =>
        {
            lock (sync)
            {
                Console.WriteLine(c);
            }
        })
            ;

        var observable1 = Observable.Interval(TimeSpan.FromSeconds(2));
        var observable2 = Observable.Interval(TimeSpan.FromSeconds(5));
        var observable3 = Observable.Never<long>().Timeout
            (start = DateTimeOffset.Now.AddSeconds(15),
             (new long[] { 1 }).ToObservable());
        var observable4 = Observable.Never<long>().Timeout(start);
        observable1.Subscribe(observer);
        observable2.Subscribe(observer);
        observable3.Subscribe(observer);
        observable4.Subscribe(observer);
        Thread.Sleep(20000);

感谢吉迪恩的解释。这是我得到的例外。你是对的,这是一个超时异常。这是一个编码错误。谢谢。

System.TimeoutException: The operation has timed out.
   at System.Reactive.Observer.<Create>b__8[T](Exception e)
   at System.Reactive.AnonymousObserver`1.Error(Exception exception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.Subjects.Subject`1.OnError(Exception error)
   at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
   at System.Reactive.AbstractObserver`1.OnError(Exception error)
   at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28
e.<Throw>b__28b()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta
te, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
   at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse
rver`1 observer)
   at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()

   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
   at System.Reactive.Concurrency.ScheduledItem`1.Invoke()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run()
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
 state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
 state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
   at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer)
   at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54
5.<Timeout>b__53f()
   at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
   at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche
dule>b__6(Object _)
   at System.Threading._TimerCallback.TimerCallback_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C
ontextCallback callback, Object state, Boolean ignoreSyncCtx)
   at System.Threading._TimerCallback.PerformTimerCallback(Object state)
4

2 回答 2

6

是的,观察者可以监听多个可观察对象。最好的例子就是Merge运营商。内置运算符都将遵循 RX 语法,并且通常会在不这样做的源上强制执行它。

IObserver你得到的就是Observer.Create这样一种情况。一旦 OnError 或 OnCompleted 被调用,它将忽略任何未来对 OnNext 的调用。这确实意味着使用同一个观察者订阅一个可观察对象,然后在第一个观察对象之后订阅另一个可观察对象将不起作用,因为来自第一个可观察对象的终止消息将导致观察者忽略来自第二个可观察对象的消息。为了解决这个问题,像Merge,ConcatOnErrorResumeNext(以及其他)这样的运算符在内部使用多个观察者,并且除了最后一个 observable 之外,不会将完成消息(OnError 和/或 OnCompleted 取决于运算符的语义)传递给外部观察者。

你没有提到你得到了什么异常,但我猜这是你从 Timeout 得到的错误observable4。如果您没有提供另一个用于超时的可观察对象,则调用观察者的 ,并且不接受错误处理程序的和重载OnError的默认设置是简单地抛出异常。OnErrorSubscribeObserver.Create

虽然这显然是示例/测试代码,但我确实想指出,即使您不再收到传递给您的消息OnNext,所有其他 observable 在此异常之后仍会继续运行。要么使用Merge为您跟踪,要么跟踪描述中的所有一次性用品,并在完成消息出现时自行处理它们。 CompositeDisposable(in System.Reactive.Disposables) 对此有好处。

于 2011-12-13T20:59:26.120 回答
2

你真的不应该在这里使用锁,但如果你真的想让它工作,你可以这样做:

var x = Observable.Create<T>(subj => { /* Fill it in*/ })
    .Multicast(new Subject<T>());

// Set up your subscriptions Here!

// When you call the Connect, whatever is in the Observable.Create will be called
x.Connect();

如果您想更加安全,您可以通过使用 ReplaySubject 而不是 Subject 来将 Create 的结果“重播”给您以供将来订阅(而使用 Subject, Connect之后的订阅者将获得没有)

于 2011-12-17T01:31:45.843 回答