24

我想在 Spark RDD 中选择一系列元素。例如,我有一个包含一百个元素的 RDD,我需要选择 60 到 80 个元素。我该怎么做?

我看到 RDD 有一个 take(i: int) 方法,它返回第一个 i 元素。但是没有对应的方法来取最后的 i 个元素,或者从某个索引开始从中间开始的 i 个元素。

4

4 回答 4

12

我认为还没有一种有效的方法来做到这一点。但是最简单的方法是使用filter(),假设您有一个pairs带有键值对的 RDD,并且您只想要 60 到 80 之间的元素就可以了。

val 60to80 = pairs.filter {
    _ match {
        case (k,v) => k >= 60 && k <= 80
        case _ => false //incase of invalid input
    }
}

sortByKey我认为通过使用和保存有关映射到每个分区的值范围的信息,将来可以更有效地完成此操作。请记住,如果您计划多次查询范围,这种方法只会节省任何东西,因为排序显然很昂贵。

通过查看火花源,肯定可以使用以下方法进行有效的范围查询RangePartitioner

// An array of upper bounds for the first (partitions - 1) partitions
  private val rangeBounds: Array[K] = {

RangePartitioner这是一个知道所有分区上限的私有成员,很容易只查询必要的分区。看起来这是火花用户将来可能会看到的东西:SPARK-911

更新:更好的答案,基于我为 SPARK-911 写的请求请求。如果对 RDD 进行排序并且您多次查询它,它将有效地运行。

val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]];
val (lower, upper) = (10, 20)
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
  if (range.contains(i))
    for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
  else
    Iterator.empty
}
for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")

如果在内存中拥有整个分区是可以接受的,你甚至可以做这样的事情。
val glommedAndCached = sorted.glom()cache(); glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()

search不是成员 BTW 我刚刚创建了一个具有二分查找功能的隐式类,此处未显示

于 2014-07-10T13:52:33.500 回答
8

你的数据集有多大?你也许可以做你需要的:

data.take(80).drop(59)

这似乎效率低下,但对于中小型数据,应该可以工作。

是否有可能以另一种方式解决这个问题?从数据中间准确挑选某个范围的情况是什么?会takeSample更好地为您服务吗?

于 2014-07-10T17:26:05.620 回答
5

以下应该可以得到范围。注意缓存会为你节省一些开销,因为 zipWithIndex 内部需要扫描 RDD 分区以获取每个分区中的元素数量。

scala>val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache
scala>val r2 = r1.zipWithIndex
scala>val r3 = r2.filter(x=> {x._2>2 && x._2 < 4}).map(x=>x._1)
scala>r3.foreach(println)
d
于 2014-09-28T04:51:20.590 回答
1

对于那些偶然发现这个问题并寻找与 Spark 2.x 兼容的答案的人,您可以使用filterByRange

于 2017-11-20T05:38:54.770 回答