-1

假设我有一个顶点数组,我想以每个顶点连接到下一个 x 顶点的方式从它们创建边。x 可以有任何整数值。有没有办法用 Spark 做到这一点?

到目前为止,这就是我对 Scala 的看法:

//array that holds the edges
    var edges = Array.empty[Edge[Double]]
    for(j <- 0 to vertices.size - 2) {
      for(i <- 1 to x) {
        if((j+i) < vertices.size) {
          //add edge
          edges = edges ++ Array(Edge(vertices(j)._1, vertices(j+i)._1, 1.0))
          //add inverse edge, we want both directions
          edges = edges ++ Array(Edge(vertices(j+i)._1, vertices(j)._1, 1.0))
        }
      }
    }

其中 vertices 变量是 (Long, String) 的数组。但整个过程当然是连续的。

编辑

例如,如果我有这样的顶点:Hello, World, and, Planet cosmos. 我需要以下边缘:Hello -> World, World -> Hello, Hello -> and, and -> Hello, Hello-> Planet, Planet -> Hello, World -> and, and -> World, World -> Planet, Planet -> World, World -> cosmos, cosmos -> World, 等等。

4

2 回答 2

3

你的意思是这样的吗?

// Add dummy vertices at the end (assumes that you don't use negative ids)
(vertices ++ Array.fill(n)((-1L, null))) 
  .sliding(n + 1) // Slide over n + 1 vertices at the time
  .flatMap(arr => { 
     val (srcId, _) = arr.head // Take first
     // Generate 2n edges
     arr.tail.flatMap{case (dstId, _) => 
       Array(Edge(srcId, dstId, 1.0), Edge(dstId, srcId, 1.0))
     }}.filter(e => e.srcId != -1L & e.dstId != -1L)) // Drop dummies
  .toArray

如果你想在 RDD 上运行它,你只需像这样调整初始步骤:

import org.apache.spark.mllib.rdd.RDDFunctions._

val nPartitions = vertices.partitions.size - 1

vertices.mapPartitionsWithIndex((i, iter) =>
  if (i == nPartitions) (iter ++ Array.fill(n)((-1L, null))).toIterator
  else iter)

当然还有 drop toArray。如果您想要循环连接(尾部连接到头部),您可以替换Array.fill(n)((-1L, null))vertices.take(n)和 drop filter

于 2015-10-25T19:37:10.407 回答
2

所以,我认为这会让你得到你想要的:

首先,我定义了一个小辅助函数(请注意,我已在此处将边缘数据设置为顶点名称,以便更容易目视检查):

def pairwiseEdges(list: List[(Long, String)]): List[Edge[String]] = {
  list match {
    case x :: xs => xs.flatMap(i => List(Edge(x._1, i._1, x._2 + "--" + i._2), Edge(i._1, x._1, i._2 + "--" + x._2))) ++ pairwiseEdges(xs)
    case Nil => List.empty
  }
}

zipWithIndex在你的数组上做一个得到一个键,然后将数组转换为一个 RDD:

val vertices = List((1L,"hello"), (2L,"world"), (3L,"and"), (4L, "planet"), (5L,"cosmos")).toArray
val indexedVertices = vertices.zipWithIndex
val rdd = sc.parallelize(indexedVertices)

然后生成边缘x=3

val edges = rdd
  .flatMap{case((vertexId, name), index) => for {i <- 0 to 3; if (index - i) >= 0} yield ((index - i, (vertexId, name)))}
  .groupByKey()
  .flatMap{case(index, iterable) => pairwiseEdges(iterable.toList)}
  .distinct()

编辑:按照@zero323 在评论中的建议重写flatmap并删除了。filter

这将生成以下输出:

Edge(1,2,hello--world))
Edge(1,3,hello--and))
Edge(1,4,hello--planet)

Edge(2,1,world--hello)
Edge(2,3,world--and)
Edge(2,4,world--planet)
Edge(2,5,world--cosmos)

Edge(3,1,and--hello)
Edge(3,2,and--world)
Edge(3,4,and--planet)
Edge(3,5,and--cosmos)

Edge(4,1,planet--hello)
Edge(4,2,planet--world)
Edge(4,3,planet--and)
Edge(4,5,planet--cosmos)

Edge(5,2,cosmos--world)
Edge(5,3,cosmos--and)
Edge(5,4,cosmos--planet)
于 2015-10-25T20:53:40.097 回答