Using Apache Spark's GraphX API, I wrote the following code, which successfully builds a Graph. However, when I try to query this graph, I run into memory-related errors.
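For context, each line of output.txt is assumed here (as can be inferred from the splits in the code below) to have the form sender#messageId#receiver, with the person fields separated by "," and the message fields by "%". A made-up sample record:

// hypothetical example of one line in output.txt (field values are invented, only the layout matters)
// "Alice,A101,Sales#MSG001%recv%to%cc%dkim%spf%irt%refs%rp%from%subj%ct%dt%ddt%envTo%delTo%imp%ua%1#Bob,B202,HR"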

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Edge, Graph}

// Read the raw records and split each line on "#":
// field 0 = sender, field 1 = message id, field 2 = receiver.
val RDDorg = sc.textFile("output.txt")
val RDDstart = RDDorg.map(line => line.split("#"))

// Assign a numeric id to every distinct person (senders and receivers).
val rddPersons = RDDstart.map(line => line(0)).union(RDDstart.map(line => line(2))).distinct().zipWithIndex()
val verticesRDD = rddPersons.map(_.swap)

// (message id, person id) pairs for senders and receivers.
val rddSender = rddPersons.join(RDDstart.map(line => (line(0), line(1)))).values.map(_.swap).distinct()
val rddReceiver = rddPersons.join(RDDstart.map(line => (line(2), line(1)))).values.map(_.swap).distinct()

// Assign a numeric id to every distinct message, then re-index persons and messages together.
val msgid = RDDstart.map(line => line(1)).distinct().zipWithIndex().map(_.swap)
val mrg = verticesRDD.union(msgid).zipWithIndex().map(_.swap)
val distVertices = mrg.map(line => (line._1, line._2._2)).map(_.swap)

// (sender id, receiver id, message id) triples, joined back against the re-indexed
// vertices to produce sender -> message and message -> receiver edges.
val rddEdge = rddSender.join(rddReceiver).map(line => (line._2._1, line._2._2, line._1))
val mrgrdd = distVertices.join(rddEdge.map(line => (line._3, line._2)))
val nxt_2 = mrgrdd.map(line => (line._1, line._2._2, line._2._1))
val test = nxt_2.map(line => (line._1, line._3))
val test2 = test.join(rddEdge.map(line => (line._3, line._2)))
val test3 = test.join(rddEdge.map(line => (line._3, line._1)))
val final_coll = test3.join(test2)
val senderEdge = final_coll.map(line => (line._2._1._2, line._2._1._1, line._1))
val RcvrEdge = final_coll.map(line => (line._2._2._1, line._2._2._2, line._1))
val FinalEdge = senderEdge.union(RcvrEdge)
val edges: RDD[Edge[String]] = FinalEdge.map { line => Edge(line._1, line._2, line._3) }.distinct()

 /////////////////////////////////////
class rootclass {}
case class UserNode(sName: String, sno: String, sDept: String) extends rootclass with Serializable
case class MessageNode(mID: String, received: String, toAddr: String, ccAddr: String, dkim: String, recv_spf: String, in_reply: String,
                       references: String, return_path: String, from: String, subject: String, content_type: String, dateTime: String,
                       del_dateTime: String, envelopTo: String, deliveredTo: String, importance: String, userAgent: String,
                       Xpriority: Int) extends rootclass with Serializable

// Person attributes are "," separated, message attributes are "%" separated.
val users: RDD[rootclass] = verticesRDD.map { line =>
  val cols = line._2.split(",")
  UserNode(cols(0), cols(1), cols(2))
}
val Message: RDD[rootclass] = msgid.map { line =>
  val cols = line._2.split("%")
  MessageNode(cols(0), cols(1), cols(2), cols(3), cols(4), cols(5), cols(6), cols(7), cols(8), cols(9), cols(10),
              cols(11), cols(12), cols(13), cols(14), cols(15), cols(16), cols(17), cols(18).toInt)
}

val nodes = (users union Message).zipWithIndex().map(_.swap)

val graph = Graph.apply(nodes, edges)
val num = graph.numVertices
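
For reference, the GraphX constructor used above has roughly this shape (VertexId is a type alias for Long), so graph here ends up as a Graph[rootclass, String]:

// Simplified signature of org.apache.spark.graphx.Graph.apply (class tags and default arguments omitted):
// def apply[VD, ED](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]]): Graph[VD, ED]

Since RDD transformations are lazy, graph.numVertices is the first action that actually forces the whole chain of joins above to execute.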

When I call graph.numVertices, I get the error below:

15/10/09 12:10:55 INFO TaskSetManager: Starting task 6.0 in stage 38.0 (TID 90, localhost, PROCESS_LOCAL, 3940 bytes)
15/10/09 12:10:55 INFO Executor: Running task 6.0 in stage 38.0 (TID 90)
15/10/09 12:10:55 INFO TaskSetManager: Finished task 5.0 in stage 38.0 (TID 89) in 45114 ms on localhost (5/12)
15/10/09 12:10:55 INFO ShuffleBlockFetcherIterator: Getting 6 non-empty blocks out of 6 blocks
15/10/09 12:10:55 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/10/09 12:10:55 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
15/10/09 12:10:55 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/10/09 12:10:55 INFO ShuffleBlockFetcherIterator: Getting 6 non-empty blocks out of 6 blocks
15/10/09 12:10:55 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/10/09 12:11:06 INFO ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
15/10/09 12:11:06 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/10/09 12:16:59 INFO Executor: Finished task 6.0 in stage 38.0 (TID 90). 1011 bytes result sent to driver
15/10/09 12:16:59 ERROR Executor: Exception in task 4.0 in stage 38.0 (TID 88)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$1$$anonfun$apply$3$$anonfun$apply$4.apply(PairRDDFunctions.scala:485)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$1$$anonfun$apply$3$$anonfun$apply$4.apply(PairRDDFunctions.scala:485)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.util.collection.CompactBuffer.map(CompactBuffer.scala:28)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$1$$anonfun$apply$3.apply(PairRDDFunctions.scala:485)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$1$$anonfun$apply$3.apply(PairRDDFunctions.scala:485)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at org.apache.spark.util.collection.CompactBuffer.flatMap(CompactBuffer.scala:28)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$1.apply(PairRDDFunctions.scala:485)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$1.apply(PairRDDFunctions.scala:484)
    at org.apache.spark.rdd.FlatMappedValuesRDD$$anonfun$compute$1.apply(FlatMappedValuesRDD.scala:32)
    at org.apache.spark.rdd.FlatMappedValuesRDD$$anonfun$compute$1.apply(FlatMappedValuesRDD.scala:31)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

Please point out where I am going wrong.

Thanks.
