0

我有一个流产生数据的速度不如消耗的快。

所以我有一个这样定义的生产者:

def start_link() do
  create_stream
  |> GenStage.from_enumerable(name: Producer)
end

然后我的生产者消费者订阅它

  def init(:ok) do
    {:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
  end

我的消费者订阅了 mu producer-consumer

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
  end

我遇到的问题是消费者挂起,我认为是因为在某些时候生产者没有设法获取新数据,并且如文档中所述:

当可枚举完成或停止时,该阶段将以 :normal 原因退出。这意味着,如果消费者订阅了可枚举阶段并且:取消选项设置为:永久,这是默认的,消费者也将退出:正常原因

所以我读了更多,它建议添加cancel:: transient不完成阶段的选项。我像这样添加了它,但它不起作用,我错过了什么吗?

|> GenStage.from_enumerable(name: Producer, cancel: :transient)

最初我使用的是 aFlow.into_stages(flow, [ProducerConsumer])但我不能这样做,因为我无法从我的主管树中引用(或者我不知道如何) ProducerConsumer

children = [
  {Producer, []},
  {ProducerConsumer, []},
  {Consumer, []}
]

更新

从子定义更新对 Flow.into_stages 的传递引用

children = [
  {Producer, [name: ProducerConsumer]},
  {ProducerConsumer, []},
  {Consumer, []}
]

def start_link(producer_consumer) do
  create_stream
  |> Flow.into_stages(producer_consumer)
end

**(混合)无法启动应用程序测试:Application.start(:正常,[])返回错误:关闭:无法启动子:生产者**(退出)退出:GenServer.call({:name,ProducerConsumer }, {:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000) ** (EXIT) 没有连接到 Elixir.ProducerConsumer

4

1 回答 1

0

错误:

**(混合)无法启动应用程序测试:Application.start(:正常,[])返回错误:关闭:无法启动子:生产者**(退出)退出:GenServer.call({:name,ProducerConsumer }, {:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000) ** (EXIT) 没有连接到 Elixir.ProducerConsumer

只是意味着当 Flow.into_stages 尝试同步到提供的消费者时,消费者必须已经在运行。

因此,监督时顺序很重要,如下所示:

children = [
  Consumer,
  FlowProducerWorker # worker which implements Flow.into_stages(flow, [Consumer])
]
于 2018-06-29T09:06:06.477 回答