我有一个文件夹,我每个月都会上传一个文件。该文件将在每个月具有相同的格式。
第一个问题
这个想法是将这个文件夹中的所有文件连接到一个文件中。目前我正在硬编码文件名(文件名 [0]、文件名 [1]、文件名 [2]..),但想象一下我以后会有 50 个文件,我应该将它们显式添加到 transform_df 装饰器中吗?有没有其他方法来处理这个?
第二个问题:
目前,我假设有 4 个文件(2021_07、2021_08、2021_09、2021_10),每当我添加显示 2021_12 数据的文件时,我都希望避免更改代码。如果我添加input_5 = Input(path_to_2021_12_do_not_exists)
代码将不会运行并给出错误。
如果没有每月手动向我的代码添加一个新值,我如何为将来的文件实现代码并让代码忽略输入(如果它不存在)?
谢谢
# from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output
from pyspark.sql.functions import to_date, year, col
from pyspark.sql.types import StringType
from myproject.datasets import utils
from pyspark.sql import DataFrame
from functools import reduce
input_dir = '/Company/Project_name/'
prefix_filename = 'DataInput1_'
suffixes = ['2021_07', '2021_08', '2021_09', '2021_10', '2021_11', '2021_12']
filenames = [input_dir + prefix_filename + suffixe for suffixe in suffixes]
@transform_df(
Output("/Company/Project_name/Data/clean/File_concat"),
input_1=Input(filenames[0]),
input_2=Input(filenames[1]),
input_3=Input(filenames[2]),
input_4=Input(filenames[3]),
)
def compute(input_1, input_2, input_3, input_4):
input_dfs = [input_1, input_2, input_3, input_4]
dfs = []
def transformation_input(df):
# some transformation
return df
for input_df in input_dfs:
dfs.append(transformation_input(input_df))
dfs = reduce(DataFrame.unionByName, dfs)
return dfs