5

我正在使用 Spark 数据集(Spark 1.6.1 版本)。下面是我的代码

object App { 

val conf = new SparkConf()
.setMaster("local")
.setAppName("SparkETL")

val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc);
import sqlContext.implicits._

}

override def readDataTable(tableName:String):DataFrame={
val dataFrame= App.sqlContext.read.jdbc(JDBC_URL, tableName, JDBC_PROP);
return dataFrame;
}


case class Student(stud_id , sname , saddress)
case class Student(classid, stud_id, name)


var tbl_student = JobSqlDAO.readDataTable("tbl_student").filter("stud_id = '" + studId + "'").as[Student].as("tbl_student")

var tbl_class_student = JobSqlDAO.readDataTable("tbl_class_student").as[StudentClass].as("tbl_class_student")


 var result = tbl_class_student.joinWith(tbl_student, $"tbl_student.stud_id" === $"tbl_class_student.stud_id").as("ff")

现在我想在多个列上执行 group by 子句?怎么做? result.groupBy(_._1._1.created_at)这样我可以吗?如果是,那么我无法将结果视为一个组,如何在多列上执行此操作?

4

1 回答 1

0

如果我正确理解了您的要求,那么您最好的选择是在PairRDDFunctions类中使用reduceByKey函数。

该函数的签名是 def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)],它只是意味着您使用一系列键/值对。

让我解释一下工作流程:

  1. 您检索要使用的集合(在您的代码中result:)
  2. 使用 RDDmap函数,您可以将结果集拆分为一个元组,该元组包含两个子元组,其中包含组成键的字段和要聚合的字段(例如result.map(row => ((row.key1, row.key2), (row.value1, row.value2)):)
  3. 现在你有一个 RDD[(K,V)] 其中类型 K 是键字段元组的类型,V 是值字段元组的类型
  4. 您可以reduceByKey通过传递(V,V) => V聚合值类型的函数来直接使用(例如(agg: (Int, Int), val: (Int, Int)) => (agg._1 + val._1, agg._2 + val._2):)

请注意:

  • 您必须从聚合函数返回相同的值类型
  • 您必须导入org.apache.spark.SparkContext._才能自动使用 PairRDDFunctions 实用程序函数
  • 同样的道理groupBy,您必须从起始 RDD 映射到一对RDD[K,V],但您没有聚合函数,因为您只是将值存储在 seq 中以供进一步计算
  • 如果您需要聚合的起始值(例如:0 表示计数),请使用代替foldByKey函数
于 2016-06-27T13:19:08.567 回答