2

我在 S3 中有几千个 CSV 文件,我想加载它们,将它们连接到一个 pandas 数据帧中,并与集群上的所有 dask 工作人员共享整个数据帧。所有文件的大小大致相同 (~1MB)。我每台机器使用 8 个进程(每个核心一个),每个进程使用一个线程。整个数据框很适合每个工作进程的内存。实现这一目标的最有效和可扩展的方法是什么?

我使用 MPI4py 实现了这个工作流程,如下所示:在一个工作进程中使用线程池将所有文件读入 pandas 数据帧,将数据帧连接在一起,并使用 MPI4py 的广播功能将完整的数据帧发送到所有其他工作进程。

我已经想到了五种方法来实现这一目标:

  1. 每个工作人员使用 pandas.read_csv 读取所有文件,然后使用 pandas.concat 将它们连接在一起。
  2. 使用 dask.dataframe.from_delayed 将所有文件读入分布式 dask 数据帧,使用 dask.distributed.worker_client 在每个工作进程上获取客户端,然后在每个工作进程上使用 dask.dataframe.compute 获取熊猫数据帧。
  3. 像解决方案 2 一样加载分布式数据帧,使用 dask.distributed.Client.replicate 将所有分区分配给所有工作人员,使用 dask.distributed.worker_client 在每个工作进程上获取客户端,并使用 dask.dataframe。计算以获取每个工作进程中的 pandas 数据帧。
  4. 像解决方案 2 一样加载分布式数据帧,使用 dask.dataframe.compute 将数据帧带入本地进程,从集群中删除分布式数据帧(通过取消期货),并使用 dask.distributed.Client.scatter(broadcast =True, direct=True) 将本地 pandas 数据帧发送给所有工作人员。
  5. 加载分布式数据帧并将其收集到本地进程,如解决方案 4,使用 dask.distributed.Client.scatter(broadcast=False) 将其发送到工作进程,使用 dask.distributed.Client.replicate 将其发送到所有其他工人。

解决方案 2-5 与 MPI4py 版本相比具有巨大优势,因为它们利用了 dask 并行加载数据帧的能力。然而,当需要在集群中分发数据时,这些解决方案中没有一个能够接近 MPI4py 广播功能的性能。此外,我无法预测他们的内存使用情况,并且我看到许多来自工作人员的消息抱怨事件循环在几秒钟内没有响应。

在这个阶段,我倾向于使用第一种解决方案:即使数据加载效率低下,它也不会那么慢,而且根据我的经验,它是最强大的。如果我走这条路,我肯定会在桌面上留下很多性能潜力。有什么方法可以改进其中一种 dask 解决方案吗?还是有其他我没有考虑过的解决方案?

4

0 回答 0