我正在尝试通过从 Kafka 读取的(假)apache Web 服务器日志运行有状态的 Spark Streaming 计算。目标是“会话化”类似于此博客文章的网络流量
唯一的区别是我想对 IP 命中的每个页面进行“会话化”,而不是整个会话。我能够在批处理模式下使用 Spark 从虚假网络流量文件中读取,但现在我想在流式上下文中进行。
从 Kafka 读取日志文件并解析K/V
成对(String, (String, Long, Long))
或
(IP, (requestPage, time, time))
.
然后我呼吁groupByKey()
这个K/V pair
。在批处理模式下,这将产生:
(String, CollectionBuffer((String, Long, Long), ...)
或者
(IP, CollectionBuffer((requestPage, time, time), ...)
在 StreamingContext 中,它产生一个:
(String, ArrayBuffer((String, Long, Long), ...)
像这样:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
然而,随着下一个微批处理(DStream)的到来,该信息被丢弃。
最终,我想要的是ArrayBuffer
随着时间的推移,随着给定 IP 继续交互并对其数据运行一些计算以“会话化”页面时间,它会随着时间的推移而填满。
我相信实现这一点的运营商是“ updateStateByKey
。” 我在使用这个运算符时遇到了一些问题(我对 Spark 和 Scala 都是新手);
任何帮助表示赞赏。
迄今:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
def updateGroupByKey(
a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}