2

我正在使用累加器,想知道这些对象是否是线程安全的?

accumInt是 的一种AccumulatorParam<Integer>

// Current value accumInt -> 6
AccumulatorThread t1 = new AccumulatorThread();
t1.setAccum(accumInt); 
t1.setValueToAdd(5);

AccumulatorThread t2 = new AccumulatorThread();
t2.setAccum(accumInt);
t2.setValueToAdd(7);

new Thread(t1).start();
new Thread(t2).start();

System.out.println(accumInt.value()); // 11 or 13 or 18

AccumlatorThread班级:

class AccumulatorThread implements Runnable {
    Accumulator<Integer> accum;
    Integer              valueToAdd;

    public Integer getValueToAdd() {
        return valueToAdd;
    }


    public void setValueToAdd(Integer valueToAdd) {
        this.valueToAdd = valueToAdd;
    }

    public Accumulator<Integer> getAccum() {
        return accum;
    }


    public void setAccum(Accumulator<Integer> accum) {
        this.accum = accum;
    }

    public void run() {
        System.out.println("Value to Add in Thread : "+valueToAdd);
        accum.add(valueToAdd);
    }
}

该行为表明它不是线程安全的。我错过了什么吗?

4

4 回答 4

5

OOC 为什么你在同一个程序中设置和读取累加器?累加器通常由工作线程添加,并且只能由驱动线程读取。

Worker1:   accumulator.add(increment)
Worker2:   accumulator.add(someOtherIncrement)

Driver:  println(accumulator.value)

现在您正在询问用于在驱动程序的不同线程中设置/读取值的多线程。达到什么目的?在这种情况下,只需使用本地 JVMAtomicIntegerAtomicLong.

累加器是仅通过关联操作“添加”到的变量,因此可以有效地并行支持。

于 2014-12-14T02:02:23.683 回答
5

累加器不是线程安全的。只能SparkContext在多个线程中使用。

于 2014-12-12T07:27:23.207 回答
1

To expand on the other two great answers from @javadba and @zsxwing.

My understanding of Apache Spark is that they may or may not be thread-safe. It does not really matter. Since the driver is "far away" from its workers (they usually talk to each other over the network or at least between JVMs -- unless it's local mode) all updates to an accumulator arrive in messages that are processed one by one and therefore ensure single-threaded update to the accumulator.

于 2016-04-22T06:44:01.750 回答
0

累加器不是线程安全的,实际上它们不需要是线程安全的。对于执行器来说,累加器是只写变量,它们可以被执行器添加,也可以被驱动读取。驱动程序使用 DAGScheduler.updateAccumulators 方法在任务完成后更新累加器的值,并且该方法只能从运行调度循环的线程中调用。一次只处理一个任务完成事件。这就是为什么累加器不需要线程安全的原因。

于 2017-05-11T10:24:08.760 回答