我正在使用 dask 来编写和阅读镶木地板。我正在使用 fastparquet 引擎写作并使用 pyarrow 引擎阅读。我的工人有 1 GB 的内存。使用 fastparquet 内存使用情况很好,但是当我切换到 pyarrow 时,它就会爆炸并导致工作人员重新启动。我在下面有一个可重现的示例,它在 1gb 内存限制的工作人员上使用 pyarrow 失败。实际上,我的数据集比这大得多。使用 pyarrow 的唯一原因是,与 fastparquet 相比,它在扫描时提高了我的速度(大约 7x-8x)
黄昏:0.17.1
pyarrow:0.9.0.post1
快速镶木地板:0.1.3
import dask.dataframe as dd
import numpy as np
import pandas as pd
size = 9900000
tmpdir = '/tmp/test/outputParquet1'
d = {'a': np.random.normal(0, 0.3, size=size).cumsum() + 50,
'b': np.random.choice(['A', 'B', 'C'], size=size),
'c': np.random.choice(['D', 'E', 'F'], size=size),
'd': np.random.normal(0, 0.4, size=size).cumsum() + 50,
'e': np.random.normal(0, 0.5, size=size).cumsum() + 50,
'f': np.random.normal(0, 0.6, size=size).cumsum() + 50,
'g': np.random.normal(0, 0.7, size=size).cumsum() + 50}
df = dd.from_pandas(pd.DataFrame(d), 200)
df.to_parquet(tmpdir, compression='snappy', write_index=True,
engine='fastparquet')
#engine = 'pyarrow' #fails due to worker restart
engine = 'fastparquet' #works fine
df_partitioned = dd.read_parquet(tmpdir + "/*.parquet", engine=engine)
print(df_partitioned.count().compute())
df_partitioned.query("b=='A'").count().compute()
编辑:我的原始设置运行 spark 作业,使用 fastparquet 将数据并行写入分区。因此元数据文件是在最里面的分区而不是父目录中创建的。因此使用 glob 路径而不是父目录(fastparquet 读取父目录时要快得多,而使用 glob 路径扫描时 pyarrow 获胜)