我们需要实现以下内容。给定一个将提供已知数量消息的 Redis 通道:
对于从通道消费的每条消息:
- 从 Redis 获取 JSON 文档
- 解析 JSON 文档,提取结果对象列表
聚合所有结果对象以生成单个结果
我们希望将第 1 步和第 2 步分配给许多工作人员,并避免将所有结果收集到内存中。我们还想显示两个步骤的进度条。
但是,我们看不到构建应用程序的好方法,以便我们可以看到进度并保持工作在系统中移动,而不会因为不合时宜的时间而阻塞。
例如,在第 1 步中,如果我们从 Redis 通道读取到队列中,那么我们可以将队列传递给 Dask,在这种情况下,我们开始处理每条消息,而无需等待所有消息。但是,如果我们使用队列,我们就看不到显示进度的方法(大概是因为队列通常具有未知大小?)
如果我们从 Redis 通道收集到一个列表并将其传递给 Dask,那么我们可以看到进度,但是我们必须等待来自 Redis 的所有消息,然后才能开始处理第一个消息。
有没有推荐的方法来解决这类问题?