1

使用 dask-ec2 脚本设置集群后,我尝试将 sql 表读取为 dask 数据帧。我的查询看起来像:

import dask.dataframe as dd
from dask.distributed import Client, progress
c = Client('127.0.0.1:8786')

df = dd.read_sql_table(sql_table_name, uri, index_col=column_1,  
columns=[column_2, column_3, column_4], npartitions=393, parse_dates=
[date_column_1, date_column_2])

其中 sql_table_name 和 uri 是特定于我的数据库的字符串。然后我尝试使用 dask.distributed 客户端持久化 df:

df = c.persist(df)
progress(df)

进度条在 0.6 秒时显示异常。我使用 c.get_futures_error(df) 尝试了解原因并返回如下内容:

(<function distributed.worker.execute_task>,
  ((<function dask.compatibility.apply>,
   <function pandas.io.sql.read_sql>,
   [<sqlalchemy.sql.selectable.Select at 0x7fd5e3b0a4e0; Select object>,
*the uri I used*],
   (dict,
[['parse_dates',
  [date_column_1, date_column_2]],
 ['index_col', column_1]])),),
{},
[])

我将不胜感激有关为什么我可能会遇到这些期货错误以及我可以做些什么来减轻它们的任何指导。

4

1 回答 1

0

我相信您正在寻找Client.recreate_error_locally

>>> future = c.submit(div, 1, 0)         
>>> future.status                        
'error'
>>> c.recreate_error_locally(future)     
ZeroDivisionError: division by zero
于 2018-01-08T21:49:41.517 回答