1

我有一个非经典格式的文件,所以我需要直接在原始文件上使用spark.DataFrameReader( spark.read.csv),以便我可以设置适当的解析配置。

我怎样才能做到这一点?

4

1 回答 1

4

您需要遵循此处的方法。强烈建议使用基于单元测试的方法来迭代您的代码以恢复文件内容。

您的计算函数代码将如下所示:

from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.option("header", "true").csv(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs)
    return output_df


@transform(
    the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
    the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = input_filesystem.ls('**/*.csv.gz').map(lambda file_name: hadoop_path + file_name)
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)
于 2022-01-20T20:32:50.903 回答