3

我不断收到以下编译时错误:

找不到存储在数据集中的类型的编码器。  
原始类型(Int、String 等)和产品类型(案例类)
通过导入 spark.implicits._ 来支持  
未来版本中将添加对序列化其他类型的支持。

我刚从 Spark v1.6 升级到 v2.0.2,一大堆代码DataFrame都在抱怨这个错误。它抱怨的代码如下所示。

def doSomething(data: DataFrame): Unit = {
 data.flatMap(row => {
  ...
 })
 .reduceByKey(_ + _)
 .sortByKey(ascending = false)
}

以前的 SO 帖子建议

但是,我没有任何 case 类,因为我使用DataFrame的是等于DataSet[Row],而且,我已经按如下方式内联了 2 个隐式导入,而没有任何帮助来消除此消息。

val sparkSession: SparkSession = ???
val sqlContext: SQLContext = ???

import sparkSession.implicits._
import sqlContext.implicits._

请注意,我查看了DataSetEncoder的文档。文档说如下。

斯卡拉

编码器通常是通过隐式自动创建的
SparkSession,或者可以通过调用静态方法显式创建
编码器。

导入 spark.implicits._

val ds = Seq(1, 2, 3).toDS() // 隐式提供 (spark.implicits.newIntEncoder)

但是,我的方法无法访问SparkSession. 另外,当我尝试那条线时import spark.implicits._,IntelliJ 甚至找不到它。当我说我的 DataFrame 是 DataSet[Row] 时,我是认真的。

这个问题被标记为可能重复,但请让我澄清一下。

  • 我没有关联的案例类或业务对象。
  • 我正在使用 .flatMap 而另一个问题是使用 .map
  • 隐式导入似乎没有帮助
  • 传递 RowEncoder 会产生编译时错误,例如data.flatMap(row => { ... }, RowEncoder(data.schema))(参数过多)

我正在阅读其他帖子,让我补充一下,我想我不知道这个新的 Spark 2.0 Datasets/DataFrame API 应该如何工作。在 Spark shell 中,下面的代码有效。请注意,我像这样启动 spark shell$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

val schema = StructType(Array(
 StructField("x1", StringType, true),
 StructField("x2", StringType, true),
 StructField("x3", StringType, true),
 StructField("x4", StringType, true),
 StructField("x5", StringType, true)))

val df = sqlContext.read
 .format("com.databricks.spark.csv")
 .option("header", "true")
 .schema(schema)
 .load("/Users/jwayne/Downloads/mydata.csv")

df.columns.map(col => {
 df.groupBy(col)
   .count()
   .map(_.getString(0))
   .collect()
   .toList
 })
 .toList

但是,当我将其作为测试单元的一部分运行时,我会遇到同样的无法找到编码器错误。为什么这在 shell 中有效,但在我的测试单元中无效?

在 shell 中,我输入:imports并将:implicits它们放在我的 scala 文件/源中,但这也无济于事。

4

0 回答 0