2

我正在处理一个java jar。累加器将流值相加。问题是,我想在我的 UI 中每次递增或以特定的周期间隔显示该值。

但是,由于累加器值只能从 Driver 程序中获取,因此在进程完成执行之前我无法访问该值。关于如何定期访问此值的任何想法?

我的代码如下

package com.spark;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaSpark {

    /**
     * @param args
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.setMaster("local");
        JavaStreamingContext jssc = new JavaStreamingContext(conf,
                new Duration(5000));
        final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("test", 1);
        JavaPairDStream<String, String> lines = KafkaUtils.createStream(jssc,
                "localhost:2181", "group1", topicMap);

        JavaDStream<Integer> map = lines
                .map(new Function<Tuple2<String, String>, Integer>() {

                    public Integer call(Tuple2<String, String> v1)
                            throws Exception {
                        if (v1._2.contains("the")) {
                            accum.add(1);
                            return 1;
                        }
                        return 0;
                    }
                });

        map.print();
        jssc.start();
        jssc.awaitTermination();
        System.out.println("*************" + accum.value());
        System.out.println("done");
    }
}

我正在使用 Kafka 流式传输数据。

4

2 回答 2

1

在 spark 中,仅当调用 jssc.star() 时,实际代码才开始执行。现在控制是火花它开始运行循环,你所有的 system.out.println 将只被调用一次。并且不会每次都使用循环执行。

对于输出操作检查文档

你可以使用

print() forEachRDD() 保存为对象文本或 hadoop 文件

希望这可以帮助

于 2014-10-01T07:11:46.747 回答
0
jssc.start();
while(true) {
    System.out.println("current:" + accum.value());
    Thread.sleep(1000);
}
于 2016-04-29T05:55:24.870 回答