5

我是 Apache Spark(版本 1.4.1)的新手。我编写了一个小代码来读取文本文件并将其数据存储在 Rdd 中。

有没有办法可以获取 rdd 中数据的大小。

这是我的代码:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row

object RddSize {

  def main(args: Array[String]) {

    val sc = new SparkContext("local", "data size")
    val FILE_LOCATION = "src/main/resources/employees.csv"
    val peopleRdd = sc.textFile(FILE_LOCATION)

    val newRdd = peopleRdd.filter(str => str.contains(",M,"))
    //Here I want to find whats the size remaining data
  }
} 

我想在过滤器转换(peopleRdd)之前和之后(newRdd)获取数据大小。

4

3 回答 3

9

有多种方法可以获取 RDD 大小

1.在您的火花上下文中添加火花监听器

SparkDriver.getContext.addSparkListener(new SparkListener() {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
  val map = stageCompleted.stageInfo.rddInfos
  map.foreach(row => {
      println("rdd memSize " + row.memSize)
      println("rdd diskSize " + row.diskSize)
   })
}})

2. 将您的 rdd 保存为文本文件。

myRDD.saveAsTextFile("person.txt")

并调用Apache Spark REST API

/applications/[app-id]/stages

3.你也可以试试SizeEstimater

val rddSize = SizeEstimator.estimate(myRDD)
于 2015-08-27T19:06:05.190 回答
4

我不确定你是否需要这样做。您可以缓存 rdd 并在 Spark UI 中检查大小。但是假设您确实想以编程方式执行此操作,这是一个解决方案。

    def calcRDDSize(rdd: RDD[String]): Long = {
        //map to the size of each string, UTF-8 is the default
        rdd.map(_.getBytes("UTF-8").length.toLong) 
           .reduce(_+_) //add the sizes together
    }

然后你可以为你的两个 RDD 调用这个函数:

println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")

即使文件大小大于集群中可用的内存,此解决方案也应该有效。

于 2015-08-26T18:00:03.877 回答
0

Spark API 文档说:

  1. 您可以从 Spark 上下文中获取有关您的 RDD 的信息:sc.getRDDStorageInfo
  2. RDD 信息包括内存和磁盘大小:RDDInfo doc
于 2015-08-27T07:44:50.720 回答