I want to submit a dask task that will do the following:

  1. Build a lazy dask graph using dask.bag (def fakejob)
  2. Compute the graph from 1. and save it to parquet (this part is omitted; it is only the motivation)

I need to do this for multiple inputs, so I have been trying to use the futures feature of dask.distributed like this.

import dask.bag
from dask.distributed import Client

client = Client(processes=True)

def fakejob(path):
    return (
        dask.bag
        .read_text(path)
        .to_dataframe()
    )

futures = client.map(fakejob, [input_path1, input_path2])

The problem is that I keep getting: AssertionError: daemonic processes are not allowed to have children

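I have tried following this link and ended up with this second version (it differs from the first by one line), but the futures stay "pending" forever.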

import dask.bag
from dask.distributed import Client

client = Client(processes=True)

def fakejob(path):
    with dask.set_options(get=client.get):
        return (
            dask.bag
            .read_text(path)
            .to_dataframe()
        )

futures = client.map(fakejob, [input_path1, input_path2])

Any clues on how to do this?

Cheers.

1 Answer

The strange and slightly humorous error message comes from trying to construct the dask graph (which is what a bag is) within a worker process, which is where the function runs when it is called through client.map. Your second attempt would work with a local client if you put the whole workflow inside the function, including writing to parquet, and did not attempt to pass the bag back to the caller.

The solution is simpler.

bags = [dask.bag.read_text(path).to_dataframe()
        for path in [input_path1, input_path2]]
futures = client.compute(bags)   # run in background on the cluster
client.gather(futures)   # wait and get results

Here, bags is a list of lazy dask collections (each bag converted to a dataframe), i.e., work that is defined but not yet running. You could replace the last two lines with dask.compute(*bags) to get the results without worrying about futures.
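
As a rough illustration of that last variant, here is a minimal sketch that builds the lazy collections in the calling process and evaluates them with a single dask.compute call. The file names and contents are made up to stand in for input_path1 and input_path2, the .to_dataframe() and parquet steps are left out to keep it small, and the synchronous scheduler is my choice for the sketch so that no worker processes are involved; with a running Client you would use client.compute and client.gather as above.

```python
import os
import tempfile

import dask
import dask.bag as db

# Hypothetical inputs standing in for input_path1 and input_path2.
tmpdir = tempfile.mkdtemp()
paths = []
for name, lines in [("a.txt", ["x", "y"]), ("b.txt", ["z"])]:
    path = os.path.join(tmpdir, name)
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
    paths.append(path)

# Build the lazy graphs in the calling process; nothing is read yet.
bags = [db.read_text(path).map(str.strip) for path in paths]

# Evaluate all graphs in one call. The synchronous scheduler is an
# assumption of this sketch, chosen to avoid starting any processes.
results = dask.compute(*bags, scheduler="synchronous")

for path, lines in zip(paths, results):
    print(os.path.basename(path), lines)
```

The point of the design is that the functions passed to the cluster never create dask collections themselves: the graphs are defined once on the client side and only their execution is handed off.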

Answered 2017-07-26T13:54:49.680