18

我在 Scala 中使用 Spark,我的聚合列是匿名的。有没有一种方便的方法来重命名数据集中的多个列?我考虑过强加一个模式,as但键列是一个结构(由于groupBy操作),我不知道如何在其中定义case classa StructType

我尝试按如下方式定义模式:

val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
                                                             StructField("dst", IntegerType), true)), 
                              StructField("count", LongType, true))
edge_count.as[returnSchema]

但我得到一个编译错误:

Message: <console>:74: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean)
       val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
4

3 回答 3

20

最好的解决方案是明确命名您的列,例如,

df
  .groupBy('a, 'b)
  .agg(
    expr("count(*) as cnt"),
    expr("sum(x) as x"),
    expr("sum(y)").as("y")
  )

如果您使用的是数据集,则必须提供列的类型,例如expr("count(*) as cnt").as[Long].

您可以直接使用 DSL,但我经常发现它比简单的 SQL 表达式更冗长。

如果要进行批量重命名,请使用 a Map,然后foldLeft使用数据框。

于 2016-07-25T20:20:58.043 回答
2

我最终在语句中使用了aliases ;select例如,

ds.select($"key.src".as[Short], 
          $"key.dst".as[Short], 
          $"sum(count)".alias("count").as[Long])

首先,我必须使用printSchema来确定派生列名:

> ds.printSchema

root
 |-- key: struct (nullable = false)
 |    |-- src: short (nullable = false)
 |    |-- dst: short (nullable = false)
 |-- sum(count): long (nullable = true)
于 2016-07-27T18:43:30.097 回答
0

我同意 Sim 的回答,即最方便的方法是在创建时明确命名列。这只是另一种为列命名的方法(不使用expr):

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, "a"),
  (2, "b"),
  (3, "c")
).toDF("number", "word")

val aggDf = df.agg(
  collect_list(struct(col("number"), col("word"))) as "myStruct",
  sum(col("number")) as "mySum",
  count(col("*")) as "myCnt"
)

aggDf.printSchema

// |-- myStruct: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- number: integer (nullable = false)
// |    |    |-- word: string (nullable = true)
// |-- mySum: long (nullable = true)
// |-- myCnt: long (nullable = false)

aggDf.show()

// +------------------------+-----+-----+
// |myStruct                |mySum|myCnt|
// +------------------------+-----+-----+
// |[[1, a], [2, b], [3, c]]|6    |3    |
// +------------------------+-----+-----+
于 2021-10-19T19:44:54.717 回答