3

我想使用 angular-rx 作为结果的简单刷新按钮。如果用户单击刷新按钮,则重新加载结果。如果用户在 1 秒内单击刷新按钮 100 次,则仅加载最新结果。如果结果由于某种原因失败,这并不意味着刷新按钮应该停止工作。

为了实现最后一点,即使失败,我也想保留订阅(或重新订阅),但我不知道该怎么做?

这不起作用,但这是一个简单的示例,我尝试在错误时重新订阅:

var refreshObs = $scope.$createObservableFunction('refresh');

var doSubscribe = function () {
  refreshObs
  .select(function (x, idx, obs) {
      // get the results.
      // in here might throw an exception
  })
  .switch()
  .subscribe(
  function (x) { /*show the results*/ }, // on next
  function (err) { // on error
      doSubscribe(); // re-subscribe
  },
  function () { } // on complete
  );
};
doSubscribe();

我认为这很常见,应该有一些标准做法来实现这一点?

更新

使用建议的解决方案,这是我所做的测试:

// using angularjs and the rx.lite.js library
var testCount = 0;
var obsSubject = new rx.Subject(); // note. rx is injected but is really Rx
$scope.refreshButton = function () { // click runs this
  obsSubject.onNext();
};

obsSubject.map(function () {
  testCount++;
  if (testCount % 2 === 0) {
      throw new Error("something to catch");
  }
  return 1;
})
.catch(function (e) {
  return rx.Observable.return(1);
})
.subscribe(
    function (x) {
    // do something with results
});

这些是我的测试结果:

  1. 单击了刷新按钮
  2. obsSubject.onNext() 调用
  3. map 函数返回 1。
  4. subscribe onNext 被触发
  5. 单击了刷新按钮
  6. obsSubject.onNext() 调用
  7. 映射函数抛出错误
  8. 进入catch函数
  9. subscribe onNext 被触发
  10. 单击了刷新按钮
  11. obsSubject.onNext() 调用
  12. 没有什么。我需要继续订阅

我的理解是 catch 应该保留订阅,但我的测试表明它没有。为什么?

4

1 回答 1

5

根据您评论中给出的上下文,您希望:

  • 每个刷新按钮都会触发“获取结果”
  • 向用户显示的每个错误

你真的不需要重新订阅,这是一种反模式,因为 Rx 中的代码从不依赖于它,而且额外的递归调用只会让读者感到困惑。它也让我们想起了回调地狱。

在这种情况下,您应该:

  • 删除 doSubscribe() 调用,因为您不需要它们。使用该代码,您已经有了每次刷新单击都会触发新的“获取结果”的行为。
  • 替换select().switch().flatMap()(或.flatMapLatest())。当您执行 时select(),结果是一个元流(流的流),您正在使用switch()将元流扁平化为流。这就是 flatMap 所做的一切,但仅限于一次操作。你也可以.then()从 JS Promises 中理解 flatMap。
  • 包括.catch()将处理您的错误的运算符,就像在一个catch块中一样。发生错误后您无法获得更多结果的原因是 Observable 总是因错误或“完成”事件而终止。使用catch()操作符,我们可以用 Observable 上的正常事件替换错误,以便它可以继续。

要改进您的代码:

var refreshObs = $scope.$createObservableFunction('refresh');

refreshObs
  .flatMapLatest(function (x, idx, obs) {
    // get the results.
    // in here might throw an exception
    // should return an Observable of the results
  })
  .catch(function(e) {
      // do something with the error
      return Rx.Observable.empty(); // replace the error with nothing
  })
  .subscribe(function (x) { 
      // on next
  });

另请注意,我删除了 onError 和 onComplete 处理程序,因为它们内部没有任何事情可做。

也看看更多的运营商。例如retry(),可用于在每次发生错误时再次自动“获取结果”。请参阅https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/retry.md

retry()与 with 结合使用以do()处理错误 ( do),并允许订阅者自动重新订阅源 observable ( retry)。

refreshObs
  .flatMapLatest(function (x, idx, obs) {
    // get the results.
    // in here might throw an exception
    // should return an Observable of the results
  })
  .do(function(){}, // noop for onNext
  function(e) {
      // do something with the error
  })
  .retry()
  .subscribe(function (x) { 
      // on next
  });

在这里查看一个工作示例:http: //jsfiddle.net/staltz/9wd13gp9/9/

于 2014-09-04T11:48:30.683 回答