2

my accumulator is an Array[Array[Int]] after updating accumalutor in foreach operation of RDD,accumulator(0) is as expected where as accumulator(1) is Array(0,0,0) which is completely lost

inside RDD ,accumulator value is Array(Array(4,5,6),Array(4,5,6)) outside RDD, accumulator value is Array(Array(4,5,6),Array(0,0,0))

below is the code

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]) {
     val conf = new SparkConf().setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val a =Array(Array(1,2,3),Array(4,5,6))
  val rdd = sc.parallelize(a)
  val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
  val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
  rdd.foreach{x=>
     accumulator += (x(0),0,0)
     accumulator += (x(1),0,1)
     accumulator += (x(2),0,2)
     accumulator += (x(0),1,0)
     accumulator += (x(1),1,1)
     accumulator += (x(2),1,2)
     println("accumulator value in rdd is"+accumulator.localValue)
     }

  println("accumulator value out of rdd is :" + accumulator.value )

  }

}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int,   Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {

    acc(value._2)(value._3) = value._1
    acc

  }

   def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    while (j < columnLength) {
      var i =0
    while (i < rowLength) {
         val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      } 
      j += 1
    }

    updatedMatrix
      }


}

results: inside RDD ,accumalator value is Array(Array(4,5,6),Array(4,5,6)) outside RDD, accumalator value is Array(Array(4,5,6),Array(0,0,0))

but what i'm expecting outside RDD is Array(Array(4,5,6),Array(4,5,6))

4

3 回答 3

3

每当 accumulator.variable 有更新时,就会调用 addAccumulator方法

在上面的代码中 accumulator += (x(0),0,0) 调用 addAccumulator 方法。

完成所有任务后,将调用addInPlace方法来聚合所有任务的累积值。

在上面的代码中,initialValue Array(1, 1, 1)Array(1, 1, 1) 和任务 Accumulator value Array(4, 5, 6) Array(4, 5, 6) 调用了 addInPlace 方法。

在上面的代码中,addInPlace 方法中的变量 i 在进入循环时必须重置while (j < columnLength) {

下面的代码就像一个魅力。

            while (j < columnLength) {
              i=0
                while (i < rowLength) {
                  println("m1(j)(i)"+ m1(j)(i))
                  println(" m2(j)(i))"+ m2(j)(i))
                    val a = Math.max(m1(j)(i), m2(j)(i))
                            updatedMatrix(j)(i) = a
                            i += 1
                } 
                j += 1
            }
于 2014-12-09T13:52:27.730 回答
0

根据文档,localValue 应该是不同的:

  • 这不是累加器的全局值。之后获取全局值
  • 完成对数据集的操作,调用value. *
  • 这种方法的典型用途是直接改变局部值,例如,添加
  • 一个元素到一个集合。*/
于 2014-12-19T10:37:19.427 回答
0

我发现这与将 var i=0 修改为 i=0 没有区别,最终结果是 Array(Array(4,5,6),Array(4,5,6))

应用程序的输出由 yarn logs -applicationId 获取。

代码是:

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]) {
     //val conf = new SparkConf().setAppName("Simple Application")
  val conf = new SparkConf()
  conf.setSparkHome("/usr/lib/spark")
  conf.setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val a =Array(Array(1,2,3),Array(4,5,6))
  val rdd = sc.parallelize(a)
  val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
  val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
  rdd.foreach{x=>
     accumulator += (x(0),0,0)
     accumulator += (x(1),0,1)
     accumulator += (x(2),0,2)
     accumulator += (x(0),1,0)
     accumulator += (x(1),1,1)
     accumulator += (x(2),1,2)
     val columnLength: Int = accumulator.localValue.length
     val rowLength: Int = accumulator.localValue(0).length
     var j: Int = 0
     var i: Int = 0
     println("accumulator")
     while(j < columnLength){
        i =0 
        while(i<rowLength){
            println(accumulator.localValue(j)(i))
            i += 1
        }
        j+=1
     }
     println("accumulator value in rdd is"+accumulator.localValue)
     }
     val columnLength: Int = accumulator.value.length
     val rowLength: Int = accumulator.value(0).length
     var j: Int = 0
     var i: Int = 0
     println("total")
     while(j < columnLength){
        i =0 
        while(i<rowLength){
            println(accumulator.value(j)(i))
            i += 1
        }
        j+=1
     }

  println("accumulator value out of rdd is :" + accumulator.value )

  }

}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int,   Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {

    acc(value._2)(value._3) = value._1
    acc
  }

   def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    var i: Int = 0
    while (j < columnLength) {
    i =0
    while (i < rowLength) {
        println("m1("+j+")("+i+")="+ m1(j)(i) + " m2("+j+")("+i+")="+ m2(j)(i))
        val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      } 
      j += 1
    }

    updatedMatrix
  }
}

结果是:

accumulator
4
5
6
4
5
6

accumulator
1
2
3
1
2
3

m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6

total
4
5
6
4
5
6

并将代码修改为:

    //var i: Int = 0
    while (j < columnLength) {
    var i =0

结果是:

m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6
total
4
5
6
4
5
6

accumulator
1
2
3
1
2
3

accumulator
4
5
6
4
5
6

最后的结果是一样的。

但我有两个问题:

  • 我不知道为什么两个输出顺序不一样。
  • 为什么 addInplace 函数会被调用两次?
    • 我想我知道为什么这个函数会被调用两次,但我不确定
      • 初始化:数组(数组(1,1,1),数组(1,1,1)
      • 任务的输出:Array(Array(1,2,3),Array(1,2,3)
      • 其他任务的输出:Array(Array(4,5,6),Array(4,5,6)

@Vijay Innamuri

于 2015-11-10T08:07:13.743 回答