3

在下面的代码中,我试图组合值:

val rdd: org.apache.spark.rdd.RDD[((String), Double)] =
    sc.parallelize(List(
      (("a"), 1.0),
      (("a"), 3.0),
      (("a"), 2.0)
      ))

val reduceByKey = rdd.reduceByKey((a , b) => String.valueOf(a) + String.valueOf(b))

reduceByValue应该包含 (a , 1,3,2) 但收到编译时错误:

Multiple markers at this line - type mismatch; found : String required: Double - type mismatch; found : String 
 required: Double

什么决定了reduce函数的类型?不能转换类型吗?

我可以groupByKey用来达到相同的结果,但只是想了解reduceByKey

4

2 回答 2

7

不,给定 type 的 rdd RDD[(K,V)]reduceByKey将采用 type 的关联函数(V,V) => V

如果我们想应用减少将值的类型更改为另一种任意类型,那么我们可以使用aggregateByKey

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)

使用zeroValueandseqOp函数,它在 map 端提供了类似折叠的操作,而 associate 函数combOp将结果seqOp与最终结果结合起来,就像 reduceByKey 所做的那样。从签名中我们可以看出,虽然集合值是类型V,但结果aggregateByKey将是任意类型U

应用到上面的例子, aggregateByKey看起来像这样:

rdd.aggregateByKey("")({case (aggr , value) => aggr + String.valueOf(value)}, (aggr1, aggr2) => aggr1 + aggr2)
于 2014-12-17T21:35:53.703 回答
1

您的代码的问题是您的值类型不匹配。如果您更改了 RDD 中的值类型,您可以使用 reduceByKey 实现相同的输出。

val rdd: org.apache.spark.rdd.RDD[((String), String)] =
    sc.parallelize(List(
      ("a", "1.0"),
      ("a", "3.0"),
      ("a", "2.0")
      ))

    val reduceByKey = rdd.reduceByKey((a , b) => a.concat(b))

这是相同的示例。只要您传递给 reduceByKey 的函数接受两个 Value 类型的参数(在您的情况下为 Double )并返回一个相同类型的参数,您的 reduceByKey 就可以工作。

于 2015-01-28T08:18:54.133 回答