我有许多 csv 文件(90+)对于内存来说太大了(每个压缩约 0.5gb),它们都具有相同的模式。我想转换为镶木地板,然后使用 dask 进行时间序列分析。
目前我将它们读入 pandas,执行一些类型检查和业务逻辑,然后用于ddf.to_parquet
发送到 parquet。我想按日期和版本号分区。因此代码如下所示:
import pandas as pd
import dask.dataframe as ddf
import pyarrow.dataset as ds
d = pd.DataFrame({'DATE':[202001, 202002, 202003, 202004, 202005]*20,'VERSION':[0,1]*50,'OTHER':[1]*100})
d_dask = ddf.from_pandas(d,npartitions=2)
ddf.to_parquet(d_dask, 'some/path/', engine = 'pyarrow', partition_on = ['DATE', 'VERSION'], \
write_metadata_file = True, schema='infer')
现在这可行,我最终得到一个文件夹结构,如下所示:
'一些/路径/_元数据'
'一些/路径/_common_metadata'
'一些/路径/日期=202001/版本=0/part.0.parquet'
.... 等等。
现在我想读入它并在内存之外进行基本操作。我在阅读时得到奇怪的结果。
test_read = ddf.read_parquet('some/path/', engine='pyarrow',partitioning="hive")
test_read.groupby('DATE').sum()
我得到以下信息:
[IN]: test_read.groupby('DATE').sum().compute()
[OUT]:
DATE OTHER
202001 0
202002 0
202003 0
202004 0
202005 0
现在使用pyarrow.datasets
API 效果很好。
test_read = ds.dataset('some/path/',format='parquet',partitioning="hive")
test_read.to_table().to_pandas().groupby('DATE').sum()
我得到以下信息:
[OUT]:
OTHER VERSION
DATE
202001 20 10
202002 20 10
202003 20 10
202004 20 10
202005 20 10
这是我所期望的。
有谁知道这里发生了什么?
版本:
[IN]:
print(dask.__version__,pyarrow.__version__)
[OUT]:
2.30.0 2.0.0