0

这是一项学术练习,我是 Reactive Extensions 的新手,并试图了解这项技术。我为自己设定了一个目标,即制作一个返回 Pi 的连续数字的 IObservable(由于不相关的原因,我现在恰好对 Pi 很感兴趣)。Reactive Extensions 包含用于制作 observables 的运算符,它们给出的指导是您应该“几乎永远不需要创建自己的 IObsevable”。但是我看不到如何使用现成的运算符和方法来做到这一点。让我再解释一下。

我计划使用一种算法,该算法将涉及扩展 Arctan 的泰勒级数。为了获得下一位 Pi,我将在该系列中扩展更多术语。

所以我需要异步进行系列扩展,偶尔将下一个计算数字扔给 IObserver。我显然不想为每个新数字从头开始重新计算。

有没有办法使用 RX 的内置运算符来实现这种行为,还是我必须从头开始编写一个 IObservable?什么策略表明自己?

4

2 回答 2

2

对于这样的事情,最简单的方法是使用Subject。Subject 既是 IObservable 又是 IObserver,这听起来有点奇怪,但它允许您像这样使用它们:

class PiCalculator
{
    private readonly Subject<int> resultStream = new Subject<int>();

    public IObservable<int> ResultStream
    {
        get { return resultStream; }
    }

    public void Start()
    {
        // Whatever the algorithm actually is
        for (int i = 0; i < 1000; i++)
        {
            resultStream.OnNext(i);
        }
    }
}

所以在你的算法中,OnNext只要你想产生下一个值,你就可以调用这个主题。

然后使用它,你只需要这样的东西:

var piCalculator = new PiCalculator();
piCalculator.ResultStream.Subscribe(n => Console.WriteLine((n)));
piCalculator.Start();
于 2012-06-21T19:29:13.020 回答
1

最简单的方法是创建一个Enumerable然后转换它:

IEnumerable<int> Pi()
{
    // algorithm here
    for (int i = 0; i < 1000; i++)
    {
        yield return i;
    }
}

用法(对于冷可观察,即每个新的“订阅”都从头开始创建 Pi):

var cold = Pi().ToObservable(Scheduler.ThreadPool);
cold.Take(5).Subscribe(Console.WriteLine);

如果你想做到hot(每个人都共享相同的基础计算),你可以这样做:

var hot = cold.Publish().RefCount();

它将在第一个订阅者之后开始计算,并在他们全部断开连接时停止。这是一个简单的测试:

hot.Subscribe(p => Console.WriteLine("hot1: " + p));
Thread.Sleep(5);    
hot.Subscribe(p => Console.WriteLine("hot2: " + p));    

这应该hot1只显示打印一段时间,然后hot2在短暂等待后加入,但打印相同的数字。如果使用 完成此操作cold,则两个订阅都将从 0 开始。

于 2012-06-21T22:17:10.590 回答