2

我创建了一个 spark 作业,它每天从我的 hdfs 中读取一个文本文件,并从文本文件的每一行中提取唯一键。每个文本文件中大约有 50000 个键。然后通过提取的密钥过滤相同的数据并保存到 hdfs。

我想在我的 hdfs 中创建一个目录,其结构为: hdfs://.../date/key 包含过滤后的数据。问题是写入 hdfs 需要长时间,因为键太多了。

现在的写法:

val inputData = sparkContext.textFile(""hdfs://...", 2)
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings
keys.map(key => {
    val filteredData = cleanedData.filter(line => line.contains(key))
    filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key")
})

有没有办法让它更快?我曾考虑将数据重新分区为提取的密钥数量,但我无法以 hdfs://.../date/key 格式保存。我也尝试过 groupByKey 但我无法保存这些值,因为它们不是 RDD。

任何帮助表示赞赏:)

4

3 回答 3

0

我认为该方法应该类似于通过键 Spark 写入多个输出 - 一个 Spark 作业。分区号与目录号无关。要实现它,您可能需要使用自定义版本覆盖 generateFileNameForKeyValue 以保存到不同的目录。

关于可扩展性,这不是 spark 的问题,而是 hdfs。但无论你如何实现,只要需求不改变,就无法避免。但我认为 Hdfs 可能适合 50,000 个文件处理程序

于 2014-10-11T19:40:47.190 回答
0

您只为输入指定 2 个分区,为输出指定 1 个分区。这样做的一个影响是严重限制了这些操作的并行性。为什么需要这些?

与其计算 50,000 个过滤的 RDD,这也很慢,不如直接按 key 分组怎么样?我知道您想将它们输出到不同的目录中,但这确实导致了这里的瓶颈。是否有另一种方法来构建它,让您读取(键,值)结果?

于 2014-10-11T19:49:22.577 回答
0
  def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) map
  try {
  while (iterator.hasNext) {
    val item = iterator.next()
    val key = item._1
    val line = item._2
    val writer = writers.get(key) match {
      case Some(writer) => writer
      case None =>
        val path = arg(1) + key
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        writer = new BufferedWriter(outputStream)
    }
    writer.writeLine(line)
    } finally {
    writers.values.foreach(._close())
    }
}

val inputData = sc.textFile()    
val keyValue = inputData.map(line => (key, line))
val partitions = keyValue.partitionBy(new MyPartition(10))    
partitions.foreachPartition(writeLines)


class MyPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
        // make sure lines with the same key in the same partition 
        (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions 
    }
}
于 2014-08-12T13:30:40.300 回答