我想提交一个 dask 任务,它将执行以下操作:
def fakejob
使用 dask.bag ( )构建惰性 dask 图- 从 1. 计算图形并将其保存到镶木地板(这部分省略,只是一个动机)
我需要对多个输入执行此操作,因此我一直在尝试像这样使用 dask.distributed 的期货功能。
from dask.distributed import Client
client = Client(processes=True)
def fakejob(path):
return (
dask.bag
.read_text(path)
.to_dataframe()
)
futures = client.map(fakejob, [input_path1, input_path2])
问题是我不断得到:AssertionError: daemonic processes are not allowed to have children
我已经尝试关注这个链接并最终得到了第二个版本(与第一个版本不同的是 1 行),但期货永远保持“待定”。
from dask.distributed import Client
client = Client(processes=True)
def fakejob(path):
with dask.set_options(get=client.get):
return (
dask.bag
.read_text(path)
.to_dataframe()
)
futures = client.map(fakejob, [input_path1, input_path2])
关于如何做到这一点的任何线索?
干杯。