我正在尝试使用 Spark GraphX,但遇到了一个问题,我认为它与我使用 Scala 的方式有关。我是 Scala 和 Spark 的新手。
我通过调用自己编写的函数来创建一个图(graph):
// Build the starting graph: VertexAttributes on each vertex, Int on each edge.
// NOTE(review): `sim` is presumably a Butterflies instance (its createGraph returns this type) — confirm.
val initialGraph: Graph[VertexAttributes, Int] = sim.createGraph
VertexAttributes 是我定义的一个类:
/**
 * Attributes stored on each graph vertex.
 *
 * @param pages     pages attached to this vertex
 * @param ads       ads currently attached to this vertex (may be empty)
 * @param step      step counter carried with the vertex
 * @param inDegree  in-degree recorded for this vertex
 * @param outDegree out-degree recorded for this vertex
 */
class VertexAttributes(var pages: List[Page], var ads: List[Ad], var step: Long, val inDegree: Int, val outDegree: Int)
  extends java.io.Serializable {

  // Define alternative methods to be used as the score

  /**
   * Mean score of the ads on this vertex.
   *
   * Returns 0 when the vertex has no ads. Without this guard an empty list
   * divides by zero (an ArithmeticException if the ad score is integral —
   * NOTE(review): confirm the declared type of Ad.score).
   */
  def averageScore() =
    if (this.ads.isEmpty) 0
    else this.ads.map(_.score).sum / this.ads.length

  /** Highest ad score on this vertex, or None when the vertex has no ads. */
  def maxScore() =
    if (this.ads.isEmpty) None
    else Some(this.ads.map(_.score).max)

  // Select averageScore as the function to be used
  val score = averageScore _
}
经过一些计算,我使用 GraphX vertices() 函数来获取每个顶点的分数:
// Render each vertex as "<id>,'<r,g,b>'": blue when its score is zero, red otherwise
val nodeRdd = g.vertices.map { case (vertexId, attrs) =>
  val colourSuffix = if (attrs.score() == 0) ",'0,0,255'" else ",'255,0,0'"
  vertexId + colourSuffix
}
但这不会编译,sbt 消息是:
value score is not a member of type parameter VertexAttributes
我已经用谷歌搜索了这条错误消息,但坦率地说,我看不懂那些讨论。谁能解释一下错误的原因,以及我该如何解决?
谢谢你。
PS 下面是我的 createGraph 方法代码:
/**
 * Runs the simulation. Currently exposes [[createGraph]], which assembles the
 * page/ad graph from text files on HDFS.
 */
class Butterflies() extends java.io.Serializable {
  // A boolean flag to enable debug statements
  var debug = true

  // A boolean flag to read an edgelist file rather than compute the edges
  val readEdgelistFile = true

  /**
   * Create a graph from a page file and an ad file.
   *
   * Steps, in order: parse pages, parse ads, obtain the edges (from an edge
   * list file, or by pairing pages that share a token), then join page
   * tokens, in/out degrees and ads onto the vertices.
   *
   * @return a graph whose vertices carry [[VertexAttributes]] and whose edges carry an Int
   */
  def createGraph(): Graph[VertexAttributes, Int] = {
    // Just needed for textFile() method to load an RDD from a textfile
    // Cannot use the global Spark context because SparkContext cannot be serialized from master to worker
    val sc = new SparkContext

    // Parse a text file with the vertex information
    val pages = sc.textFile("hdfs://ip-172-31-4-59:9000/user/butterflies/data/1K_nodes.txt")
      .map { l =>
        val tokens = l.split("\\s+") // split on runs of whitespace
        val id = tokens(0).trim.toLong
        val tokenList = tokens.last.split('|').toList
        (id, tokenList)
      }
    println("********** NUMBER OF PAGES: " + pages.count + " **********")

    // Parse a text file with the ad information
    val ads = sc.textFile("hdfs://ip-172-31-4-59:9000/user/butterflies/data/1K_ads.txt")
      .map { l =>
        val tokens = l.split("\\s+") // split on runs of whitespace
        val id = tokens(0).trim.toLong
        val tokenList = tokens.last.split('|').toList
        val next: VertexId = 0
        val score = 0
        //val vertexId: VertexId = id % 1000
        val vertexId: VertexId = id
        (vertexId, Ad(id, tokenList, next, score))
      }
    println("********** NUMBER OF ADS: " + ads.count + " **********")

    // Check if we should simply read an edgelist file, or compute the edges from scratch
    val edgeGraph =
      if (readEdgelistFile) {
        // Create a graph from an edgelist file
        GraphLoader.edgeListFile(sc, "hdfs://ip-172-31-4-59:9000/user/butterflies/data/1K_edges.txt")
      } else {
        // Create the edges between similar pages
        // Create of list of all possible pairs of pages
        // Check if any pair shares at least one token
        // We only need the pair id's for the edgelist
        val allPairs = pages.cartesian(pages).filter { case (a, b) => a._1 < b._1 }
        val similarPairs = allPairs.filter { case (page1, page2) => page1._2.intersect(page2._2).nonEmpty }
        val idOnly = similarPairs.map { case (page1, page2) => Edge(page1._1, page2._1, 1) }
        println("********** NUMBER OF EDGES: " + idOnly.count + " **********")
        // Save the list of edges as a file, to be used instead of recomputing the edges every time
        //idOnly.saveAsTextFile("hdfs://ip-172-31-4-59:9000/user/butterflies/data/saved_edges")
        // Create a graph from an edge list RDD
        Graph.fromEdges[Int, Int](idOnly, 1)
      }

    // Copy into a graph with nodes that have vertexAttributes
    val attributeGraph =
      edgeGraph.mapVertices { (_, _) => new VertexAttributes(Nil, Nil, 0, 0, 0) }

    // Add the node information into the graph
    val nodeGraph = attributeGraph.outerJoinVertices(pages) { (vertexId, attr, pageTokenList) =>
      new VertexAttributes(
        List(Page(vertexId, pageTokenList.getOrElse(List.empty), 0)),
        attr.ads, attr.step, attr.inDegree, attr.outDegree)
    }

    // Add the node degree information into the graph
    // Vertices absent from inDegrees / outDegrees get a degree of 0
    val degreeGraph = nodeGraph
      .outerJoinVertices(nodeGraph.inDegrees) { (_, attr, inDegree) =>
        new VertexAttributes(attr.pages, attr.ads, attr.step, inDegree.getOrElse(0), attr.outDegree)
      }
      .outerJoinVertices(nodeGraph.outDegrees) { (_, attr, outDegree) =>
        new VertexAttributes(attr.pages, attr.ads, attr.step, attr.inDegree, outDegree.getOrElse(0))
      }

    // Add the ads to the nodes
    val adGraph = degreeGraph.outerJoinVertices(ads) { (_, attr, ad) =>
      // A vertex with no matching ad gets an empty list; otherwise a one-element
      // list holding a fresh copy of the joined ad
      val vertexAds = ad.map(a => Ad(a.id, a.tokens, a.next, a.score)).toList
      new VertexAttributes(attr.pages, vertexAds, attr.step, attr.inDegree, attr.outDegree)
    }

    // Display the graph for debug only
    if (debug) {
      println("********** GRAPH **********")
      //printVertices(adGraph)
    }

    // The generated graph is the value of the method
    adGraph
  }
}