1

我在 Scala 中使用 Spark。我想创建一个图表并动态更新图表。

我已经使用以下代码完成了此操作:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object firstgraph {
  def addVertex(
    sc: SparkContext,
    vertexRDD: RDD[(Long(String,Int))],
    name: String,
    age: Int,
    counter:Long): RDD[(Long, (String, Int))] = {
    val newVertexArray = Array((counter, (name, age)))
    val newVertexRdd: RDD[(Long, (String, Int))] = sc.parallelize(newVertexArray)
    newVertexRdd ++ vertexRDD
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("firstgraph")
    val sc = new SparkContext(conf)

    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50)))

    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3))

    var vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
    var edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
    var graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
    graph.vertices.filter { case (id, (name, age)) => age > 30 }.collect.foreach {
      case (id, (name, age)) => println(s"$name is $age")
    }
    var x = 0
    var counter = 7L
    var name = ""
    var age = 0
    while (x == 0) {
      println("Enter Name")
      name = Console.readLine
      println("Enter age")
      age = Console.readInt
      vertexRDD = addVertex(sc, vertexRDD, name, age, counter)
      graph = Graph(vertexRDD, edgeRDD)
      graph.vertices.filter { case (id, (name, age)) => age > 30 }.collect.foreach {
        case (id, (name, age)) => println(s"$name is $age")
      }
      counter = counter + 1
      println("want to enter more node press 0 for yes and 1 for no ")
      x = Console.readInt
    }
  }
}

该程序正在图中添加一个新顶点,但每当插入新顶点时,它就会一次又一次地计算图。我希望在不重新计算图表的情况下做到这一点。

4

2 回答 2

1

Apache Spark RDD 不是为细粒度更新而设计的。所有对 RDD 的操作都是为了改变整个 RDD。

我首先建议您重新考虑您的方法并尝试按照设计使用 RDD。例如,许多常见的算法都是为在单台机器上运行而设计的。就像快速排序一样。您不能通过在每个步骤中仅交换两个元素来在未更改的 RDD 上实现快速排序。它会浪费分布式系统并行执行许多事情的潜力。相反,您需要重新设计算法以利用并行性。

这可能不适用于您的情况,您可能确实需要进行点更新,例如在您的示例中。在这种情况下,您最好使用不同的后端。HBase 和 Cassandra 设计用于点更新,所有其他 SQL 和非 SQL 数据库也是如此。如果您需要图形数据库,Neo4j 也是如此。

但是在离开 Spark 之前要检查的最后一件事是IndexedRDD。它是一种为点更新而设计的RDD。它是作为 GraphX 的一部分诞生的,因此它可能非常适合您的情况。

于 2015-12-08T09:44:39.033 回答
0

请尝试使用以下代码将一堆顶点添加到现有图形中。这里的 inputGraph 是我现有的图形,它被预定义为全局变量,并在之前使用其他函数创建。这段代码只添加了顶点。这里 rdd 变量是我的集合,其值转换为 Long 并用作顶点 id 并添加到图中。

def addVertex(rdd: RDD[String], sc: SparkContext, session: String): Long = {
val defaultUser = (0, 0)
rdd.collect().foreach { x =>
  {
    val aVertex: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((x.toLong, (100, 100))))
    gVertices = gVertices.union(aVertex)
  }
}
inputGraph = Graph(gVertices, gEdges, defaultUser)
inputGraph.cache()
gVertices = inputGraph.vertices
gVertices.cache()
val count = gVertices.count
println(count);

return 1;

}

于 2016-02-23T07:48:14.117 回答