我正在使用 Spark 1.0.1 处理大量数据。每行包含一个 ID 号,其中一些具有重复的 ID。我想将具有相同 ID 号的所有行保存在同一位置,但我无法有效地执行此操作。我创建了(ID 号,数据行)对的 RDD[(String, String)]:
val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)}
一种有效但不高效的方法是收集 ID 号,过滤每个 ID 的 RDD,并将具有相同 ID 的值的 RDD 保存为文本文件。
val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
val dataRows = mapRdd.filter(_._1 == id).values
dataRows.saveAsTextFile(id)
})
我还尝试了 groupByKey 或 reduceByKey ,这样 RDD 中的每个元组都包含一个唯一的 ID 号作为键,以及由该 ID 号的新行分隔的一串组合数据行。我只想使用 foreach 遍历 RDD 一次来保存数据,但它不能将值作为 RDD
groupedRdd.foreach({ tup =>
val data = sc.parallelize(List(tup._2)) //nested RDD does not work
data.saveAsTextFile(tup._1)
})
本质上,我想通过 ID 号将 RDD 拆分为多个 RDD,并将该 ID 号的值保存到它们自己的位置。