21

我有一个在单节点上运行良好的小型 Scala 程序。但是,我正在扩展它,以便它在多个节点上运行。这是我的第一次这样的尝试。我只是想了解 RDD 在 Spark 中是如何工作的,所以这个问题是基于理论的,可能不是 100% 正确的。

假设我创建了一个 RDD: val rdd = sc.textFile(file)

现在,一旦我这样做了,这是否意味着文件file现在跨节点分区(假设所有节点都可以访问文件路径)?

其次,我想计算 RDD 中的对象数量(足够简单),但是,我需要在需要应用于 RDD 中的对象的计算中使用该数字 - 一个伪代码示例:

rdd.map(x => x / rdd.size)

假设有 100 个对象rdd,假设有 10 个节点,因此每个节点计数 10 个对象(假设这是 RDD 概念的工作原理),现在当我调用该方法时,每个节点将使用rdd.sizeas执行计算10还是100?因为,总体而言,RDD 是大小100,但在每个节点上本地它只是10. 在进行计算之前,我是否需要制作广播变量?这个问题与下面的问题有关。

最后,如果我对 RDD 进行转换,例如rdd.map(_.split("-")),然后我想要新size的 RDD,我是否需要对 RDD 执行操作,例如count(),以便将所有信息发送回驱动节点?

4

2 回答 2

19
val rdd = sc.textFile(file)

这是否意味着文件现在跨节点分区?

该文件保留在原来的位置。结果的元素是RDD[String]文件的行。RDD 被分区以匹配底层文件系统的自然分区。分区数不取决于您拥有的节点数。

重要的是要了解,执行此行时它不会读取文件。RDD 是一个惰性对象,只会在必须的时候做一些事情。这很棒,因为它避免了不必要的内存使用。

例如,如果你写val errors = rdd.filter(line => line.startsWith("error")),仍然没有任何反应。如果你val errorCount = errors.count现在写你的操作序列将需要执行,因为结果count是一个整数。然后,每个工作核心(执行程序线程)将并行执行的操作是读取一个文件(或一段文件),遍历其行,并计算以“错误”开头的行。除了缓冲和 GC,每个内核一次只有一行在内存中。这使得在不使用大量内存的情况下处理非常大的数据成为可能。

我想计算 RDD 中的对象数量,但是,我需要在计算中使用该数字,该计算需要应用于 RDD 中的对象 - 一个伪代码示例:

rdd.map(x => x / rdd.size)

没有rdd.size方法。有rdd.count,它计算 RDD 中元素的数量。rdd.map(x => x / rdd.count)不管用。代码将尝试将rdd变量发送给所有工作人员,并且会失败并显示NotSerializableException. 你可以做的是:

val count = rdd.count
val normalized = rdd.map(x => x / count)

这行得通,因为count它是一个Int并且可以序列化。

如果我对 RDD 进行转换,例如rdd.map(_.split("-")),然后我想要 RDD 的新大小,我是否需要对 RDD 执行操作,例如count(),以便将所有信息发送回驱动节点?

map不改变元素的数量。我不知道你说的“大小”是什么意思。但是,是的,您需要执行一个操作,例如count从 RDD 中获取任何内容。您会看到,在您执行某个操作之前,根本不会执行任何工作。(当您执行 时count,只会将每个分区的计数发送回驱动程序,当然,不是“所有信息”。)

于 2014-12-15T00:06:47.730 回答
6

通常,文件(或文件的一部分,如果太大)会复制到集群中的 N 个节点(在 HDFS 上默认 N=3)。并不是要在所有可用节点之间拆分每个文件。

但是,对于您(即客户端)来说,使用 Spark 处理文件应该是透明的 -rdd.size无论拆分和/或复制多少个节点,您都不应该看到任何差异。有一些方法(至少在 Hadoop 中)可以找出文件目前可以位于哪些节点(部分)上。但是,在简单的情况下,您很可能不需要使用此功能。

更新:一篇描述 RDD 内部的文章:https ://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

于 2014-12-12T20:47:39.717 回答