3

最终结果按列'timestamp'排序。我有两个脚本,它们在提供给“record_status”列(“旧”与“旧”)的一个值上有所不同。由于数据按列'timestamp'排序,因此结果顺序应该相同。但是,顺序不同。看起来,在第一种情况下,排序在联合之前执行,而在联合之后。

使用orderBy而不是sort没有任何区别。

为什么会发生以及如何预防?

Script1(完整)- 4 次运行(构建)后的结果:

在此处输入图像描述

from transforms.api import transform, Output, incremental
from pyspark.sql import functions as F, types as T


@incremental(
    require_incremental=True,
)
@transform(
    out=Output("ri.foundry.main.dataset.a82be5aa-81f7-45cf-8c59-05912c8ed6c7"),
)
def compute(out, ctx):

    out_schema = T.StructType([
        T.StructField('c1', T.StringType()),
        T.StructField('timestamp', T.TimestampType()),
        T.StructField('record_status', T.StringType()),
    ])
    df_out = (
        out.dataframe('previous', out_schema)
        .withColumn('record_status', F.lit('older'))
    )

    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('timestamp', F.current_timestamp())
        .withColumn('record_status', F.lit('new'))
    )

    df = df_out.unionByName(df_upd)
    df = df.sort('timestamp', ascending=False)

    out.set_mode('replace')
    out.write_dataframe(df)

Script2(完整)- 4 次运行(构建)后的结果:

在此处输入图像描述

from transforms.api import transform, Output, incremental
from pyspark.sql import functions as F, types as T


@incremental(
    require_incremental=True,
)
@transform(
    out=Output("ri.foundry.main.dataset.caee8f7a-64b0-4837-b4f3-d5a6d5dedd85"),
)
def compute(out, ctx):

    out_schema = T.StructType([
        T.StructField('c1', T.StringType()),
        T.StructField('timestamp', T.TimestampType()),
        T.StructField('record_status', T.StringType()),
    ])
    df_out = (
        out.dataframe('previous', out_schema)
        .withColumn('record_status', F.lit('old'))
    )

    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('timestamp', F.current_timestamp())
        .withColumn('record_status', F.lit('new'))
    )

    df = df_out.unionByName(df_upd)
    df = df.sort('timestamp', ascending=False)

    out.set_mode('replace')
    out.write_dataframe(df)

两种转换中的查询计划都表明必须在联合之后执行排序(检查逻辑计划和物理计划,除了 ID 和 RID 之外,我没有发现任何差异,但所有转换步骤都在同一个地方):

在此处输入图像描述

观察:
使用以下配置文件排序效果很好(查询计划不会改变):

@configure(["KUBERNETES_NO_EXECUTORS_SMALL"])
4

1 回答 1

2

事实证明,这种行为不是由@incremental. 也可以在常规变换中观察到:

from transforms.api import transform, Output
from pyspark.sql import functions as F


@transform(
    out=Output("ri.foundry.main.dataset.beea7dd2-8da3-4abf-9103-464ec646dc00"),
)
def compute(out, ctx):

    data = [("1", "2022-02-16T17:48:15.653Z", "older"),
            ("1", "2022-02-16T17:46:58.054Z", "older"),
            ("1", "2022-02-16T17:50:50.850Z", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "timestamp", "record_status"])
        .withColumn("timestamp", F.to_timestamp("timestamp"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('timestamp', F.current_timestamp())
        .withColumn('record_status', F.lit('new'))
    )

    df = df_inp.unionByName(df_upd)
    df = df.sort(F.desc('timestamp'))

    out.write_dataframe(df)

使用没有输入数据集的变换装饰器的排序结果不正确

在问这个问题时,我提供了 2 个脚本:一个包含应该工作sort的脚本,另一个包含失败的脚本sort。现实情况是这两个脚本都不起作用,只是“正确”的脚本需要更多运行才能开始显示不正确的排序顺序:

Foundry 错误排序或 groupby 顺序

原因在于输入dfs的分区。显然,sort并且groupBy仅在分区中执行排序(其中有几个)。由于某种原因,数据不会移动到一个执行器或驱动程序。因此,生成的组合数据集没有统一的排序顺序。这就是为什么使用配置文件“KUBERNETES_NO_EXECUTORS_SMALL”会产生正确的排序顺序(所有操作都在一个节点 - 驱动程序中执行)。

我能找到的唯一解决方案df.coalesce是在该行之前使用df.sort()

df = df_out.unionByName(df_upd)
df = df.coalesce(1)
df = df.sort(F.desc('timestamp'))

我也尝试过使用df = df.repartition(1)代替df = df.coalesce(1),但这没有用(相关的新问题)。

我不确定之前将数据移动到同一节点sort的机制orderBy是否是预期的行为。如果有经验的人对此发表评论,我会很高兴...

于 2022-02-16T22:02:17.870 回答