4

我想在我的分布式集群上运行图形/期货,它们都有一个“加载数据”根任务,然后是一堆在该数据上运行的训练任务。简化版本如下所示:

from dask.distributed import Client
client = Client(scheduler_ip)
load_data_future = client.submit(load_data_func, 'path/to/data/')
train_task_futures = [client.submit(train_func, load_data_future, params) 
                      for params in train_param_set]

像上面那样运行调度程序让一个工作人员读取文件,然后将该数据溢出到磁盘以与其他工作人员共享。但是,加载数据通常是从一个大的 HDF5 文件中读取,这可以同时完成,所以我想知道是否有一种方法可以强制所有工作人员同时读取这个文件(他们都计算根任务)而不是让他们等待一名工作人员完成,然后慢慢地从该工作人员传输数据。

我知道有一种client.run()方法可以让所有工作人员同时读取文件,但是您将如何获取已读取的数据以提供给下游任务?

我不能使用 dask 数据原语同时读取 HDF5 文件,因为我需要多索引和多列分组等内容。

4

2 回答 2

2

重新审视了这个问题,找到了一个相对简单的解决方案,尽管它使用内部 API 方法并涉及对client.run(). 使用与问题中相同的变量:

from distributed import get_worker
client_id = client.id
def load_dataset():
    worker = get_worker()
    data = {'load_dataset-0': load_data_func('path/to/data')}
    info = worker.update_data(data=data, report=False)
    worker.scheduler.update_data(who_has={key: [worker.address] for key in data}, 
                                 nbytes=info['nbytes'], client=client_id)
client.run(load_dataset)

现在,如果您运行client.has_what(),您应该会看到每个工人都持有 key load_dataset-0。要在下游计算中使用它,您可以简单地为密钥创建一个未来:

from distributed import Future
load_data_future = Future('load_dataset-0', client=client)

这可以与往常一起使用client.compute()dask.delayed照常使用。事实上,问题中示例的最后一行可以正常工作:

train_task_futures = [client.submit(train_func, load_data_future, params) 
                      for params in train_param_set]

请记住,它使用内部 API 方法Worker.update_dataScheduler.update_data并且工作正常,distributed.__version__ == 1.21.6但在未来的版本中可能会发生变化。

于 2018-08-29T03:39:31.640 回答
1

截至今天(distributed.__version__ == 1.20.2),您要求的内容是不可能的。最接近的方法是计算一次,然后显式复制数据

future = client.submit(load, path)
wait(future)
client.replicate(future)

您可能想在https://github.com/dask/distributed/issues/new上将此作为功能请求提出

于 2018-01-17T13:24:45.310 回答