0

我已经设置了一个调度程序和 4 个工作节点来对 csv 进行一些处理。csv 的大小仅为 300 mb。

df = dd.read_csv('/Downloads/tmpcrnin5ta',assume_missing=True)

df = df.groupby(['col_1','col_2']).agg('mean').reset_index()
df = client.persist(df)



def create_sep_futures(symbol,df):    

     symbol_df = copy.deepcopy(df[df['symbol' == symbol]])

     return symbol_df
lazy_values = [delayed(create_sep_futures)(symbol, df) for symbol in st]

 future = client.compute(lazy_values)
 result = client.gather(future)

st 列表包含 1000 个元素

当我这样做时,我收到此错误:

 distributed.worker - WARNING -  Compute Failed
 Function:  create_sep_futures
 args:      ('PHG',       symbol  col_3  col_2  \
 0                A            1.451261e+09                23.512857   
 1                A            1.451866e+09                23.886857   
 2                A            1.452470e+09                25.080429   

 kwargs:    {}
 Exception: KeyError(False,)

我的假设是工作人员应该获得完整的数据框并对其进行查询。但我认为它只是获得障碍并试图做到这一点。

它的解决方法是什么?由于数据帧块已经在工作人员的内存中。我不想将数据框移动到每个工作人员。

4

1 回答 1

0

默认情况下,使用数据帧语法和 API 对数据帧的操作是惰性(延迟)的,您无需再做任何事情。

第一个问题:您的语法错误df[df['symbol' == symbol]]=> df[df['symbol'] == symbol]。这就是False钥匙的由来。

因此,您可能正在寻找的解决方案:

future = client.compute(df[df['symbol'] == symbol])

如果您确实想单独处理块,您可以查看 ,您可以将df.map_partitions其与普通函数一起使用,并负责传递数据或延迟/期货或df.to_delayed,这将为您提供一组延迟对象,您可以将其与 a 一起使用延迟功能。

于 2017-11-23T20:45:42.713 回答