4

我对如何从 dask 中获得最佳效果感到困惑。

问题 我有一个包含多个时间序列的数据框(每个都有自己的时间序列key),我需要my_fun在每个时间序列上运行一个函数。用 pandas 解决它的一种方法涉及 df = list(df.groupby("key"))然后应用my_fun 多处理。尽管内存使用量很大,但性能在我的机器上相当不错,而在谷歌云计算上却很糟糕。

在 Dask 上,我当前的工作流程是:

import dask.dataframe as dd
from dask.multiprocessing import get
  1. 从 S3 读取数据。14 个文件 -> 14 个分区
  2. `df.groupby("key").apply(my_fun).to_frame.compute(get=get)

因为我没有设置索引df.known_divisionsFalse

结果图是 在此处输入图像描述 ,我不明白我看到的是否是瓶颈。

问题:

  1. 最好df.npartitions是倍数ncpu还是没关系?
  2. 由此看来,索引设置为键似乎更好。我的猜测是我可以做类似的事情

    df["key2"] = df["key"] df = df.set_index("key2")

但是,同样,我不知道这是否是最好的方法。

4

1 回答 1

5

对于 Dask 中的“什么需要时间”之类的问题,通常建议您使用“分布式”调度程序而不是多处理 - 您可以使用任意数量的进程/线程运行,但您可以通过诊断获得更多信息仪表板。

对于您的特定问题,如果您要对一个在分区之间分割得不好的列进行分组并应用除简单聚合之外的任何其他内容,那么您将不可避免地需要洗牌。设置索引会作为显式步骤为您执行此 shuffle,或者您会在任务图中获得明显的隐式 shuffle。这是一个多对多的操作,每个聚合任务都需要来自每个原始分区的输入,因此是瓶颈。没有办法解决这个问题。

至于分区数量,是的,您可以有次优条件,例如 8 个内核上的 9 个分区(您将计算 8 个任务,然后可能在一个内核上阻塞最后一个任务,而其他内核空闲);但总的来说,只要您不使用非常少量的分区,您就可以依靠 dask 做出合理的调度决策。在许多情况下,这并不重要。

于 2017-12-08T19:47:57.560 回答