0

我想计算数据中唯一行的数量。下面是一个快速输入/输出示例。

#input
A,B
0,0
0,1
1,0
1,0
1,1
1,1

#output
A,B,count
0,0,1
0,1,1
1,0,2
1,1,2

我的管道中的数据有超过 5000 列和超过 1M 行,每个单元格是 0 或 1。下面是我使用 Dask 进行缩放的两次尝试(26 列):

import numpy as np
import string
import time

client = Client(n_workers=6, threads_per_worker=2, processes=True)

columns = list(string.ascii_uppercase)

data = np.random.randint(2, size = (1000000, len(columns)))

ddf_parent = dd.from_pandas(pd.DataFrame(data, columns = columns), npartitions=20)

#1st solution
ddf = ddf_parent.astype(str)

ddf_concat = ddf.apply(''.join, axis =1).to_frame()

ddf_concat.columns = ['pattern']

ddf_concat = ddf_concat.groupby('pattern').size()

start = time.time()
ddf_concat = ddf_concat.compute()
print(time.time()-start)

#2nd solution
ddf_concat_other = ddf_parent.groupby(list(ddf.columns)).size()

start = time.time()
ddf_concat_other = ddf_concat_other.compute()
print(time.time() - start)

结果:

9.491615056991577
12.688117980957031

第一个解决方案首先将每一列连接成一个字符串,然后在其上运行 group-by。第二个只是按所有列分组。我倾向于使用第一个,因为它在我的测试中更快,但我愿意接受建议。如果在性能方面有任何更好的东西,请随意完全改变我的解决方案(另外,有趣的是, sort=False 不会加速分组,这实际上可能与此有关:https ://github.com/dask /dask/issues/5441和这个https://github.com/rapidsai/cudf/issues/2717

注意:经过一些测试后,第一个解决方案与列数的比例相对较好。我想一项改进可能是将字符串散列以始终具有固定长度。在这种情况下对分区号有什么建议吗?从远程仪表板我可以看到,经过几次操作后,计算图中的节点减少到只有 3 个,没有利用其他可用的工作人员。

当列增加时,第二种解决方案失败。

注意 2:此外,对于第一个解决方案,我猜 Dask 调度和映射操作的方式发生了一些非常奇怪的事情。正在发生的事情是,一段时间后,单个工作人员获得的任务比其他工作人员多得多,然后工作人员超过 95% 的内存,崩溃,然后任务被正确拆分,但一段时间后另一个工作人员获得更多任务(并且循环重新开始)。管道运行良好,但我想知道这是否是预期的行为。附上截图: 在此处输入图像描述

4

0 回答 0