我正在尝试在 Scala 中编写一个简单的 Spark 代码。
在这里,我得到了一个 DStream。我成功地打印了这个 DStream。但是,当我尝试在此 DStream 上执行任何类型的“foreach”、“foreachRDD”或“transform”功能时,在我的程序执行期间,我的控制台就会冻结。在这里我没有收到任何错误,但控制台只是变得无响应,直到我手动终止 Eclipse 控制台操作。我在这里附上代码。请告诉我我做错了什么。
我的主要目标是在 DStream 上应用 RDD 操作,据我所知,我需要使用“foreach”、“foreachRDD”或“transform”函数将我的 DStream 转换为 RDD。
我已经通过使用 Java 实现了相同的目标。但是在scala中我遇到了这个问题。
还有其他人面临同样的问题吗?如果没有,请帮助我。谢谢
我在这里写一个示例代码
object KafkaStreaming {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val ssc = new StreamingContext("local", "KafkaWordCount", Seconds(2))
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val splitLines:DStream[String] = lines.flatMap(_.split("\n"))
val pairAlarm = splitLines.map(
x=>{
//Some Code
val alarmPair = new Tuple2(key, value)
alarmPair
}
)
//pairAlarm.print
pairAlarm.foreachRDD(x=>{
println("1 : "+x.first)
x.collect // When the execution reaching this part its getting freeze
println("2: "+x.first)
})
ssc.start()
ssc.awaitTermination()
}
}