我有一个 dask 数据框,其中包含一些转换后的数据。我想将这些数据写回mysql表。我已经实现了一个函数,该函数将数据帧作为 db url 并将数据帧写回数据库。因为我需要对数据框的数据进行一些最终编辑,所以我使用 pandasdf.to_dict('record')
来处理写入。
函数看起来像这样
def store_partition_to_db(df, db_url):
from sqlalchemy import create_engine
from mymodels import DBTableBaseModel
records_dict = df.to_dict(records)
records_to_db = []
for record in records_dict:
transformed_record = transform_record_some_how # transformed_record is a dictionary
records_to_db.append(transformed_record)
engine = create_engine(db_uri)
engine.execute(DBTableBaseModel.__table__.insert(), records_to_db)
return records_to_db
在我的代码中:
from functools import partial
partial_store_partition_to_db(store_partition_to_db db_url=url)
dask_dataframe = dask_dataframe_data.map_partitions(partial_store_partition_to_db)
all_records = dask_dataframe.compute()
print len([record_dict for record_list in all_records for record_dict in record_list]] # Gives me 7700
但是,当我转到 MySQL 中受尊重的表时,我得到 7702,所有列上的值为 1。当我尝试使用该值过滤 all_records 时,没有返回任何字典。有没有人遇到过这种情况?您如何使用 dask 处理来自分区的数据库写入?
PS:我使用 LocalCluster 和 dask 分布式