这个问题看起来像一个图的传递闭包,以分布式边列表的形式表示。
与旧的 Hadoop MR 相比,Spark 的关键特性之一是 Spark 支持交互式算法。为了解决这样的图遍历问题,我们在递归函数中利用了这种能力:
def closure(rdd:RDD[(Int, Int)]):RDD[(Int,Int)] = {
val transitiveValues = rdd.map(_.swap).join(rdd).filter{case (_,(x,y)) => x != y}
if (transitiveValues.isEmpty) {
rdd
} else {
val usedTransitions = transitiveValues.flatMap{case (a,(x,y)) => Seq((x,a),(a,y))}
val newTransitions = transitiveValues.map{case (a,(x,y)) => (x,y)}
closure(rdd.subtract(usedTransitions).union(newTransitions)).distinct
}
}
这并不完全导致上面预期的输出,因为没有优先级的概念(隐式排序),所以closure((1, 2),(2, 7)) = (1,7)
并不(1, 2), (1, 7)
像上面预期的那样。可以以额外的复杂性为代价添加订购。此外,它不支持循环图(带循环)。
该算法应仅作为调整到特定内部要求的起点。