我不断收到以下编译时错误:
找不到存储在数据集中的类型的编码器。 原始类型(Int、String 等)和产品类型(案例类) 通过导入 spark.implicits._ 来支持 未来版本中将添加对序列化其他类型的支持。
我刚从 Spark v1.6 升级到 v2.0.2,一大堆代码DataFrame
都在抱怨这个错误。它抱怨的代码如下所示。
def doSomething(data: DataFrame): Unit = {
data.flatMap(row => {
...
})
.reduceByKey(_ + _)
.sortByKey(ascending = false)
}
以前的 SO 帖子建议
但是,我没有任何 case 类,因为我使用DataFrame
的是等于DataSet[Row]
,而且,我已经按如下方式内联了 2 个隐式导入,而没有任何帮助来消除此消息。
val sparkSession: SparkSession = ???
val sqlContext: SQLContext = ???
import sparkSession.implicits._
import sqlContext.implicits._
请注意,我查看了DataSet和Encoder的文档。文档说如下。
斯卡拉 编码器通常是通过隐式自动创建的 SparkSession,或者可以通过调用静态方法显式创建 编码器。 导入 spark.implicits._ val ds = Seq(1, 2, 3).toDS() // 隐式提供 (spark.implicits.newIntEncoder)
但是,我的方法无法访问SparkSession
. 另外,当我尝试那条线时import spark.implicits._
,IntelliJ 甚至找不到它。当我说我的 DataFrame 是 DataSet[Row] 时,我是认真的。
这个问题被标记为可能重复,但请让我澄清一下。
- 我没有关联的案例类或业务对象。
- 我正在使用 .flatMap 而另一个问题是使用 .map
- 隐式导入似乎没有帮助
- 传递 RowEncoder 会产生编译时错误,例如
data.flatMap(row => { ... }, RowEncoder(data.schema))
(参数过多)
我正在阅读其他帖子,让我补充一下,我想我不知道这个新的 Spark 2.0 Datasets/DataFrame API 应该如何工作。在 Spark shell 中,下面的代码有效。请注意,我像这样启动 spark shell$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val schema = StructType(Array(
StructField("x1", StringType, true),
StructField("x2", StringType, true),
StructField("x3", StringType, true),
StructField("x4", StringType, true),
StructField("x5", StringType, true)))
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.schema(schema)
.load("/Users/jwayne/Downloads/mydata.csv")
df.columns.map(col => {
df.groupBy(col)
.count()
.map(_.getString(0))
.collect()
.toList
})
.toList
但是,当我将其作为测试单元的一部分运行时,我会遇到同样的无法找到编码器错误。为什么这在 shell 中有效,但在我的测试单元中无效?
在 shell 中,我输入:imports
并将:implicits
它们放在我的 scala 文件/源中,但这也无济于事。