问题标签 [dask-distributed]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
371 浏览

dask - Dask Sheduler 的内存使用缓慢增加

我正在运行测试:

我使用它(带有更多代码)来估计我的查询性能。但是对于这个例子,我已经删除了所有我能想到的东西。

上面的代码每秒泄漏大约 0.1 MB,我估计每 1000 次调用大约 0.3 MB。

我在我的代码中做错了吗?

0 投票
0 回答
426 浏览

dask - Dask 工作人员未能反序列化任务

我将新节点添加到 dask 集群并使用 conda 安装了所有依赖项。安装在所有 dask 节点上的包和版本方面都相似。

但是,新节点上的任务失败并出现以下错误:

[worker ....]:distributed.worker - 警告 - 无法反序列化任务

所有其他节点工作正常。仅供参考:所有节点都安装了快速镶木地板。

如果我遗漏了什么,请告诉我。

0 投票
1 回答
1211 浏览

dask - 如何从命令行运行 dask 分布式本地集群?

我想Client(LocalCluster())从命令行做相当于。

当与从 Jupyter 笔记本分发的分布式交互时,我最终会经常重新启动我的内核并LocalCluster每次都启动一个新内核,以及刷新我的散景网页。

我宁愿在后台运行一个可以连接到的进程,这可能吗?

0 投票
0 回答
47 浏览

dask - 在 Dask 分布式客户端中不使用 v.get() 检查变量是否存在

使用 Dask 分布式客户端时,我可以轻松获得如下变量:

但是,如果我在不存在的密钥上执行此操作:

5秒后给我:

是否可以在不依赖超时的情况下预先检查变量是否存在?

问候, 尼克拉斯

0 投票
1 回答
1131 浏览

python-3.x - Dask - 搜索与值匹配的行

我试图使用 Dask 读取一个非常大的 csv 文件的文件夹(它们都适合内存,它们非常大,但我有很多 RAM) - 我当前的解决方案看起来像:

1(然后使用熊猫)还是 2 更好?只是想知道该怎么做?

0 投票
1 回答
563 浏览

dask - dask 数据帧 set_index 抛出错误

我有一个从 HDFS 上的镶木地板文件创建的 dask 数据框。使用 api: set_index 创建设置索引时,失败并出现以下错误。

文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/dataframe/shuffle.py”,第 64 行,在 set_index 分区、大小、分钟、最大值 = base.compute (分区、大小、分钟、最大值)文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/base.py”,第 206 行,计算结果 = get( dsk,密钥,**kwargs)文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py”,第 1949 行,在 get results = self.gather (打包,异步=异步)文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py”,第 1391 行,收集异步=异步)文件“ /ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py”,第 561 行,同步返回同步(self.loop,func,*args, **kwargs) 文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/utils.py”,第 241 行,同步 6.reraise(*error [0]) 文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/six.py”,第 693 行,在 reraise raise value 文件“/ebs/d1/agent/ conda/envs/py361/lib/python3.6/site-packages/distributed/utils.py”,第 229 行,在 f 结果[0] = yield make_coro() 文件“/ebs/d1/agent/conda/envs/ py361/lib/python3.6/site-packages/tornado/gen.py”,第 1055 行,运行值 = future.result() 文件“/ebs/d1/agent/conda/envs/py361/lib/python3. 6/site-packages/tornado/concurrent.py”,第 238 行,结果 raise_exc_info(self._exc_info) 文件“”,第 4 行,在 raise_exc_info 文件“/ebs/d1/agent/conda/envs/py361/lib/蟒蛇3。6/site-packages/tornado/gen.py”,第 1063 行,运行中产生 = self.gen.throw(*exc_info) 文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/ site-packages/distributed/client.py”,第 1269 行,在 _gather 回溯中)文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/six.py”,第 692 行,在 reraise raise value.with_traceback(tb) 文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/dataframe/io/parquet.py”,第 144 行,在_read_parquet_row_group open=open, assign=views, scheme=scheme) TypeError: read_row_group_file() got an unexpected keyword argument 'scheme'py”,第 1269 行,在 _gather traceback 中)文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/six.py”,第 692 行,在 reraise raise value.with_traceback( tb) _read_parquet_row_group 中的文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/dataframe/io/parquet.py”,第 144 行 open=open,assign=views , scheme=scheme) TypeError: read_row_group_file() got an unexpected keyword argument 'scheme'py”,第 1269 行,在 _gather traceback 中)文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/six.py”,第 692 行,在 reraise raise value.with_traceback( tb) _read_parquet_row_group 中的文件“/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/dataframe/io/parquet.py”,第 144 行 open=open,assign=views , scheme=scheme) TypeError: read_row_group_file() got an unexpected keyword argument 'scheme'scheme=scheme) TypeError: read_row_group_file() got an unexpected keyword argument 'scheme'scheme=scheme) TypeError: read_row_group_file() got an unexpected keyword argument 'scheme'

