2

我正在向 API 发出请求,但他们的服务器只允许一定数量的活动连接,所以我想限制正在进行的提取次数。出于我的目的,只有在 HTTP 响应正文到达客户端时才完成(而不是正在进行)提取。

我想创建一个这样的抽象:

const fetchLimiter = new FetchLimiter(maxConnections);
fetchLimiter.fetch(url, options); // Returns the same thing as fetch()

这会使事情变得更简单,但似乎无法知道其他代码使用的流何时结束,因为流在​​被读取时被锁定。可以使用ReadableStream.tee()将流分成两部分,使用一个并将另一个返回给调用者(可能还Response用它构造一个),但这会降低性能,对吗?

4

1 回答 1

2

由于fetch使用了 Promise,您可以利用它来制作一个简单的队列系统。

这是我以前用于排队基于承诺的东西的一种方法。它通过创建 aPromise然后将其解析器添加到数组来将项目排入队列。当然,在该 Promise 解决之前,它await会阻止任何以后的 Promise 被调用。

当一个完成时,我们所要做的就是抓住下一个解析器并调用它。承诺解决,然后fetch开始!

最好的部分,因为我们实际上并没有消耗fetch结果,所以不必担心必须clone或任何事情......我们只是将其完整传递,以便您稍后可以使用它then

*编辑:由于在 fetch 承诺解决后正文仍在流式传输,我添加了第三个选项,以便您可以传入正文类型,并让 FetchLimiter 为您检索和解析正文。

这些都返回一个最终用实际内容解决的承诺。

这样你就可以让 FetchLimiter 为你解析正文。我这样做是为了让它返回一个数组[response, data],这样你仍然可以检查响应代码、标题等内容。

await就此而言,如果您需要执行更复杂的操作,例如根据响应代码解析主体的不同方法,您甚至可以传入回调或其他内容。

例子

我添加了注释以指示FetchLimiter代码的开始和结束位置……其余的只是演示代码。

它使用了一个使用 setTimeout 的假fetch,它将在 0.5-1.5 秒之间解析。它将立即启动前三个请求,然后活动将被填满,并等待一个解决。

发生这种情况时,您将看到该承诺已解决的注释,然后队列中的下一个承诺将开始,然后您将看到循环then中的 from解决。for我添加了这一点then,以便您可以看到事件的顺序。

(function() {
  const fetch = (resource, init) => new Promise((resolve, reject) => {
    console.log('starting ' + resource);
    setTimeout(() => {
      console.log(' - resolving ' + resource);
      resolve(resource);
    }, 500 + 1000 * Math.random());
  });

  function FetchLimiter() {
    this.queue = [];
    this.active = 0;
    this.maxActive = 3;
    this.fetchFn = fetch;
  }
  FetchLimiter.prototype.fetch = async function(resource, init, respType) {
    // if at max active, enqueue the next request by adding a promise
    // ahead of it, and putting the resolver in the "queue" array.
    if (this.active >= this.maxActive) {
      await new Promise(resolve => {
        this.queue.push(resolve); // push, adds to end of array
      });
    }
    this.active++; // increment active once we're about to start the fetch
    const resp = await this.fetchFn(resource, init);
    let data;
    if (['arrayBuffer', 'blob', 'json', 'text', 'formData'].indexOf(respType) >= 0)
      data = await resp[respType]();
    this.active--; // decrement active once fetch is done
    this.checkQueue(); // time to start the next fetch from queue
    return [resp, data]; // return value from fetch
  };

  FetchLimiter.prototype.checkQueue = function() {
    if (this.active < this.maxActive && this.queue.length) {
      // shfit, pulls from start of array. This gives first in, first out.
      const next = this.queue.shift();
      next('resolved'); // resolve promise, value doesn't matter
    }
  }

  const limiter = new FetchLimiter();
  for (let i = 0; i < 9; i++) {
    limiter.fetch('/mypage/' + i)
      .then(x => console.log(' - .then ' + x));
  }
})();

注意事项:

  • 我不能 100% 确定当承诺解决时,身体是否仍在流动……这似乎是你的担忧。但是,如果这是一个问题,您可以使用其中一种 Body mixin 方法,如blobortextjson,在完全解析正文内容之前无法解决(请参阅此处

  • 作为一个非常简单的概念证明,我有意保持它非常简短(如 15 行实际代码)。您希望在生产代码中添加一些错误处理,以便如果fetch由于连接错误或其他原因而拒绝,您仍然会减少活动计数器并开始下一个fetch.

  • 当然它也使用async/await语法,因为它更容易阅读。如果您需要支持较旧的浏览器,您可能希望使用 Promises 重写或使用 babel 或等效工具进行转换。

于 2020-05-27T20:32:03.273 回答