31

我正在使用 Spark 1.0.1 处理大量数据。每行包含一个 ID 号,其中一些具有重复的 ID。我想将具有相同 ID 号的所有行保存在同一位置,但我无法有效地执行此操作。我创建了(ID 号,数据行)对的 RDD[(String, String)]:

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)} 

一种有效但不高效的方法是收集 ID 号,过滤每个 ID 的 RDD,并将具有相同 ID 的值的 RDD 保存为文本文件。

val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
    val dataRows = mapRdd.filter(_._1 == id).values
    dataRows.saveAsTextFile(id)
})

我还尝试了 groupByKey 或 reduceByKey ,这样 RDD 中的每个元组都包含一个唯一的 ID 号作为键,以及由该 ID 号的新行分隔的一串组合数据行。我只想使用 foreach 遍历 RDD 一次来保存数据,但它不能将值作为 RDD

groupedRdd.foreach({ tup =>
  val data = sc.parallelize(List(tup._2)) //nested RDD does not work
  data.saveAsTextFile(tup._1)
})

本质上,我想通过 ID 号将 RDD 拆分为多个 RDD,并将该 ID 号的值保存到它们自己的位置。

4

3 回答 3

13

我认为这个问题类似于 通过键 Spark 写入多个输出 - 一个 Spark 作业

请参考那里的答案。

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

刚刚在上面看到类似的答案,但实际上我们不需要自定义分区。MultipleTextOutputFormat 将为每个键创建文件。具有相同键的多条记录落入同一个分区是可以的。

new HashPartitioner(num),其中 num 是你想要的分区号。如果您有大量不同的键,您可以将 number 设置为 big。在这种情况下,每个分区不会打开太多的 hdfs 文件处理程序。

于 2014-10-11T19:27:25.553 回答
0

这将保存每个用户 ID 的数据

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1),
x)}.groupByKey(numPartitions).saveAsObjectFile("file")

如果您需要根据用户 ID 再次检索数据,您可以执行类似的操作

val userIdLookupTable = sc.objectFile("file").cache() //could use persist() if data is to big for memory  
val data = userIdLookupTable.lookup(id) //note this returns a sequence, in this case you can just get the first one  

请注意,在这种情况下,没有特别的理由保存到文件中,因为 OP 要求它,所以我只是这样做了,据说保存到文件确实允许您在初始分组完成后的任何时间加载 RDD。

最后一件事,lookup比访问 ids 的过滤方法更快,但如果你愿意从 spark 发出拉取请求,你可以查看这个答案以获得更快的方法

于 2014-07-30T23:10:05.517 回答
0

你可以直接在分组RDD上调用saveAsTextFile,这里它会根据分区保存数据,我的意思是,如果你有4个distinctID,并且你指定groupedRDD的分区数为4,那么spark将每个分区数据存储到一个文件中(所以通过它您只能拥有一个文件每个 ID)您甚至可以将数据视为文件系统中每个 ID 的可迭代对象。

于 2014-08-11T12:27:14.587 回答