所以我有这个示例代码,我有一个Dataset[Event]
我想根据动态计算的通用类型的键来分组的代码。
import org.apache.spark.sql.{ Dataset, KeyValueGroupedDataset }
case class Event(id: Int, name: String)
trait Key
case class NameKey(name: String) extends Key
abstract class EventProc[K <: Key] {
def key(e: Event): K
def group(ds: Dataset[Event]): KeyValueGroupedDataset[K, Event] = {
import ds.sparkSession.implicits._
ds.groupByKey { e => key(e) }
}
}
class NameEventProc extends EventProc[NameKey] {
def key(e: Event): NameKey = NameKey(e.name)
}
这个想法是我应该能够对new NameEventProc().group(ds)
扩展的不同类执行 a 或类似操作EventProc
。但是代码甚至没有编译并因错误而失败。
<console>:26: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
ds.groupByKey { e => key(e) }
^
根据我收集的信息,Spark 无法理解类型K
是什么,因此无法使用适当的编码器。但我不知道如何解决这个问题。