您的问题归结为在给定时间减少并发网络请求的数量。
一个简单的方法是拥有一个跟踪传出请求计数的 GenServer。然后,对于每个客户(在您的情况下最多为 200 个),它可以检查是否有打开的请求,然后采取相应的行动。这是服务器的样子:
defmodule Throttler do
use GenServer
#server
@impl true
def init(max_concurrent: max_concurrent) do
{:ok, %{count: 0, max_concurrent: max_concurrent}}
end
@impl true
def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count < max_concurrent, do: {:reply, :ok, %{state | count: count + 1}}
@impl true
def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count >= max_concurrent, do: {:reply, {:error, :too_many_requests}, state}
@impl true
def handle_call(:finished, _from, %{count: count} = state) when count > 0, do: {:reply, :ok, %{state | count: count - 1}}
end
好的,现在我们有了一个可以调用的服务器,handle_call(pid, :run)
它会告诉我们是否超出了计数。一旦任务(获取 URL)完成,我们需要调用handle_call(pid, :finished)
让服务器知道我们已经完成了任务。
在客户端,我们可以将其包装在一个方便的辅助函数中。(请注意,这仍然在 Throttler 模块中,因此__MODULE__
有效)
defmodule Throttler do
#client
def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
end
def execute_async(pid, func) do
GenServer.call(pid, :run)
|> case do
:ok ->
task = Task.async(fn ->
try do
func.()
after
GenServer.call(pid, :finished)
end
end)
{:ok, task}
{:error, reason} -> {:error, reason, func}
end
end
end
这里我们传入一个我们希望在客户端异步执行的函数,并在执行之前在服务器端完成调用 :run 和 :finished 的工作。如果成功,我们将返回一个任务,否则我们将失败。
将它们放在一起,您将获得如下所示的代码:
{:ok, pid} = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num ->
Throttler.execute(pid, fn ->
IO.puts("Running command #{num}")
:timer.sleep(:5000)
IO.puts("Sleep complete for #{num}")
num * 10
end)
end)
valid_tasks = Enum.filter(results, &(match?({:ok, _func}, &1))) |> Enum.map(&elem(&1, 1))
现在你有一堆任务要么成功,要么失败,你可以采取适当的行动。
失败后怎么办?这是背压的有趣部分 :) 最简单的事情是超时并重试,假设您最终会清除下游的压力。否则,您可以完全失败请求并继续将问题推向上游。