1

我见过很多 GenServer 实现,我正在尝试创建一个具有此类规范的实现,但我不确定它的 GenServer 用例。

我有一个状态,例如

%{url: "abc.com/jpeg", name: "Camera1", id: :camera_one, frequency: 10}

我有这样的 100 个状态,具有不同的值,我的用例包含 5 个步骤。

  1. 将每个状态作为 Gen{ ? }。
  2. 向该 URL 发送 HTTP 请求。
  3. 得到结果。
  4. 发送另一个 HTTP 请求,其中的数据来自第一个请求。
  5. 让进程进入睡眠状态。如果频率为 10,则持续 10 秒,依此类推,10 秒后它将再次从 1 步开始。

现在,当我启动 100 个这样的 worker 时,将会有 100 * 2 个频繁的 HTTP 请求。我不确定我是否会使用 GenServer 或 GenStage 或 Flow 甚至是 Broadway?

我还担心 HTTP 请求不会崩溃,例如一个有状态的工作人员会发送一个请求,如果频率是在第一个请求返回之前 1 秒,另一个请求会被发送,GenServer 是否有能力足以处理这些案件?我认为这被称为背压?

我一直在询问和研究这个用例很长时间,我也被引导到 RabbitMQ 以及我的用例。

任何指导都会很有帮助,或者任何最小的例子都会非常感激。

? GenServer/GenStage/GenStateMachine

4

1 回答 1

0

您的问题归结为在给定时间减少并发网络请求的数量。

一个简单的方法是拥有一个跟踪传出请求计数的 GenServer。然后,对于每个客户(在您的情况下最多为 200 个),它可以检查是否有打开的请求,然后采取相应的行动。这是服务器的样子:

defmodule Throttler do
  use GenServer

  #server
  @impl true
  def init(max_concurrent: max_concurrent) do
    {:ok, %{count: 0, max_concurrent: max_concurrent}}
  end

  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count < max_concurrent, do: {:reply, :ok, %{state | count: count + 1}}
  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count >= max_concurrent, do: {:reply, {:error, :too_many_requests}, state}


  @impl true
  def handle_call(:finished, _from, %{count: count} = state) when count > 0, do: {:reply, :ok, %{state | count: count - 1}}
end

好的,现在我们有了一个可以调用的服务器,handle_call(pid, :run)它会告诉我们是否超出了计数。一旦任务(获取 URL)完成,我们需要调用handle_call(pid, :finished)让服务器知道我们已经完成了任务。

在客户端,我们可以将其包装在一个方便的辅助函数中。(请注意,这仍然在 Throttler 模块中,因此__MODULE__有效)

defmodule Throttler do
  #client
  def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
    GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
  end
  
  def execute_async(pid, func) do
    GenServer.call(pid, :run)
    |> case do
      :ok ->
        task = Task.async(fn -> 
          try do
            func.()
          after
            GenServer.call(pid, :finished)
          end
        end)
        {:ok, task}
      {:error, reason} -> {:error, reason, func}
    end
  end
end

这里我们传入一个我们希望在客户端异步执行的函数,并在执行之前在服务器端完成调用 :run 和 :finished 的工作。如果成功,我们将返回一个任务,否则我们将失败。

将它们放在一起,您将获得如下所示的代码:

{:ok, pid} = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num -> 
  Throttler.execute(pid, fn ->
    IO.puts("Running command #{num}")
    :timer.sleep(:5000)
    IO.puts("Sleep complete for #{num}")
    num * 10
  end)
end)
valid_tasks = Enum.filter(results, &(match?({:ok, _func}, &1))) |> Enum.map(&elem(&1, 1))

现在你有一堆任务要么成功,要么失败,你可以采取适当的行动。

失败后怎么办?这是背压的有趣部分 :) 最简单的事情是超时并重试,假设您最终会清除下游的压力。否则,您可以完全失败请求并继续将问题推向上游。

于 2020-12-07T01:54:40.540 回答