2

我有一个文件夹,我每个月都会上传一个文件。该文件将在每个月具有相同的格式。

第一个问题

这个想法是将这个文件夹中的所有文件连接到一个文件中。目前我正在硬编码文件名(文件名 [0]、文件名 [1]、文件名 [2]..),但想象一下我以后会有 50 个文件,我应该将它们显式添加到 transform_df 装饰器中吗?有没有其他方法来处理这个?

第二个问题:

目前,我假设有 4 个文件(2021_07、2021_08、2021_09、2021_10),每当我添加显示 2021_12 数据的文件时,我都希望避免更改代码。如果我添加input_5 = Input(path_to_2021_12_do_not_exists) 代码将不会运行并给出错误。

如果没有每月手动向我的代码添加一个新值,我如何为将来的文件实现代码并让代码忽略输入(如果它不存在)?

谢谢

# from pyspark.sql import functions as F
from transforms.api import transform_df, Input, Output
from pyspark.sql.functions import to_date, year, col
from pyspark.sql.types import StringType
from myproject.datasets import utils
from pyspark.sql import DataFrame
from functools import reduce


input_dir = '/Company/Project_name/'
prefix_filename = 'DataInput1_'
suffixes = ['2021_07', '2021_08', '2021_09', '2021_10', '2021_11', '2021_12']

filenames = [input_dir + prefix_filename + suffixe for suffixe in suffixes]


@transform_df(
    Output("/Company/Project_name/Data/clean/File_concat"),
    input_1=Input(filenames[0]),
    input_2=Input(filenames[1]),
    input_3=Input(filenames[2]),
    input_4=Input(filenames[3]),
    )
def compute(input_1, input_2, input_3, input_4):
    input_dfs = [input_1, input_2, input_3, input_4]
    dfs = []

    def transformation_input(df):
        # some transformation
        return df
    for input_df in input_dfs:
        dfs.append(transformation_input(input_df))

    dfs = reduce(DataFrame.unionByName, dfs)
    return dfs
4

2 回答 2

2

这个问题出现了很多,简单的答案是你没有。定义数据集并在其上执行构建是在不同阶段执行的两个不同步骤。

每当您提交代码并运行检查时renderSchrinkwrap,除了计算部分之外,您的整个 Python 代码都会在该阶段执行。这允许 Foundry 发现存在哪些数据集并发布。

发布涉及创建数据集并将计算函数中的任何内容发布到数据集的作业规范中,因此代工厂知道每当您运行构建时要执行哪些代码。

一旦你在数据集上构建,Foundry 只会选择作业规范中的任何内容并执行它。在您的检查期间,任何其他代码都已经运行过,并且只运行过一次。

因此,任何动态输入/输出都需要您对 repo 重新运行检查,这意味着必须进行一些代码更改,因为检查是 CI 过程的一部分,而不是构建的一部分。

于 2022-02-16T16:44:29.960 回答
0

退后一步,假设您的每个输入文件具有相同的架构,Foundry 会希望您将所有这些文件作为附加事务放在同一个数据集中。

这可能是不可能的,例如,如果数据“年份”的唯一指示嵌入在文件名中,但您的示例代码将表明您希望所有这些数据集具有相同的模式并轻松合并在一起。

您可以通过数据集预览手动执行此操作 - 只需使用上传文件按钮或将新文件拖放到预览窗口中 - 或者,如果它是“最终用户”工作流程,则使用 Workshop 应用程序中的文件上传小部件. 如果此小部件不可用,您可能需要与您的 Foundry 支持团队协调。

于 2022-02-16T17:07:28.510 回答