最终结果按列'timestamp'排序。我有两个脚本,它们仅在提供给“record_status”列(“旧”与“旧”)的一个值上有所不同。由于数据按列'timestamp'排序,因此结果顺序应该相同。但是,顺序不同。看起来,在第一种情况下,排序在联合之前执行,而在联合之后。
使用orderBy
而不是sort
没有任何区别。
为什么会发生以及如何预防?
Script1(完整)- 4 次运行(构建)后的结果:
from transforms.api import transform, Output, incremental
from pyspark.sql import functions as F, types as T
@incremental(
require_incremental=True,
)
@transform(
out=Output("ri.foundry.main.dataset.a82be5aa-81f7-45cf-8c59-05912c8ed6c7"),
)
def compute(out, ctx):
out_schema = T.StructType([
T.StructField('c1', T.StringType()),
T.StructField('timestamp', T.TimestampType()),
T.StructField('record_status', T.StringType()),
])
df_out = (
out.dataframe('previous', out_schema)
.withColumn('record_status', F.lit('older'))
)
df_upd = (
ctx.spark_session.createDataFrame([('1',)], ['c1'])
.withColumn('timestamp', F.current_timestamp())
.withColumn('record_status', F.lit('new'))
)
df = df_out.unionByName(df_upd)
df = df.sort('timestamp', ascending=False)
out.set_mode('replace')
out.write_dataframe(df)
Script2(完整)- 4 次运行(构建)后的结果:
from transforms.api import transform, Output, incremental
from pyspark.sql import functions as F, types as T
@incremental(
require_incremental=True,
)
@transform(
out=Output("ri.foundry.main.dataset.caee8f7a-64b0-4837-b4f3-d5a6d5dedd85"),
)
def compute(out, ctx):
out_schema = T.StructType([
T.StructField('c1', T.StringType()),
T.StructField('timestamp', T.TimestampType()),
T.StructField('record_status', T.StringType()),
])
df_out = (
out.dataframe('previous', out_schema)
.withColumn('record_status', F.lit('old'))
)
df_upd = (
ctx.spark_session.createDataFrame([('1',)], ['c1'])
.withColumn('timestamp', F.current_timestamp())
.withColumn('record_status', F.lit('new'))
)
df = df_out.unionByName(df_upd)
df = df.sort('timestamp', ascending=False)
out.set_mode('replace')
out.write_dataframe(df)
两种转换中的查询计划都表明必须在联合之后执行排序(检查逻辑计划和物理计划,除了 ID 和 RID 之外,我没有发现任何差异,但所有转换步骤都在同一个地方):
观察:
使用以下配置文件排序效果很好(查询计划不会改变):
@configure(["KUBERNETES_NO_EXECUTORS_SMALL"])