2

我正在使用 dask.distributed 实现各种数据处理管道。通常,原始数据是从 S3 读取的,最终处理过的(大型)集合也会写入 S3 上的 CSV。

我可以异步运行处理并监视进度,但我注意到将集合存储到文件的所有 to_xxx() 方法似乎都是同步调用。它的一个缺点是调用可能会阻塞很长时间。其次,我不能轻易地构建一个完整的图表以供以后执行。

有没有办法异步运行例如 to_csv() 并获取未来对象而不是阻塞?

PS:我很确定我可以自己实现异步存储,例如通过将集合转换为延迟()并存储每个分区。但这似乎是一个常见的情况——除非我错过了已经存在的功能,否则在框架中包含这样的东西会很好。

4

1 回答 1

1

大多数to_*函数都有一个compute=True可以替换为的关键字参数compute=False。在这些情况下,它将返回一系列延迟值,然后您可以异步计算这些值

values = df.to_csv('s3://...', compute=False)
futures = client.compute(values)
于 2017-08-24T12:58:30.423 回答