1

我已经删除了样板以达到重点

// a.js

// My observables from stream and event
this.a = Rx.Node.fromStream(this.aStream());
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem');

// Zip 'em
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) {
    return {item: s2, a: s1.toString()};
});

// Streams the lowercase alphabet
rb.prototype.aStream = function aStream() {
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    console.log('Hit!');
    if (c > 'z'.charCodeAt(0)) {
        rs.push(null);
    }
};

return rs;
};

// b.js(需要上面导出的模块)

rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...)  in the module to trigger the itemSource observable

我期望看到的:

{item: 'a', a: 'a'}打印在控制台中

发生了什么:

Hit!之前打印了 24 次{item: 'a', a: 'a'}。这意味着zip从 中获取所有值aStream,缓冲它们,然后做它应该做的事情。

我如何获得相同的功能zip提供但懒惰?我的目标是使用无限流/可观察的并用有限(异步)流压缩它。

编辑

通过 runnable 查看/编辑它:RX Zip test Edit 2 Code updated based on answer -> no output now。

4

1 回答 1

1

zip确实是懒惰。它只是订阅ab在任何一个产生新值时执行它的工作。

您的问题是,fromStream一旦zip订阅它,它就会同步发出它的所有值。发生这种情况是因为您的客户Readable一直在说“有更多可用数据!”

使您的Readable异步,您将获得所需的行为。

尝试这样的事情(未经测试)

var rs = Readable();
var subscription = null;
rs._read = function () {
    if (!subscription) {
        // produce the values once per second
        subscription = Rx.Observable
            .generateWithRelativeTime(
                97, // start value
                function (c) { return c > 'z'.charCodeAt(0); }, // end condition
                function (c) { return c + 1; }, // step function
                function (c) { return String.fromCharCode(c); }, // result selector
                function () { return 1000; }) // 1000ms between values
            .subscribe(
                function (s) {
                    rs.push(s);
                    console.log("Hit!");
                },
                function (error) { rs.push(null); },
                function () { rs.push(null); });
    }
};
于 2014-08-27T14:16:11.253 回答