问题标签 [dask-distributed]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-3.x - Dask Distributed - 分布式读取:调度程序将任务分配给错误的工作人员
我目前正在对 Dask 分布式集群进行分布式读取。让所有节点从一个公共 NFS 获取数据是可行的,但我的数据已经分散/存储在每个节点的本地驱动器上。调度程序在单独的节点上运行。
这是代码
任何尝试处理这些数据都会导致FileNotFoundError: [Errno 2] No such file or directory: '/data/nyc_taxi/yellow*.csv.part30'
调度程序日志给了我:
这表明调度程序将任务分配给了错误的工作人员。不过,询问客户client.who_has(fdf[0])
会给我正确的地址。
我查看了这个博客条目和这个 wiki 条目,但它没有为我提供任何有用的线索。我在这里想念什么?
最理想的是,我很想做这样的事情:
我正在开发 dask 0.16.0/distributed 1.20.0,但该错误也发生在较旧的 dask 版本(大约 0.15.4)上。
dask - 我收集了一些期货,这些期货是持久化数据帧的结果。如何对它们进行延迟操作?
我已经设置了一个调度程序和 4 个工作节点来对 csv 进行一些处理。csv 的大小仅为 300 mb。
st 列表包含 1000 个元素
当我这样做时,我收到此错误:
我的假设是工作人员应该获得完整的数据框并对其进行查询。但我认为它只是获得障碍并试图做到这一点。
它的解决方法是什么?由于数据帧块已经在工作人员的内存中。我不想将数据框移动到每个工作人员。
python-2.7 - Dask read_sql_table 错误:“instancemethod”对象没有属性“__getitem__”
我收到此参数的此错误:
产生此错误:
有人知道是什么问题吗?
python - Python + Distributed - 是否可以使用 Dask 来利用一组工作人员来应用一个函数来同时从文件夹中分离文件
我想编写一个程序来计算读取 .py 文件文件夹所需的时间并计算每个文件的圈复杂度。我安装了 Radon 来计算复杂性,但我也希望能够实现一个分布式系统,该系统创建一组 n 个工作人员,其中每个工作人员在文件夹中被赋予一个单独的文件,然后使用氡气计算。
我正在将 dask 用于分布式系统,并且想知道是否有可能实现我上面的要求。即,如果我有一个包含 10 个 .py 文件的文件夹,我可以创建 1 个工作人员来读取所有文件并计算复杂度,然后我的程序将记录执行该操作所花费的时间。或者我可以指定 10 个工作节点来寻找工作(即要计算的文件),每个节点将获取一个文件并同时运行,然后程序将记录执行该操作所花费的时间。
我使用 dask 设置了基本程序,它调用了一个函数,但我不确定你是否可以提供一个项目列表,这些项目分布在一组工作人员上,然后调用该函数并返回结果。
这可以使用 dask 吗?
python - Dask DataFrame.map_partition() 写入数据库表
我有一个 dask 数据框,其中包含一些转换后的数据。我想将这些数据写回mysql表。我已经实现了一个函数,该函数将数据帧作为 db url 并将数据帧写回数据库。因为我需要对数据框的数据进行一些最终编辑,所以我使用 pandasdf.to_dict('record')
来处理写入。
函数看起来像这样
在我的代码中:
但是,当我转到 MySQL 中受尊重的表时,我得到 7702,所有列上的值为 1。当我尝试使用该值过滤 all_records 时,没有返回任何字典。有没有人遇到过这种情况?您如何使用 dask 处理来自分区的数据库写入?
PS:我使用 LocalCluster 和 dask 分布式
dask - dask ec2 设置实例需要多长时间?
我是 dask.distributed 的新手。我正在尝试为分布式作业设置一些集群。我正在尝试 dask-ec2 来设置它们。当我使用所需的 Args 运行命令时,它卡在安装工作任务。我在 30 分钟后杀死了它。我正在使用端口号:22 用于 ssh 到集群。我正在尝试启动 2 个实例。
它什么时候完成或其他事情是错误的?
dask - Dask 表演:工作流程疑虑
我对如何从 dask 中获得最佳效果感到困惑。
问题
我有一个包含多个时间序列的数据框(每个都有自己的时间序列key
),我需要my_fun
在每个时间序列上运行一个函数。用 pandas 解决它的一种方法涉及
df = list(df.groupby("key"))
然后应用my_fun
多处理。尽管内存使用量很大,但性能在我的机器上相当不错,而在谷歌云计算上却很糟糕。
在 Dask 上,我当前的工作流程是:
- 从 S3 读取数据。14 个文件 -> 14 个分区
- `df.groupby("key").apply(my_fun).to_frame.compute(get=get)
因为我没有设置索引df.known_divisions
是False
问题:
- 最好
df.npartitions
是倍数ncpu
还是没关系? 由此看来,将索引设置为键似乎更好。我的猜测是我可以做类似的事情
df["key2"] = df["key"] df = df.set_index("key2")
但是,同样,我不知道这是否是最好的方法。
dask - dask dataframe to parquet 因内存错误而失败
我从多个 hdfs 文件创建了 dask 数据帧,然后尝试将最终数据帧写回 hdfs(parquet)。但它因内存错误消息而失败。
最终的 dask 数据框将有 100 万条记录和 600 列。
Dask 集群大小:5 个节点,每个节点具有 55G 内存。
例外:
请指出我正确的方向来克服这个问题。
dask - 使用 dask-distributed 如何从队列提供的长时间运行的任务中生成期货
我正在按照此示例http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow使用磁盘分布式长时间运行的任务,其中长时间运行的工作任务从队列中获取输入就像在 tensorflow 示例中一样,并将其结果传递到输出队列。(我在最新版本的 dask 中没有看到示例中使用的通道)。
我可以看到如何分散列表并应用映射来生成将输入数据推送到工作人员输入队列的期货列表。
现在,如果我们等待工作人员使用任务,所有结果都将在工作人员的输出队列中。我们可以使用
只要我们通过等待所有工作任务完成然后再调用它们来手动处理排序,这就可以正常工作。
我们如何将输出期货链接到输入的下游?有没有办法让长期运行的任务在可以收集回调度程序任务的工作人员上创建未来?
我尝试让 transfer_dask_to_worker(batch) 也查询输出队列并返回结果:
这适用于短名单,但由于取消了大约 1000 个项目的期货而开始失败。
提前致谢。
dask - 在 Dask 集合中获取所有期货的最佳方式
d
如果一个人有一个依赖于一些持久内容的 Dask 集合,那么获取所有依赖的Future
s列表的最佳方法是什么?d