问题标签 [dask-distributed]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
dask - 是什么导致 CancelledError 异常的 dask 作业失败
我已经看到下面的错误消息很长一段时间了,但无法弄清楚导致失败的原因。
错误:
在重新启动 dask 集群时,它会成功运行。
python - 如何在所有 dask 工作人员上加载数据框
我在 S3 中有几千个 CSV 文件,我想加载它们,将它们连接到一个 pandas 数据帧中,并与集群上的所有 dask 工作人员共享整个数据帧。所有文件的大小大致相同 (~1MB)。我每台机器使用 8 个进程(每个核心一个),每个进程使用一个线程。整个数据框很适合每个工作进程的内存。实现这一目标的最有效和可扩展的方法是什么?
我使用 MPI4py 实现了这个工作流程,如下所示:在一个工作进程中使用线程池将所有文件读入 pandas 数据帧,将数据帧连接在一起,并使用 MPI4py 的广播功能将完整的数据帧发送到所有其他工作进程。
我已经想到了五种方法来实现这一目标:
- 每个工作人员使用 pandas.read_csv 读取所有文件,然后使用 pandas.concat 将它们连接在一起。
- 使用 dask.dataframe.from_delayed 将所有文件读入分布式 dask 数据帧,使用 dask.distributed.worker_client 在每个工作进程上获取客户端,然后在每个工作进程上使用 dask.dataframe.compute 获取熊猫数据帧。
- 像解决方案 2 一样加载分布式数据帧,使用 dask.distributed.Client.replicate 将所有分区分配给所有工作人员,使用 dask.distributed.worker_client 在每个工作进程上获取客户端,并使用 dask.dataframe。计算以获取每个工作进程中的 pandas 数据帧。
- 像解决方案 2 一样加载分布式数据帧,使用 dask.dataframe.compute 将数据帧带入本地进程,从集群中删除分布式数据帧(通过取消期货),并使用 dask.distributed.Client.scatter(broadcast =True, direct=True) 将本地 pandas 数据帧发送给所有工作人员。
- 加载分布式数据帧并将其收集到本地进程,如解决方案 4,使用 dask.distributed.Client.scatter(broadcast=False) 将其发送到工作进程,使用 dask.distributed.Client.replicate 将其发送到所有其他工人。
解决方案 2-5 与 MPI4py 版本相比具有巨大优势,因为它们利用了 dask 并行加载数据帧的能力。然而,当需要在集群中分发数据时,这些解决方案中没有一个能够接近 MPI4py 广播功能的性能。此外,我无法预测他们的内存使用情况,并且我看到许多来自工作人员的消息抱怨事件循环在几秒钟内没有响应。
在这个阶段,我倾向于使用第一种解决方案:即使数据加载效率低下,它也不会那么慢,而且根据我的经验,它是最强大的。如果我走这条路,我肯定会在桌面上留下很多性能潜力。有什么方法可以改进其中一种 dask 解决方案吗?还是有其他我没有考虑过的解决方案?
dask - dask 的 set_index 进度报告
我正在尝试在整个脚本周围包裹一个进度指示器。但是,set_index(..., compute=False)
它仍然在调度程序上运行任务,可在 Web 界面中观察到。
如何报告set_index
步骤的进度?
hadoop - 如何使用 Dask 在纱线上运行并行化的 Python 作业?
我有几个关于将 Dask 与 Hadoop/Yarn 一起使用的问题。
1 )如何将 Dask 连接到 Hadoop/YARN 并并行化作业?
当我尝试使用:
它导致错误:
CommClosedError:在:流已关闭:尝试调用远程方法“身份”时
我应该传递名称节点还是数据节点的地址?我可以推荐 Zookeeper 吗?
2 )如何使用 Dask 和 HDFS3 从 HDFS 读取数据?
当我尝试使用以下方式读取文件时:
它导致以下错误:
ImportError:没有名为 lib 的模块
我已尝试卸载并重新安装 hdfs3,但错误仍然存在。
我已经安装了knit并尝试使用以下示例启动纱线容器:
http://knit.readthedocs.io/en/latest/examples.html#ipython-parallel
这失败并出现安全错误。
我没有访问集群的权限,因此在集群中的每个节点上安装任何包都是不可能的,sudo
我唯一能做的安装是通过我的.conda
pip
userid
最后,如果有人可以在 Yarn 上发布 Dask 的工作示例,那将非常有帮助。
非常感谢任何帮助,
arrays - 多个图像意味着 dask.delayed 与 dask.array
背景
我有一个列表,其中包含经过预处理并保存为 .npy 二进制文件的数千个图像堆栈(3D numpy 数组)的路径。
案例研究我想计算所有图像的平均值,为了加快分析速度,我认为并行处理。
使用方法 dask.delayed
使用 dask.arrays
修改自Matthew Rocklin 博客的方法
问题
1.在dask.delayed
方法中是否需要预先分块列表?如果我分散原始列表,我会为每个元素获得一个未来。有没有办法告诉工人处理它有权访问的期货?
2.该dask.arrays
方法明显较慢且内存使用率较高。这是使用 dask.arrays 的“坏方法”吗?
3.有没有更好的方法来解决这个问题?
谢谢!
python - Dask SVD 计算和中间值的重用
我在 Dask 数组中有一个巨大的数 GB 矩阵。如果我执行以下操作:
其次是
我可以确保 Dask 将重用进程的中间值,还是会为 u、s 和 v 重新运行整个过程?
dask - 懒惰地从 CSV 加载 dask 数据帧(内部延迟)
在使用 dask.distributed 时,我试图在 S3 上的延迟函数中从 CSV 加载 dask 数据帧,如下所示:
read_csv()不需要与分布式客户端交互,所以我认为这是可能的。然后在客户端计算机上计算func1返回的延迟对象。
在那之前它看起来不错,打印结果
但是,Failed to serialize (<dask.bytes.core.OpenFile object at ...>, ..., ..., '\n'). Exception: can't pickle thread.lock objects
当我尝试进一步处理它时它失败了,例如
有没有办法让这个方案发挥作用,或者我在这里错过了一些更基本的限制?
PS。我得到的错误日志:
python - dask 分布式 1.19 客户端日志记录?
以下代码用于在某些时候发出日志,但似乎不再这样做。每个工作人员中的日志记录机制配置不应该允许日志出现在标准输出上吗?如果没有,我忽略了什么?
根据这个问题,我尝试将记录器配置代码放入c.run(init_logging)
在客户端实例化后立即调用的单独函数中,但这也没有任何区别。
我在 Linux 上使用分布式 1.19.3 和 Python 3.6.3。我有
在~/.dask/config.yaml
.
dask - 分析工作人员和客户端之间的 Dask 分布式通信
我想问一下是否有办法及时衡量工人和客户之间的沟通。目前,我正在使用调度程序插件来获取任务状态并从那里获取执行的详细信息。当结果返回给客户端时,我看不到任何传输任务。假设它等于最后完成的任务和计算返回之间的时间是否公平?
我们如何才能找到从每个任务转移到客户端的数据量?
一个一般的例子是
谢谢!
python - 使用 dask 将数据框分区保存到镶木地板
我目前正在尝试保存和读取从 dask 到镶木地板文件的信息。但是,当尝试使用 dask "to_parquet" 保存数据帧并随后使用 "read_parquet" 再次加载时,似乎分区信息丢失了。
这是故意的吗?我目前的解决方法是在加载后再次设置索引,但这需要很长时间。
或者我在保存和加载时错过了一些选项?