0

所以我有这个示例代码,我有一个Dataset[Event]我想根据动态计算的通用类型的键来分组的代码。

import org.apache.spark.sql.{ Dataset, KeyValueGroupedDataset }

case class Event(id: Int, name: String)
trait Key
case class NameKey(name: String) extends Key

abstract class EventProc[K <: Key] {
  def key(e: Event): K
  def group(ds: Dataset[Event]): KeyValueGroupedDataset[K, Event] = {
    import ds.sparkSession.implicits._
    ds.groupByKey { e => key(e) }
  }
}
class NameEventProc extends EventProc[NameKey] {
  def key(e: Event): NameKey = NameKey(e.name)
}

这个想法是我应该能够对new NameEventProc().group(ds)扩展的不同类执行 a 或类似操作EventProc。但是代码甚至没有编译并因错误而失败。

<console>:26: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
       ds.groupByKey { e => key(e) }
                     ^

根据我收集的信息,Spark 无法理解类型K是什么,因此无法使用适当的编码器。但我不知道如何解决这个问题。

4

0 回答 0