1

每当流 B 触发时,我想停止流 A 以获得一个通知。两个流都将保持在线并且永远不会完成。

A: o--o--o--o--o--o--o--o--o  
B: --o-----o--------o-------  
R: o-----o-----o--o-----o--o  

或者

A: o--o--o--o--o--o--o--o--o  
B: -oo----oo-------oo-------  
R: o-----o-----o--o-----o--o  
4

2 回答 2

2

当 observable 很热(并且没有refCount)时,此解决方案将起作用:

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
  1. .takeUntil(streamB):在流产生值时使流A完成。B
  2. .skip(1): 使流A在开始时跳过一个值(或作为 的结果.repeat())。
  3. .repeat():使流A无限重复(重新连接)。
  4. .merge(streamA.take(1)): 偏移.skip(1)流开始处的效果。

使 A 流每 5 秒跳过一次的示例:

var streamA,
    streamB;

streamA = Rx.Observable
    .interval(1000)
    .map(function (x) {
        return 'A:' + x;
}).publish();

streamB = Rx.Observable
    .interval(5000);

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);

streamA.connect();

也可以使用这个沙箱http://jsbin.com/gijorid/4/edit?js,consoleBACTION()在运行代码的时候在控制台日志中执行手动push一个值到(streamB有助于分析代码)。

于 2011-05-08T14:15:12.970 回答
2

这是我为类似问题SkipWhen所做的运算符的一个版本(不同之处在于,在原始版本中,多个“B”会跳过多个“A”):

public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source, 
    IObservable<TOther> other)
{
    return Observable.Create<TSource>(observer =>
    {
        object lockObject = new object();
        bool shouldSkip = false;

        var otherSubscription = new MutableDisposable();
        var sourceSubscription = new MutableDisposable();

        otherSubscription.Disposable = other.Subscribe(
            x => { lock(lockObject) { shouldSkip = true; } });

        sourceSubscription.Disposable = source.Where(_ =>
        {
            lock(lockObject)
            {
                if (shouldSkip)
                {
                    shouldSkip = false;
                    return false;
                }
                else
                {
                    return true;
                }
            }
        }).Subscribe(observer);

        return new CompositeDisposable(
            sourceSubscription, otherSubscription);
    });
}

如果当前实现成为瓶颈,请考虑将锁实现更改为使用ReaderWriterLockSlim.

于 2011-05-08T21:15:28.270 回答