我有一个非经典格式的文件,所以我需要直接在原始文件上使用spark.DataFrameReader
( spark.read.csv
),以便我可以设置适当的解析配置。
我怎样才能做到这一点?
我有一个非经典格式的文件,所以我需要直接在原始文件上使用spark.DataFrameReader
( spark.read.csv
),以便我可以设置适当的解析配置。
我怎样才能做到这一点?
您需要遵循此处的方法。强烈建议使用基于单元测试的方法来迭代您的代码以恢复文件内容。
您的计算函数代码将如下所示:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.option("header", "true").csv(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs)
return output_df
@transform(
the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = input_filesystem.ls('**/*.csv.gz').map(lambda file_name: hadoop_path + file_name)
output_df = read_files(session, files)
the_output.write_dataframe(output_df)