1

我有如下数据,

n1  d1  un1 mt1 1
n1  d1  un1 mt2 2
n1  d1  un1 mt3 3
n1  d1  un1 mt4 4
n1  d2  un1 mt1 3
n1  d2  un1 mt3 3
n1  d2  un1 mt4 4
n1  d2  un1 mt5 6
n1  d2  un1 mt2 3

我想得到如下输出

n1 d1 un1 0.75
n1 d2 un1 1.5

i,e 在第 1、第 2 和第 3 列上进行分组,对于第 4 列,请遵循以下公式, 第 4 列 = 在组内,(mt1+mt2)/mt4

我正在尝试对 Spark DF 做同样的事情,假设数据在数据帧 a 中,列名为 n,d,un,mt,r 我正在尝试这个。

sqlContext.udf.register("aggUDF",(v:List(mt,r))=> ?)
val b = a.groupBy("n","d","un").agg(callUdf("aggUDF",List((mt,r)) should go here))
4

2 回答 2

4

如果我理解正确,您首先要计算 mt1 和 mt2 的行总和,然后除以 mt4 中每个不同 n1、d1、un1 的总和。

虽然可以使用上面回答的自定义聚合函数,但您也可以使用一点蛮力(我将在 pyspark 中展示它,但您应该能够轻松转换为 scala)。

假设您的原始数据框称为 df 并且列按顺序排列:n,d,un,mt,r

首先为 mt1、mt2 和 mt4 分别创建一个新列,如下所示:

from pyspark.sql import functions as F
newdf = df.withColumn("mt1", when(df.mt == "mt1", df.r).otherwise(0).alias("mt1"))
newdf = newdf .withColumn("mt2", when(newdf.mt == "mt2", newdf .r).otherwise(0).alias("mt2"))
newdf = newdf .withColumn("mt4", when(newdf.mt == "mt4", newdf .r).otherwise(0).alias("mt4"))

现在对前 3 个值进行分组,并作为聚合对新的 3 个值求和。

aggregated = newdf.groupBy(["n","d","n"]).agg(F.sum(newdf.mt1).alias("sum_mt1"),F.sum(newdf.mt2).alias("sum_mt2"), F.sum(newdf.mt4).alias("sum_mt4"))

现在只需进行计算:

final = aggregated.withColumn("res", (aggregated.sum_mt1 +  aggregated.sum_mt2) / aggregated.sum_mt4)    

不是最优雅的解决方案,但它可能对你有用......

于 2016-03-13T11:40:39.653 回答
0

目前(Spark 1.4)不支持自定义聚合函数。但是,您可以使用 Hive UDAF。您可以在此处查看Spark 中 Hive 用户定义聚合函数 (UDAF) 的示例。

于 2015-08-27T05:21:13.950 回答