I have ingested a dataset containing roughly 100,000 raw JSON files (about 100 GB in total) into Foundry through Data Connection. I want to use a Python Transforms raw file access transform to read the files and flatten the structs and arrays of structs into a dataframe, written as incremental updates to the output. I would like to start from the following example from the documentation, adapt it to `*.json` files, and convert it to use the `@incremental()` decorator (a rough sketch of what I am aiming for follows the example).
import csv
from pyspark.sql import Row
from transforms.api import transform, Input, Output

@transform(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(hair_eye_color, processed):

    def process_file(file_status):
        with hair_eye_color.filesystem().open(file_status.path) as f:
            r = csv.reader(f)

            # Construct a pyspark.Row from our header row
            header = next(r)
            MyRow = Row(*header)

            for row in csv.reader(f):
                yield MyRow(*row)

    files_df = hair_eye_color.filesystem().files('**/*.csv')
    processed_df = files_df.rdd.flatMap(process_file).toDF()
    processed.write_dataframe(processed_df)
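For what it is worth, here is a minimal sketch of the direction I have in mind. It relies on the documented incremental behaviour where, under `@incremental()`, the input filesystem lists only files added since the last build and the output is appended to by default; the paths are placeholders, and a real version would still need a flattening step:

import json
from pyspark.sql import Row
from transforms.api import transform, incremental, Input, Output

@incremental()  # assumption: only newly added files are listed below
@transform(
    processed=Output('/examples/json_processed'),  # placeholder path
    raw_json=Input('/examples/raw_json_files'),    # placeholder path
)
def example_computation(raw_json, processed):

    def process_file(file_status):
        # Parse one JSON file on an executor; yield one Row per record
        with raw_json.filesystem().open(file_status.path) as f:
            data = json.load(f)
        for record in (data if isinstance(data, list) else [data]):
            yield Row(**record)

    # Under @incremental(), files() should return only unprocessed files
    files_df = raw_json.filesystem().files('**/*.json')
    processed_df = files_df.rdd.flatMap(process_file).toDF()
    # The default incremental write mode appends the new rows to the output
    processed.write_dataframe(processed_df)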
With @Jeremy David Gamet's help I was able to develop the code below, which gets me the dataset I want.
import json

from transforms.api import transform, Input, Output


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    sc = spark.sparkContext

    # Parse every input file on the driver and collect the records
    file_dates = []
    for file_status in inpt.filesystem().ls():
        with inpt.filesystem().open(file_status.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
        file_dates.append(data)

    # Re-serialize everything and let Spark infer the schema
    json_object = json.dumps(file_dates)
    df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
    df_2 = df_2.drop_duplicates()  # drop_duplicates returns a new dataframe

    # this code to [Flatten array column][1]
    df_2 = flatten(df_2)
    out.write_dataframe(df_2)
The `flatten(df)` code itself comes from the linked answer [1].
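Since that code is not reproduced here, the following is only my sketch of the usual recursive approach (promote struct fields to top-level columns, explode arrays); the actual linked implementation may differ:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType


def flatten(df):
    # Repeatedly expand struct columns and explode array columns
    # until the schema contains only flat (atomic) columns.
    while True:
        complex_fields = [
            f for f in df.schema.fields
            if isinstance(f.dataType, (ArrayType, StructType))
        ]
        if not complex_fields:
            return df
        field = complex_fields[0]
        if isinstance(field.dataType, StructType):
            # Promote each struct field to a top-level column
            expanded = [
                F.col(field.name + '.' + sub.name).alias(field.name + '_' + sub.name)
                for sub in field.dataType.fields
            ]
            kept = [F.col(c) for c in df.columns if c != field.name]
            df = df.select(kept + expanded)
        else:
            # One output row per array element (explode_outer keeps
            # rows whose array is empty or null)
            df = df.withColumn(field.name, F.explode_outer(field.name))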
The code above works for a handful of files, but with more than 100,000 files I hit the following error:
Connection To Driver Lost
This error indicates that connection to the driver was lost unexpectedly, which is often caused by the driver being terminated due to running out of memory. Common reasons for driver out-of-memory (OOM) errors include functions that materialize data to the driver such as .collect(), broadcasted joins, and using Pandas dataframes.
Is there any way around this?
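My current suspicion is that the failure comes from parsing every file on the driver (the json.load loop plus the single json.dumps/parallelize call), which matches the OOM causes listed in the error. Below is a sketch of what I think the distributed version would look like, parsing each file inside rdd.flatMap as in the documentation example. It is untested, assumes each file holds one JSON object or an array of objects, and reuses the flatten from [1]:

import json

from transforms.api import transform, Input, Output


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):

    def process_file(file_status):
        # Runs on the executors: parse one file and re-emit each record
        # as a JSON string, so nothing accumulates on the driver.
        with inpt.filesystem().open(file_status.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
        for record in (data if isinstance(data, list) else [data]):
            yield json.dumps(record)

    files_df = inpt.filesystem().files('**/*.json')
    json_rdd = files_df.rdd.flatMap(process_file)
    # Schema inference now happens across the cluster rather than on
    # one giant driver-side string
    df_2 = ctx.spark_session.read.json(json_rdd)
    df_2 = flatten(df_2).drop_duplicates()  # flatten as in [1]
    out.write_dataframe(df_2)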