3

我想在 Palantir Foundry 中合并多个数据集,数据集的名称是动态的,因此我无法transform_df()静态给出数据集名称。有没有一种方法可以动态地将多个输入输入transform_df并合并所有这些数据帧?

我尝试遍历数据集,例如:

li = ['dataset1_path', 'dataset2_path']

union_df = None
for p in li:
  @transforms_df(
    my_input = Input(p), 
    Output(p+"_output")
  )
  def my_compute_function(my_input):
    return my_input

  if union_df is None:
    union_df = my_compute_function
  else:
    union_df = union_df.union(my_compute_function)

但是,这不会产生联合输出。

4

3 回答 3

3

这应该能够对您进行一些更改,这是带有 json 文件的动态数据集的示例,您的情况可能只会略有不同。这是一种通用的方法,您可以执行动态 json 输入数据集,该数据集应该适用于您可以指定的任何类型的动态输入文件类型或内部铸造数据集。这个通用示例正在处理上传到平台中数据集节点的一组 json 文件。这应该是完全动态的。在这之后做一个工会应该是一件简单的事情。

这里也有一些奖金记录。

希望这可以帮助

from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    transf_dict = {## enter your dynamic mappings here ##}

    for value in transf_dict:
        @transform(
            out=Output(' path to your output here '.format(val=value)),
            inpt=Input(" path to input here ".format(val=value)),
        )
        def update_set(ctx, inpt, out):
            spark = ctx.spark_session
            sc = spark.sparkContext

            filesystem = list(inpt.filesystem().ls())
            file_dates = []
            for files in filesystem:
                with inpt.filesystem().open(files.path) as fi:
                    data = json.load(fi)
                file_dates.append(data)

            logging.info('info logs:')
            logging.info(file_dates)
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date', F.current_date())

            df_2.drop_duplicates()
            out.write_dataframe(df_2)
        transforms.append(update_logs)
    return transforms


TRANSFORMS = transform_generator()
于 2021-03-29T15:03:09.773 回答
1

所以这个问题分为两个问题。

如何使用编程输入路径处理转换

要处理带有程序输入的转换,重要的是要记住两件事:

1st - 转换将在 CI 时确定您的输入和输出。这意味着您可以拥有生成转换的 Python 代码,但不能从数据集中读取路径,它们需要硬编码到生成转换的 Python 代码中。

2nd - 您的转换将在 CI 执行期间创建一次。这意味着无论何时构建数据集,您都无法使用增量或特殊逻辑来生成不同的路径。

有了这两个前提,就像在您的示例或@jeremy-david-gamet 的(回复的ty,给了您+1)中,您可以拥有在CI 时生成路径的python 代码。

dataset_paths = ['dataset1_path', 'dataset2_path']

for path in dataset_paths:
  @transforms_df(
    my_input = Input(path), 
    Output(f"{path}_output")
  )
  def my_compute_function(my_input):
    return my_input

但是要将它们联合起来,您需要第二次转换来执行联合,您需要传递多个输入,因此您可以使用*argsor **kwargs

dataset_paths = ['dataset1_path', 'dataset2_path']

all_args = [Input(path) for path in dataset_paths]
all_args.append(Output("path/to/unioned_dataset"))
@transforms_df(*all_args)
def my_compute_function(*args):
    input_dfs = []
    for arg in args:
       # there are other arguments like ctx in the args list, so we need  to check for type. You can also use kwargs for more determinism.
       if isinstance(arg, pyspark.sql.DataFrame):
            input_dfs.append(arg)
    
    # now that you have your dfs in a list you can union them
    # Note I didn't test this code, but it should be something like this
    ...

如何合并具有不同模式的数据集。

对于这一部分,有很多关于如何在 spark 中合并不同数据帧的问答。这是从https://stackoverflow.com/a/55461824/26004复制的简短代码示例

from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended
于 2021-03-31T09:39:36.777 回答
0

由于输入和输出是在 CI 时确定的,因此我们无法形成真正的动态输入。我们将不得不以某种方式指向代码中的特定数据集。假设数据集的路径共享相同的根,以下似乎需要最少的维护:

from transforms.api import transform_df, Input, Output
from functools import reduce


datasets = [
    'dataset1',
    'dataset2',
    'dataset3',
]
inputs = {f'inp{i}': Input(f'input/folder/path/{x}') for i, x in enumerate(datasets)}
kwargs = {
    **{'output': Output('output/folder/path/unioned_dataset')},
    **inputs
}


@transform_df(**kwargs)
def my_compute_function(**inputs):
    unioned_df = reduce(lambda df1, df2: df1.unionByName(df2), inputs.values())
    return unioned_df

关于不同模式的联合,因为Spark 3.1可以使用这个

df1.unionByName(df2, allowMissingColumns=True)
于 2021-08-05T13:40:23.973 回答