我目前正在对 Dask 分布式集群进行分布式读取。让所有节点从一个公共 NFS 获取数据是可行的,但我的数据已经分散/存储在每个节点的本地驱动器上。调度程序在单独的节点上运行。
这是代码
from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')
import socket
nodes = client.run(socket.gethostname)
hosts = nodes.values() # something like ['node01','node02',...,'node40']
import dask.dataframe as dd
columns = ['VendorID','tpep_pickup_datetime','tpep_dropoff_datetime','passenger_count', 'trip_distance', 'Pickup_longitude', 'Pickup_latitude', 'RatecodeID', 'store_and_fwd_flag', 'Dropoff_longitude', 'Dropoff_latitude', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']
fdf = [ client.submit(dd.read_csv, '/data/nyc_taxi/yellow*.csv.part+worker[-2:]', workers=worker, names=columns ) for worker in hosts]
任何尝试处理这些数据都会导致FileNotFoundError: [Errno 2] No such file or directory: '/data/nyc_taxi/yellow*.csv.part30'
调度程序日志给了我:
distributed.scheduler - ERROR - error from worker tcp://192.168.1.141:43079: [Errno 2] No such file or directory: '/data/nyc_taxi/yellow_tripdata_2016-03.csv.part30'
这表明调度程序将任务分配给了错误的工作人员。不过,询问客户client.who_has(fdf[0])
会给我正确的地址。
我查看了这个博客条目和这个 wiki 条目,但它没有为我提供任何有用的线索。我在这里想念什么?
最理想的是,我很想做这样的事情:
df = dd.from_delayed(fdf) # is this supposed to work on futures?
我正在开发 dask 0.16.0/distributed 1.20.0,但该错误也发生在较旧的 dask 版本(大约 0.15.4)上。