0

我正在努力理解如何制作数据集模式。我有一个来自聚合的数据集,其中一列中的键元组,第二列中的聚合:

> ds.show
+------+------+
|    _1|    _2|
+------+------+
|[96,0]| 93439|
|[69,0]|174386|
|[42,0]| 12427|
|[15,0]|  2090|
|[80,0]|  2626|
|[91,0]| 71963|
|[64,0]|   191|
|[37,0]|    13|
|[48,0]| 13898|
|[21,0]|  2510|
|[59,0]|  1874|
|[32,0]|   373|
| [5,0]|  1075|
|[97,0]|     2|
|[16,0]|   492|
|[11,0]| 34040|
|[76,0]|     4|
|[22,0]|  1216|
|[60,0]|   522|
|[33,0]|   287|
+------+------+
only showing top 20 rows

> ds.schema
StructType(StructField(_1,StructType(StructField(src,IntegerType,false), StructField(dst,IntegerType,false)),true), StructField(_2,LongType,false))

为什么我不能应用此架构?

> val mySchema = StructType(StructField("_1",StructType(StructField("src",IntegerType,false), 
                                                        StructField("dst",IntegerType,false)),true), 
                            StructField("_2",LongType,false))
> ds.as[mySchema]

Name: Compile Error
Message: <console>:41: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
       val returnSchema = StructType(StructField("_1",StructType(StructField("src",IntegerType,false),      
                                                      ^

我也没有成功地试图反映一个 Scala case class

> final case class AggSchema(edge: (Int, Int), count: Long)
> ds.as[AggSchema]

Name: org.apache.spark.sql.catalyst.analysis.UnresolvedException
Message: Invalid call to dataType on unresolved object, tree: 'edge
StackTrace: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:59)
...
4

1 回答 1

1

第一种方法不是因为模式不是一种类型。Schema 只是描述列的 Catalyst 类型的对象。换句话说,模式只是解释存储在DataFrame. 没有它只不过DataFrame是一个.Dataset[Row]o.a.s.sql.RowSeq[Any]

第二种方法不起作用,因为字段的名称与架构不匹配。由于列的名称。

case class Edge(src: Int, dst: Int)

val df = Seq((Edge(96, 0), 93439L)).toDF

要么根本不使用名称:

df.as[((Int, Int), Long)]

或使用与用例类匹配的模式,例如

case class AggSchema(edge: Edge, count: Long)

df.toDF("edge", "count").as[AggSchema]
于 2016-07-26T22:09:52.947 回答