1

我有许多 csv 文件(90+)对于内存来说太大了(每个压缩约 0.5gb),它们都具有相同的模式。我想转换为镶木地板,然后使用 dask 进行时间序列分析。

目前我将它们读入 pandas,执行一些类型检查和业务逻辑,然后用于ddf.to_parquet发送到 parquet。我想按日期和版本号分区。因此代码如下所示:

import pandas as pd
import dask.dataframe as ddf
import pyarrow.dataset as ds

d = pd.DataFrame({'DATE':[202001, 202002, 202003, 202004, 202005]*20,'VERSION':[0,1]*50,'OTHER':[1]*100})
d_dask = ddf.from_pandas(d,npartitions=2)
ddf.to_parquet(d_dask, 'some/path/', engine = 'pyarrow', partition_on = ['DATE', 'VERSION'], \
write_metadata_file = True, schema='infer')

现在这可行,我最终得到一个文件夹结构,如下所示:

'一些/路径/_元数据'

'一些/路径/_common_metadata'

'一些/路径/日期=202001/版本=0/part.0.parquet'

.... 等等。

现在我想读入它并在内存之外进行基本操作。我在阅读时得到奇怪的结果。

test_read = ddf.read_parquet('some/path/', engine='pyarrow',partitioning="hive")
test_read.groupby('DATE').sum()

我得到以下信息:

[IN]: test_read.groupby('DATE').sum().compute()
[OUT]:

DATE     OTHER
202001    0
202002    0
202003    0
202004    0
202005    0

现在使用pyarrow.datasetsAPI 效果很好。

test_read = ds.dataset('some/path/',format='parquet',partitioning="hive")
test_read.to_table().to_pandas().groupby('DATE').sum()

我得到以下信息:

[OUT]:
        OTHER  VERSION
DATE                  
202001     20       10
202002     20       10
202003     20       10
202004     20       10
202005     20       10

这是我所期望的。

有谁知道这里发生了什么?

版本:

[IN]:
print(dask.__version__,pyarrow.__version__)
[OUT]:
2.30.0 2.0.0
4

0 回答 0