0

我正在尝试使用mapGroups返回 SparseMatrix 作为列之一来执行聚合,并对列求和。

case class为映射的行创建了一个模式,以便提供列名。矩阵列是键入的org.apache.spark.mllib.linalg.MatrixtoDF如果我在执行聚合 () 之前没有运行,select(sum("mycolumn")我会收到一种类型不匹配错误 ( required: org.apache.spark.sql.TypedColumn[MySchema,?])。如果我包含toDF我会收到另一个类型不匹配错误:cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT. 那么正确的方法是什么?

4

1 回答 1

2

看起来你在这里至少有两个不同的问题。让我们假设你有Dataset这样的:

val ds = Seq(
  ("foo",  Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))), 
  ("foo",  Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)))
).toDS

选择TypedColumn

  • 使用隐式转换$

    ds.select(col("_1").as[String])
    
  • 使用o.a.s.sql.functions.col

    ds.select(col("_1").as[String])
    

添加矩阵:

  • MLLibMatrix并且MatrixUDT不实现加法。这意味着您将无法sum运行或减少+
  • 您可以使用第三方线性代数库,但 Spark SQL / Spark 数据集不支持此功能

如果你真的想这样做,Datsets你可以尝试做这样的事情:

ds.groupByKey(_._1).mapGroups(
  (key, values) => {
    val matrices = values.map(_._2.toArray)
    val first = matrices.next
    val sum = matrices.foldLeft(first)(
      (acc, m) => acc.zip(m).map { case (x, y) => x + y }
    )
    (key, sum)
})

并映射回矩阵,但我个人只会转换为 RDD 并使用breeze.

于 2016-07-21T23:25:36.427 回答