5

我正在尝试将现有的 API 转换为与 RxJS 一起使用......对节点来说相当新,对 RxJs 来说非常新,所以请多多包涵。

我有一个现有的 API(getNextMessage),当某些东西可用时,它要么阻塞(异步),要么通过节点样式(err,val)回调返回新项目或错误。

所以它看起来像:

getNextMessage(nodeStyleCompletionCallback);

您可以将 getNextMessage 想象成一个 http 请求,该请求在未来服务器响应时完成,但您确实需要在收到消息后再次调用 getNextMessage 以继续从服务器获取新项目。

所以,为了使它成为一个可观察的集合,我必须让 RxJs 继续调用我的 getNextMessage 函数,直到订阅者被释放();

基本上,我正在尝试创建自己的 RxJs 可观察集合。

问题是:

  1. 我不知道如何让subscriber.dispose() 杀死async.forever
  2. 我可能不应该首先使用 async.forever
  3. 我不确定我是否应该为每条消息“完成” - 那不应该在序列的末尾
  4. 我想最终消除使用 fromNodeCallback 的需要,以拥有一流的 RxJS 可观察
  5. 显然我有点困惑。

希望能得到一点帮助,谢谢!

这是我现有的代码:

var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');

function observableReceive(portName)
{
    var observerCallback;
    var listenPort = new port(portName);
    var disposed = false;

    var asyncReceive = function(asyncCallback)
    {
        listenPort.getNextMessage(
            function(error, json)
            {
                observerCallback(error, json);

                if (!disposed)
                    setImmediate(asyncCallback);
            }
        );
    }

    return function(outerCallback)
    {
        observerCallback = outerCallback;
        async.forever(asyncReceive);
    }
}

var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();

var subscription = source.forEach(
    function (json)
    {
        console.log('receive completed: ' + JSON.stringify(json));
    },
    function (error) {
        console.log("receive failed: " + error.toString());
    },
    function () {
        console.log('Completed');
        subscription.dispose();
    }
);
4

2 回答 2

14

所以这可能是我会做的。

var Rx = require('Rx');

// This is just for kicks. You have your own getNextMessage to use. ;)
var getNextMessage = (function(){

  var i = 1;

  return function (callback) {
    setTimeout(function () {
      if (i > 10) {
        callback("lawdy lawd it's ova' ten, ya'll.");
      } else {
        callback(undefined, i++);
      }
    }, 5);
  };

}());

// This just makes an observable version of getNextMessage.
var nextMessageAsObservable = Rx.Observable.create(function (o) {
  getNextMessage(function (err, val) {
    if (err) {
      o.onError(err);
    } else {
      o.onNext(val);
      o.onCompleted(); 
    }
  });
});

// This repeats the call to getNextMessage as many times (11) as you want.
// "take" will cancel the subscription after receiving 11 items.
nextMessageAsObservable
  .repeat()
  .take(11)
  .subscribe(
    function (x)   { console.log('next', x);    },
    function (err) { console.log('error', err); },
    function ()    { console.log('done');       }
  );
于 2014-02-10T07:01:07.087 回答
5

我意识到这已经有一年多了,但我认为更好的解决方案是使用递归调度:

Rx.Observable.forever = function(next, scheduler) {
  scheduler = scheduler || Rx.Scheduler.default,
  //Internally wrap the the callback into an observable
  next = Rx.Observable.fromNodeCallback(next);    

  return Rx.Observable.create(function(observer) {
    var disposable = new Rx.SingleAssignmentDisposable(),
        hasState = false;
    disposable.setDisposable(scheduler.scheduleRecursiveWithState(null, 
      function(state, self) { 
        hasState && observer.onNext(state);
        hasState = false;  
        next().subscribe(function(x){
          hasState = true;
          self(x);
        }, observer.onError.bind(observer));

      }));

    return disposable;

  });

};

这里的想法是,您可以在前一个项目完成后安排新项目。您调用next()which 调用传入的方法,当它返回一个值时,您安排下一项进行调用。

然后你可以像这样使用它:

Rx.Observable.forever(getNextMessage)
.take(11)
.subscribe(function(message) {
 console.log(message);
});

在此处查看一个工作示例

于 2015-05-24T07:08:37.973 回答