有人可以指出这个错误的原因以及如何解决它。

0 投票
1 回答
74 浏览

python - 在多用户 dask.distributed 集群中隔离 python 环境有哪些选择?

upload_file当多个用户上传 ( ) 相同 python 文件或 zip 内容的略有不同版本时,我特别有兴趣避免冲突。

看起来这不是一个真正受支持的用例,因为工作进程是长时间运行的,并且会受到其他人的环境更改/添加的影响。

我喜欢这个库,它可以轻松地按需进行本地/远程上下文切换,因此希望能深入了解我们可能拥有的选项,即使这意味着对于特定于用户的工作进程需要一些类似部署的无缝步骤。

0 投票
0 回答
490 浏览

dask - dask 数据帧 to_parquet 抛出错误

我正在尝试将任务数据帧保存到 HDFS 上的镶木地板上。但是它失败并出现错误:Exception: TypeError('expected list of bytes',)

我还将 object_encoding 参数提供为 {"anomaly":"json","sensor_name":"json"}。

这是数据框中的列: Index(['original_value', 'anomaly', 'anomaly_bin', 'sensor_name'], dtype='object')

列 sensor_name 和 anomaly 是字符串。其他列是浮动的。

例如:[18.0 'N' 0.0 'settemp']

我还尝试将其保存为 HDFS 中的 CSV,但 api 失败并出现错误:异常:ValueError('url 类型不理解:CSV 的路径为:hdfs://ip:port/some path

如果有人能引导我朝着正确的方向前进,那就太好了。

0 投票
1 回答
805 浏览

python - 我可以在 Dask Dataframes 上懒惰地(或同时执行) .set_index() 吗?

tl; dr:

是否可以同时并行.set_index()处理多个 Dask 数据帧?或者,是否可以懒惰地处理几个 Dask Dataframe,从而导致同时并行设置索引.set_index()

这是场景:

  • 我有几个时间序列
  • 每个时间序列存储的是几个.csv文件。每个文件都包含与特定日期相关的数据。此外,文件分散在不同的文件夹中(每个文件夹包含一个月的数据)
  • 每个时间序列都有不同的采样率
  • 所有时间序列都有相同的列。都有一列,其中包含DateTime等。
  • 数据太大,无法在内存中处理。这就是我使用 Dask 的原因。
  • 我想将所有时间序列合并到一个 DataFrame 中,由DateTime. 为此,我需要首先将resample()每个时间序列设置为一个共同的采样率。然后.join()是所有时间序列。
  • .resample()只能应用于索引。因此,在重新采样之前,我需要.set_index()在每个时间序列的 DateTime 列上。
  • 当我.set_index()在一个时间序列上询问方法时,计算立即开始。这导致我的代码被阻止并等待。此时,如果我检查我的机器资源使用情况,我可以看到许多内核正在使用,但使用率没有超过 ~15%。这让我认为,理想情况下,我可以将该.set_index()方法同时应用于多个时间序列。

达到上述情况后,我尝试了一些不优雅的解决方案来并行.set_index()处理多个时间序列上的方法应用(例如 create a multiprocessing.Pool),但均未成功。在提供更多详细信息之前,是否有解决上述情况的干净方法?在实施 Dask 时是否考虑过上述场景?

或者,可以.set_index()偷懒吗?如果.set_index()可以懒惰地应用方法,我将使用上述步骤创建一个完整的计算图,最后,所有内容都将同时并行计算我认为)。

0 投票
1 回答
390 浏览

dask - 从并行 txt 文件中读取 dask 数据帧

我有两个(或更多)并行文本文件存储在 S3 中 - 即第一个文件中的第 1 行对应于第二个文件中的第 1 行等。我想将这些文件作为列读取到单个 dask 数据帧中。最好/最简单/最快的方法是什么?

PS。我可以将它们中的每一个读入一个单独的数据帧,但是我不能将它们加入索引,因为数据帧索引值似乎既不唯一也不单调。同时,行的对应关系由它们在每个文件中的位置定义。