1

我正在尝试使用 ThreadPoolScheduler 动态添加和删除订阅者到 Observable BlockingCollection。

我不确定这是我的代码还是 RX 本身的问题,但我假设在这种情况下我应该能够根据需要订阅/取消订阅。

我已将问题简化为下面粘贴的测试。

代码正常工作,直到我对订阅者调用 Dispose 然后添加新订阅者。从本质上讲,我似乎让旧线程仍在对 Observable 进行排队,但从未对它们做任何事情。

这是单元测试,它设置 32 个订阅者,添加 64 个对象,然后取消订阅并重复相同的测试。我已经删除了任何额外的代码(并添加了一些睡眠只是为了测试以确保在我取消订阅之前线程已经完成)

前 64 个被正确处理,但第二组只有 32 个对象被传递给我的订阅者。

[TestClass]
public class RxTests
{
    public class ObservTest
    {

        public BlockingCollection<ObserverTests.UnitTestObservable> mBlockingCollection = new BlockingCollection<ObserverTests.UnitTestObservable>();
        public IObservable<ObserverTests.UnitTestObservable> mObservableBlockingCollection;

        private static readonly object ObservableLock = new object();
        private static volatile ObservTest ObservableInstance;

        public static ObservTest Instance
        {
            get
            {
                if (ObservableInstance != null)
                    return ObservableInstance;
                lock (ObservableLock)
                {
                    if (ObservableInstance == null)
                    {
                        ObservTest observable = new ObservTest();
                        observable.mObservableBlockingCollection = observable.mBlockingCollection.GetConsumingEnumerable().ToObservable(ThreadPoolScheduler.Instance);
                        ObservableInstance = observable;
                    }
                    return ObservableInstance;
                }
            }
        }

        private int count = 0;
        public void Release()
        {
            Interlocked.Increment(ref count);
            Console.WriteLine("Release {0} : {1}", count, Thread.CurrentThread.ManagedThreadId);
        }

        public void LogCount()
        {
            Console.WriteLine("Total :{0}", count);
        }

    }

    [TestMethod]
    public void TestMethod1()
    {

        IList<IDisposable> subscribers = new List<IDisposable>();

        for (int count = 0; count < 32; count++)
        {
            IDisposable disposable = ObservTest.Instance.mObservableBlockingCollection.Subscribe(Observe);
            subscribers.Add(disposable);
        }

        for (int count = 0; count < 64; count++)
        {
            ObserverTests.UnitTestObservable observable = new ObserverTests.UnitTestObservable
            {
                Name = string.Format("{0}", count)
            };
            ObservTest.Instance.mBlockingCollection.Add(observable);
        }

        Thread.Sleep(5000);
        foreach (IDisposable disposable in subscribers)
        {
            disposable.Dispose();
        }
        subscribers.Clear();

        for (int count = 0; count < 32; count++)
        {
            IDisposable disposable = ObservTest.Instance.mObservableBlockingCollection.Subscribe(Observe);
            subscribers.Add(disposable);
        }

        for (int count = 0; count < 64; count++)
        {
            ObserverTests.UnitTestObservable observable = new ObserverTests.UnitTestObservable
            {
                Name = string.Format("{0}", count)
            };
            ObservTest.Instance.mBlockingCollection.Add(observable);
        }
        Thread.Sleep(3000);
        ObservTest.Instance.LogCount();

    }

    public static void Observe(ObserverTests.UnitTestObservable observable)
    {
        Console.WriteLine("Observe {0} : {1}", observable.Name, Thread.CurrentThread.ManagedThreadId);
        ObservTest.Instance.Release();
    }
}

所以输出中的最终计数是 96,而我期望它是 128

如果我将初始订阅者的数量从 32 减少,则处理的数量会增加。例如,如果我在第一个循环中将计数从 32 减少到 16,我会得到 112。如果我将其减少到 8,我会得到 120

我的目标是建立一个系统,随着正在执行的任务数量的增加,可用于处理它们的订阅者数量也会增加。

4

0 回答 0