2

我目前正在对 Dask 分布式集群进行分布式读取。让所有节点从一个公共 NFS 获取数据是可行的,但我的数据已经分散/存储在每个节点的本地驱动器上。调度程序在单独的节点上运行。

这是代码

from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')
import socket
nodes = client.run(socket.gethostname)
hosts =  nodes.values() # something like ['node01','node02',...,'node40']
import dask.dataframe as dd

columns = ['VendorID','tpep_pickup_datetime','tpep_dropoff_datetime','passenger_count', 'trip_distance', 'Pickup_longitude', 'Pickup_latitude', 'RatecodeID', 'store_and_fwd_flag', 'Dropoff_longitude', 'Dropoff_latitude',  'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']
fdf = [ client.submit(dd.read_csv, '/data/nyc_taxi/yellow*.csv.part+worker[-2:]', workers=worker, names=columns ) for worker in hosts]

任何尝试处理这些数据都会导致FileNotFoundError: [Errno 2] No such file or directory: '/data/nyc_taxi/yellow*.csv.part30'

调度程序日志给了我:

distributed.scheduler - ERROR - error from worker tcp://192.168.1.141:43079: [Errno 2] No such file or directory: '/data/nyc_taxi/yellow_tripdata_2016-03.csv.part30'

这表明调度程序将任务分配给了错误的工作人员。不过,询问客户client.who_has(fdf[0])会给我正确的地址。

我查看了这个博客条目这个 wiki 条目,但它没有为我提供任何有用的线索。我在这里想念什么?

最理想的是,我很想做这样的事情:

df = dd.from_delayed(fdf) # is this supposed to work on futures?

我正在开发 dask 0.16.0/distributed 1.20.0,但该错误也发生在较旧的 dask 版本(大约 0.15.4)上。

4

0 回答 0