在处理非常大的数据帧时,我很难利用 Dask 分区。想象一个包含出租车行程日志的 200GB csv。我像这样加载数据:
df = dd.read_csv("/data/taxi_data_big.tsv", sep="\t")
然后对于每个司机,我想找出最早到机场的行程(DestinationId == 7)。
df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]]
df1 数据框看起来像:
2020-01-01 D1 T1 8 7
2020-01-01 D1 T2 11 7
2020-01-01 D1 T3 44 7
2020-01-02 D1 T4 8 7
2020-01-02 D1 T5 13 7
2020-01-01 D2 T77 20 7
2020-01-01 D2 T177 76 7
2020 年 1 月 1 日,D2 司机第 20 次和第 76 次前往机场。
对于我的分析,我需要找到司机在去机场之前的平均行程次数。
df2 = df1.groupby('TripId').TripId_Rank.idxmin()
会给我 TripId 和第一次去机场的索引。
df4 = df2.loc[df3]
选择匹配的行。这适用于小数据集,但是当我移动到大数据集时,我得到了"ValueError: Not all divisions are known, can't align partitions" when performing math on dataframe column.
如果我的理解是正确的,则错误是由于数据帧被加载到多个分区中引起的,并且 Dask 文档要求在数据帧上设置显式索引。
df1 = df[(df.DestinationId == 7)][["Date", "DriverId", "TripiId", "TripId_Rank", "DestinationId"]].compute()
df1['id'] = np.arange(len(df2)) # explicitly add index column to the dataframe
df1 = df1.set_index("id") # is this really necessary? This takes hours to complete
df2 = df1.groupby('TripId').TripId_Rank.idxmin()
df4 = df2.loc[df3]
df
上面的代码有效,但我想知道这个问题是否有更好的解决方案。将 id 列添加到数据帧真的很慢,我不确定上面的代码是否利用了 Dask 并行化。
提前致谢。