我有一个流产生数据的速度不如消耗的快。
所以我有一个这样定义的生产者:
def start_link() do
create_stream
|> GenStage.from_enumerable(name: Producer)
end
然后我的生产者消费者订阅它
def init(:ok) do
{:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
end
我的消费者订阅了 mu producer-consumer
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
end
我遇到的问题是消费者挂起,我认为是因为在某些时候生产者没有设法获取新数据,并且如文档中所述:
当可枚举完成或停止时,该阶段将以 :normal 原因退出。这意味着,如果消费者订阅了可枚举阶段并且:取消选项设置为:永久,这是默认的,消费者也将退出:正常原因
所以我读了更多,它建议添加cancel:: transient
不完成阶段的选项。我像这样添加了它,但它不起作用,我错过了什么吗?
|> GenStage.from_enumerable(name: Producer, cancel: :transient)
最初我使用的是 aFlow.into_stages(flow, [ProducerConsumer])
但我不能这样做,因为我无法从我的主管树中引用(或者我不知道如何) ProducerConsumer
children = [
{Producer, []},
{ProducerConsumer, []},
{Consumer, []}
]
更新
从子定义更新对 Flow.into_stages 的传递引用
children = [
{Producer, [name: ProducerConsumer]},
{ProducerConsumer, []},
{Consumer, []}
]
def start_link(producer_consumer) do
create_stream
|> Flow.into_stages(producer_consumer)
end
**(混合)无法启动应用程序测试:Application.start(:正常,[])返回错误:关闭:无法启动子:生产者**(退出)退出:GenServer.call({:name,ProducerConsumer }, {:"$subscribe", nil, #PID<0.2031.0>, [cancel: :transient]}, 5000) ** (EXIT) 没有连接到 Elixir.ProducerConsumer