9

我有一项以页面形式返回数据的服务。对一页的响应包含有关如何查询下一页的详细信息。

我的方法是返回响应数据,然后如果有更多可用页面,则立即将延迟调用连接到相同的可观察序列。

function getPageFromServer(index) {
  // return dummy data for testcase
  return {nextpage:index+1, data:[1,2,3]};
}

function getPagedItems(index) {
  return Observable.return(getPageFromServer(index))
    .flatMap(function(response) {
      if (response.nextpage !== null) {
        return Observable.fromArray(response.data)
          .concat(Observable.defer(function() {return getPagedItems(response.nextpage);}));
      }

      return Observable.fromArray(response.data);
    });
}

getPagedItems(0).subscribe(
  function(item) {
    console.log(new Date(), item);
  },
  function(error) {
    console.log(error);
  }
)

这一定是错误的方法,因为在 2 秒内你会得到:

      throw e;
            ^
RangeError: Maximum call stack size exceeded
    at CompositeDisposablePrototype.dispose (/Users/me/node_modules/rx/dist/rx.all.js:654:51)

什么是正确的分页方法?

4

4 回答 4

7

查看OP代码,确实是正确的方法。只需要让你的模拟服务异步来模拟真实情况。这是一个避免堆栈耗尽的解决方案(我getPageFromServer实际上还返回了一个冷可观察对象,而不是要求调用者包装它)。

请注意,如果您真的希望您的服务请求在实际应用程序中同步完成,因此您需要确保您的代码在发生这种情况时不会耗尽堆栈,只需getPagedItems()调用currentThread 调度程序。调度程序使用currentThread蹦床调度任务以防止递归调用(和堆栈耗尽)。请参阅末尾的注释掉的行getPagedItems

function getPageFromServer(index) {
    // return dummy data asynchronously for testcase
    // use timeout scheduler to force the result to be asynchronous like
    // it would be for a real service request
    return Rx.Observable.return({nextpage: index + 1, data: [1,2,3]}, Rx.Scheduler.timeout);

    // for real request, if you are using jQuery, just use rxjs-jquery and return:
    //return Rx.Observable.defer(function () { return $.ajaxAsObservable(...); });
}

function getPagedItems(index) {
    var result = getPageFromServer(index)
        .retry(3) // retry the call if it fails
        .flatMap(function (response) {
            var result = Rx.Observable.fromArray(response.data);
            if (response.nextpage !== null) {
                result = result.concat(getPagedItems(response.nextpage));
            }
            return result;
        });

    // If you think you will really satisfy requests synchronously, then instead
    // of using the Rx.Scheduler.timeout in getPageFromServer(), you can
    // use the currentThreadScheduler here to prevent the stack exhaustion...

    // return result.observeOn(Rx.Scheduler.currentThread) 
    return result;
}
于 2014-12-22T15:24:59.503 回答
4

编辑啊!我看到了你面临的问题。一些尾调用优化应该可以解决您的问题:

function mockGetPageAjaxCall(index) {
  // return dummy data for testcase
  return Promise.resolve({nextpage:index+1, data:[1,2,3]});
}

function getPageFromServer(index) {
  return Observable.create(function(obs) {
    mockGetPageAjaxCall(index).then(function(page) {
      obs.onNext(page);
    }).catch(function(err) {
      obs.onError(err)
    }).finally(function() {
      obs.onCompleted();
    });
  });
}

function getPagedItems(index) {
    return Observable.create(function(obs) {
        // create a delegate to do the work
        var disposable = new SerialDisposable();
        var recur = function(index) {
            disposable.setDisposable(getPageFromServer(index).retry().subscribe(function(page) {
                obs.onNext(page.items);
                if(page.nextpage === null) {
                  obs.onCompleted();   
                }

                // call the delegate recursively
                recur(page.nextpage);
            }));
        };

        // call the delegate to start it
        recur(0);

        return disposable;
    });
}

getPagedItems(0).subscribe(
  function(item) {
    console.log(new Date(), item);
  },
  function(error) {
    console.log(error);
  }
)
于 2014-12-16T21:44:11.503 回答
3

这是一个更简洁且恕我直言的干净答案,没有任何递归。它使用ogen (~46 loc) 将任何生成器转换为 observable。

它有一个自定义构建的 next 函数,该函数将在您的函数产生某些结果时发出数据。

nb:原创文章值得一读

function getPagedItems({offset=0, limit=4}) {
    paginatedQueryGenerator = function*(someParams offset, limit) {
        let hasMore = true
        while(hasMore) {
            const results = yield YOUR_PROMISE_BASED_REQUEST(someParams, limit, offset)
            hasMore = results && results.nextpage !== null 
            offset += limit
        }
    }

    return ogen(paginatedQueryGenerator)(someParams, offset, limit)
} 
于 2017-04-17T23:23:09.933 回答
0

另一种解决方案是使用 retryWhen

getAllData() {
    let page = 0;
    let result = [];

    const getOnePage = () => {
        return of(0).pipe(mergeMap(() => getPaginatedData(page++)));
    };

    return getOnePage()
        .pipe(
            map(items => {
                result = result.concat(items);
                if (templates.length === PAGE_SIZE) {
                    throw 'get next page';
                }
            }),
            retryWhen(e => e.pipe(
                takeWhile(er => er === 'get next page'))
            ),
            map(e => result)
        )
        .subscribe(items => {
            console.log('here is all data', items);
        });

}
于 2018-12-05T12:56:00.363 回答