所以这个问题分为两个问题。
如何使用编程输入路径处理转换
要处理带有程序输入的转换,重要的是要记住两件事:
1st - 转换将在 CI 时确定您的输入和输出。这意味着您可以拥有生成转换的 Python 代码,但不能从数据集中读取路径,它们需要硬编码到生成转换的 Python 代码中。
2nd - 您的转换将在 CI 执行期间创建一次。这意味着无论何时构建数据集,您都无法使用增量或特殊逻辑来生成不同的路径。
有了这两个前提,就像在您的示例或@jeremy-david-gamet 的(回复的ty,给了您+1)中,您可以拥有在CI 时生成路径的python 代码。
dataset_paths = ['dataset1_path', 'dataset2_path']
for path in dataset_paths:
@transforms_df(
my_input = Input(path),
Output(f"{path}_output")
)
def my_compute_function(my_input):
return my_input
但是要将它们联合起来,您需要第二次转换来执行联合,您需要传递多个输入,因此您可以使用*args
or **kwargs
:
dataset_paths = ['dataset1_path', 'dataset2_path']
all_args = [Input(path) for path in dataset_paths]
all_args.append(Output("path/to/unioned_dataset"))
@transforms_df(*all_args)
def my_compute_function(*args):
input_dfs = []
for arg in args:
# there are other arguments like ctx in the args list, so we need to check for type. You can also use kwargs for more determinism.
if isinstance(arg, pyspark.sql.DataFrame):
input_dfs.append(arg)
# now that you have your dfs in a list you can union them
# Note I didn't test this code, but it should be something like this
...
如何合并具有不同模式的数据集。
对于这一部分,有很多关于如何在 spark 中合并不同数据帧的问答。这是从https://stackoverflow.com/a/55461824/26004复制的简短代码示例
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row
def customUnion(df1, df2):
cols1 = df1.columns
cols2 = df2.columns
total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
def expr(mycols, allcols):
def processCols(colname):
if colname in mycols:
return colname
else:
return lit(None).alias(colname)
cols = map(processCols, allcols)
return list(cols)
appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
return appended