我对 Spark 2.0 DataSets 非常满意,因为它具有编译时类型安全性。但是这里有几个我无法解决的问题,我也没有找到好的文档。
问题 #1 - 聚合列上的除法操作 - 考虑下面的代码 - 我有一个 DataSet[MyCaseClass],我想在 c1、c2、c3 和 sum(c4) / 8 上进行 groupByKey。如果我只计算以下代码,则效果很好sum 但它给出了divide(8)的编译时间错误。我想知道如何实现以下目标。
final case class MyClass (c1: String,
c2: String,
c3: String,
c4: Double)
val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded
import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
divide(8)). //this is breaking with exception
show()
如果我删除 .divide(8) 操作并运行上面的命令,它会给我下面的输出。
+-----------+-------------+
| key|sum(c4) |
+-----------+-------------+
| [A1,F2,S1]| 80.0|
| [A1,F1,S1]| 40.0|
+-----------+-------------+
问题 #2 - 将 groupedByKey 结果转换为另一个 Typed DataFrame - 现在我的问题的第二部分是我想再次输出一个 typed DataSet。为此,我有另一个案例类(不确定是否需要),但我不确定如何使用分组结果进行映射 -
final case class AnotherClass(c1: String,
c2: String,
c3: String,
average: Double)
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception
但这再次失败并出现异常,因为按关键结果分组没有直接与 AnotherClass 映射。
PS:任何其他实现上述目标的解决方案都非常受欢迎。