我从一个大的 csvs zip 文件开始,我在 Palantir Foundry 中解压缩了它。
我现在有一个由多个 csv 组成的数据集(每年一个),其中 csv 几乎是相同的模式,但有一些差异。如何将架构单独应用于每个 csv 或规范化它们之间的架构?
我从一个大的 csvs zip 文件开始,我在 Palantir Foundry 中解压缩了它。
我现在有一个由多个 csv 组成的数据集(每年一个),其中 csv 几乎是相同的模式,但有一些差异。如何将架构单独应用于每个 csv 或规范化它们之间的架构?
如果您的文件已解压缩并且只是.csv
在数据集中以 s 的形式存在,您可以使用 Spark 的本地spark_session.read.csv
方法,类似于我在此处的回答。
这将如下所示:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.format('csv').load(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs, how="wide")
return output_df
@transform(
the_output=Output("my.awesome.output"),
the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
output_df = read_files(session, files)
the_output.write_dataframe(output_df)
请注意,union_many
动词会将您的架构堆叠在一起,因此如果您有许多具有不同架构的文件,则许多行将为空,因为它们仅存在于一个文件中。
如果您知道每个模式的公共字段,并且知道只有一列会更改文件之间的名称,那么您可以更改逻辑以重命名列parsed_df
以协调模式。这将取决于您希望对架构强制执行多少要求。
我还将包含与其他响应相同的测试方法,以便您可以快速验证正确的解析行为。