I have the following script, which returns correctly sorted results:
from transforms.api import transform, Output
from pyspark.sql import functions as F
@transform(
    out=Output("ri.foundry.main.dataset.29fdbff7-168a-457d-bb79-8f7508cede9d"),
)
def compute(out, ctx):
    data = [("1", "2022-02-01", "older"),
            ("1", "2022-02-12", "older"),
            ("1", "2022-02-09", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "date", "record_status"])
        .withColumn("date", F.to_date("date"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('date', F.to_date(F.lit('2022-02-17')))
        .withColumn('record_status', F.lit('new'))
    )
    df = df_inp.unionByName(df_upd)
    df = df.coalesce(1)
    df = df.sort(F.desc('date'))
    out.write_dataframe(df)
Notice that df = df.coalesce(1) comes before the sort.

Question: since both df.coalesce(1) and df.repartition(1) should result in a single partition, I tried replacing df = df.coalesce(1) with df = df.repartition(1). But then the output came out unsorted. Why?

Extra details

If I don't touch the partitioning at all, the output also comes out unsorted.
Physical plan with coalesce(1):
+- *(3) Sort [date#6 DESC NULLS LAST], true, 0
+- Coalesce 1
+- Union
:- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
: +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
+- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
+- *(2) Scan ExistingRDD[c1#14]
Physical plan with repartition(1):
+- *(3) Sort [date#6 DESC NULLS LAST], true, 0
+- CustomShuffleReader coalesced
+- ShuffleQueryStage 1
+- Exchange rangepartitioning(date#6 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#85]
+- ShuffleQueryStage 0
+- Exchange RoundRobinPartitioning(1), REPARTITION_WITH_NUM, [id=#83]
+- Union
:- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
: +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
+- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
+- *(2) Scan ExistingRDD[c1#14]
I'm aware of the existing question about repartition(1) vs. coalesce(1), where the asker said they couldn't use coalesce(1) for some reason. In my case, it's the other way around.