0

我正在尝试解决一个问题,例如我有一个这样的数据集:

(1, 3)
(1, 4)
(1, 7)
(1, 2)   <-
(2, 7)   <-
(6, 6)    
(3, 7)   <-
(7, 4)   <-
...

由于 (1 -> 2)and (2 -> 7),我想将集合替换(2, 7)(1, 7) 类似,(3 -> 7)(7 -> 4)替换(7,4)(3, 4)

因此,我的数据集变成

(1, 3)
(1, 4)
(1, 7)
(1, 2)  
(1, 7)  
(6, 6)    
(3, 7)
(3, 4)
...

知道如何解决或解决这个问题吗?

谢谢

4

1 回答 1

2

这个问题看起来像一个图的传递闭包,以分布式边列表的形式表示。

与旧的 Hadoop MR 相比,Spark 的关键特性之一是 Spark 支持交互式算法。为了解决这样的图遍历问题,我们在递归函数中利用了这种能力:

def closure(rdd:RDD[(Int, Int)]):RDD[(Int,Int)] = {
  val transitiveValues = rdd.map(_.swap).join(rdd).filter{case (_,(x,y)) => x != y}
  if (transitiveValues.isEmpty) {
    rdd
  } else {
    val usedTransitions = transitiveValues.flatMap{case (a,(x,y)) => Seq((x,a),(a,y))}
    val newTransitions = transitiveValues.map{case (a,(x,y)) => (x,y)}
    closure(rdd.subtract(usedTransitions).union(newTransitions)).distinct
  }
}

这并不完全导致上面预期的输出,因为没有优先级的概念(隐式排序),所以closure((1, 2),(2, 7)) = (1,7)并不(1, 2), (1, 7)像上面预期的那样。可以以额外的复杂性为代价添加订购。此外,它不支持循环图(带循环)。

该算法应仅作为调整到特定内部要求的起点。

于 2016-10-26T07:25:31.970 回答