7

我正在尝试通过从 Kafka 读取的(假)apache Web 服务器日志运行有状态的 Spark Streaming 计算。目标是“会话化”类似于此博客文章的网络流量

唯一的区别是我想对 IP 命中的每个页面进行“会话化”,而不是整个会话。我能够在批处理模式下使用 Spark 从虚假网络流量文件中读取,但现在我想在流式上下文中进行。

从 Kafka 读取日志文件并解析K/V成对(String, (String, Long, Long))

(IP, (requestPage, time, time)).

然后我呼吁groupByKey()这个K/V pair。在批处理模式下,这将产生:

(String, CollectionBuffer((String, Long, Long), ...)或者

(IP, CollectionBuffer((requestPage, time, time), ...)

在 StreamingContext 中,它产生一个:

(String, ArrayBuffer((String, Long, Long), ...)像这样:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

然而,随着下一个微批处理(DStream)的到来,该信息被丢弃。

最终,我想要的是ArrayBuffer随着时间的推移,随着给定 IP 继续交互并对其数据运行一些计算以“会话化”页面时间,它会随着时间的推移而填满。

我相信实现这一点的运营商是“ updateStateByKey。” 我在使用这个运算符时遇到了一些问题(我对 Spark 和 Scala 都是新手);

任何帮助表示赞赏。

迄今:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) 


    def updateGroupByKey(
                          a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
                          b: Option[(String, ArrayBuffer[(String, Long, Long)])]
                          ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

  }
4

2 回答 2

2

我想你正在寻找这样的东西:

 def updateGroupByKey(
                          newValues: Seq[(String, ArrayBuffer[(String, Long, Long)])],
                          currentValue: Option[(String, ArrayBuffer[(String, Long, Long)])]
                          ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
     //Collect the values
     val buffs: Seq[ArrayBuffer[(String, Long, Long)]] = (for (v <- newValues) yield v._2)
     val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 :: buffs
     //Convert state to buffer
     if (buffs2.isEmpty) None else {
        val key = if (currentValue.isEmpty) newValues(0)._1 else currentValue.get._1
        Some((key, buffs2.foldLeft(new ArrayBuffer[(String, Long, Long)])((v, a) => v++a)))
     }
  }
于 2014-12-17T22:34:18.050 回答
2

Gabor 的回答让我走上了正确的道路,但这是一个产生预期输出的答案。

首先,对于我想要的输出:

(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000)))

我不需要groupByKey()updateStateByKey已经将值累积到 Seq 中,因此添加groupByKey是不必要的(而且成本很高)。Spark 用户强烈建议不要使用groupByKey.

这是有效的代码:

def updateValues( newValues: Seq[(String, Long, Long)],
                      currentValue: Option[Seq[ (String, Long, Long)]]
                      ): Option[Seq[(String, Long, Long)]] = {

  Some(currentValue.getOrElse(Seq.empty) ++ newValues)

  }


val grouped = ipTimeStamp.updateStateByKey(updateValues)

这里updateStateByKey传递了一个函数 (updateValues),该函数具有随时间累积的值 (newValues) 以及流中当前值的选项 (currentValue)。然后它返回这些的组合。getOrElse是必需的,因为 currentValue 有时可能为空。归功于https://twitter.com/granturing以获得正确的代码。

于 2014-12-18T20:45:36.727 回答