I have two large CSV files, each with about 28 million rows. I'm performing an inner join, adding columns to the resulting Dask DataFrame, and then calling `groupby(...).size()` on a few columns to get counts. In this example the inputs are two parquet files that were generated from the original CSVs.
The end-to-end program does run on an 8-core / 32 GB RAM machine and produces the 4x6 pandas DataFrame of groupby sizes, but on 16 GB and 10 GB RAM machines it fails with a MemoryError.
What can I do to avoid this memory error?
Here is the problematic code:
import dask.dataframe as dd

def merge(ubs_dd, br_dd):
    return dd.merge(ubs_dd, br_dd, left_on='mabid', right_on='brid', how='inner', suffixes=('_ubs', '_br'))  # slow
    # return dd.merge(ubs_dd, br_dd, left_index=True, right_index=True)  # fast

def reconcile(merged_dd):
    merged_dd['amount_different'] = merged_dd['AMOUNT_ubs'].astype(float) - merged_dd['AMOUNT_br'].astype(float)
    merged_dd['amount_break'] = merged_dd['amount_different'].abs() >= 1  # +/- $1 tolerance
    merged_dd['billable_break'] = merged_dd['BILLABLE_ubs'] == merged_dd['BILLABLE_br']
    merged_dd['eligible_break'] = merged_dd['ELIGIBLE_ubs'] == merged_dd['ELIGIBLE_br']
    return merged_dd

def metrics_report(merged_dd):
    return merged_dd.groupby(['amount_break', 'billable_break', 'eligible_break']).size().reset_index().rename(columns={0: 'count'}).compute()

merged_dd = merge(ubs_dd, br_dd)
merged_dd = reconcile(merged_dd)
metrics = metrics_report(merged_dd)
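For reference, the same merge → reconcile → groupby pipeline can be sanity-checked on a tiny in-memory sample with plain pandas. The column names below come from the code above; the data values themselves are made up for illustration:

```python
import pandas as pd

# Tiny made-up sample mirroring the Dask pipeline above.
ubs = pd.DataFrame({
    'mabid': [1, 2, 3],
    'AMOUNT': ['10.00', '20.00', '30.00'],
    'BILLABLE': ['Y', 'N', 'Y'],
    'ELIGIBLE': ['Y', 'Y', 'N'],
})
br = pd.DataFrame({
    'brid': [1, 2, 3],
    'AMOUNT': ['10.50', '25.00', '30.00'],
    'BILLABLE': ['Y', 'N', 'N'],
    'ELIGIBLE': ['Y', 'N', 'N'],
})

merged = ubs.merge(br, left_on='mabid', right_on='brid',
                   how='inner', suffixes=('_ubs', '_br'))

# Same reconcile logic as the Dask version.
merged['amount_different'] = merged['AMOUNT_ubs'].astype(float) - merged['AMOUNT_br'].astype(float)
merged['amount_break'] = merged['amount_different'].abs() >= 1
merged['billable_break'] = merged['BILLABLE_ubs'] == merged['BILLABLE_br']
merged['eligible_break'] = merged['ELIGIBLE_ubs'] == merged['ELIGIBLE_br']

# groupby().size() counts rows per break-flag combination.
metrics = (merged.groupby(['amount_break', 'billable_break', 'eligible_break'])
                 .size().reset_index().rename(columns={0: 'count'}))
print(metrics)
```

This only demonstrates what the full job computes; at 28 million rows per side the merge and the groupby shuffle are where the memory actually goes.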
When running on the low-memory machines, it gets about 70% of the way through and then I get the following error:
generating final outputs
[############################ ] | 70% Completed | 29min 19.5s
Traceback (most recent call last):
File "c:/Users/<>/git/repository/<>/wma_billing_rec.py", line 155, in <module>
metrics = metrics_report(merged_dd)
File "c:/Users/<>/git/repository/<>/wma_billing_rec.py", line 115, in metrics_report
return merged_dd.groupby(['amount_break', 'billable_break', 'eligible_break']).size().reset_index().rename(columns={0:'count'}).compute()
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\base.py", line 167, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\base.py", line 452, in compute
results = schedule(dsk, keys, **kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\threaded.py", line 84, in get
**kwargs
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 486, in get_async
raise_exception(exc, tb)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 316, in reraise
raise exc
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\local.py", line 222, in execute_task
result = _execute_task(task, data)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\dask\dataframe\shuffle.py", line 780, in collect
res = p.get(part)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\core.py", line 73, in get
return self.get([keys], **kwargs)[0]
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\core.py", line 79, in get
return self._get(keys, **kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\encode.py", line 28, in _get
raw = self.partd._get(keys, **kwargs)
File "C:\Programs\Miniconda3_64\envs\WMABillingRecEnv\lib\site-packages\partd\buffer.py", line 54, in _get
self.slow.get(keys, lock=False)))
MemoryError