3

我是 spark 新手,我正在使用 Spark 流式传输和 Kafka ..

我的流媒体持续时间是 1 秒。

假设我在第一批中获得 100 条记录,在第二批中获得 120 条记录,在第三批中获得 80 条记录

--> {sec 1   1,2,...100} --> {sec 2 1,2..120} --> {sec 3 1,2,..80}

我在第一批中应用我的逻辑并得到结果 => result1

我想在处理第二批时使用 result1 并将第二批的 result1 和 120 条记录的组合结果作为 => result2

我试图缓存结果,但我无法在 2 秒内获得缓存的结果 1 可能吗?或说明如何在这里实现我的目标?

 JavaPairReceiverInputDStream<String, String> messages =   KafkaUtils.createStream(jssc, String.class,String.class, StringDecoder.class,StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

我处理消息并找到结果为 1 秒的单词。

if(resultCp!=null){
                resultCp.print();
                result = resultCp.union(words.mapValues(new Sum()));

            }else{
                result = words.mapValues(new Sum());
            }

 resultCp =  result.cache();

在第二批中,resultCp 不应为空,但它返回空值,因此在任何给定时间,我只有特定的秒数据,我想找到累积结果。有没有人知道怎么做..

我了解到,一旦启动火花流jssc.start(),控制就不再是我们的终点,它取决于火花。那么是否可以将第一批的结果发送到第二批以查找累积值?

非常感谢任何帮助。提前致谢。

4

1 回答 1

1

我认为您正在寻找updateStateByKey通过对提供的 DStream 和某些状态应用累积函数来创建新 DStream 的方法。Spark示例包中的这个示例涵盖了问题中的情况:

首先,您需要一个更新函数来获取新值和先前已知的值:

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum

  val previousCount = state.getOrElse(0)

  Some(currentCount + previousCount)
}

该函数用于创建从源 dstream 累积值的 Dstream。像这样:

// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

// Update the cumulative count using updateStateByKey
// This will give a Dstream made of state (which is the cumulative count of the words)
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) 

来源:https ://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

于 2014-10-20T10:00:20.683 回答