I'm fairly new to Spark and Scala... I have a graph, Graph[Int, String], and I want to attach to its vertices some properties that I have in a DataFrame.

What I need to do is, for each vertex, find the neighbourhood average of each property. This is my approach so far, but I don't understand how to correctly map the rows I get from joining the two data frames:

val res = graph.collectNeighbors(EdgeDirection.Either)
  .toDF("ID", "neighbours")
  .join(aDataFrameWithProperties, "ID")
  .map { x => // this is where I am lost
  }

I don't think my approach is right, because I'm joining each vertex's properties to its array of neighbours, but I still don't know the values of the neighbours' properties...

EDIT

Some data to help understand what I want to accomplish... say you build the graph as in this answer to How to create EdgeRDD from data frame in Spark:

import org.apache.spark.sql.SQLContext

val sqlc: SQLContext = ???

case class Person(id: Long, country: String, age: Int)

val testPeople = Seq(
   Person(1, "Romania"    , 15),
   Person(2, "New Zealand", 30),
   Person(3, "Romania"    , 17),
   Person(4, "Iceland"    , 20),
   Person(5, "Romania"    , 40),
   Person(6, "Romania"    , 44),
   Person(7, "Romania"    , 45),
   Person(8, "Iceland"    , 21),
   Person(9, "Iceland"    , 22)
 )

 val people = sqlc.createDataFrame(testPeople)
 val peopleR = people
   .withColumnRenamed("id"     , "idR")
   .withColumnRenamed("country", "countryR")
   .withColumnRenamed("age"    , "ageR")

 import org.apache.spark.sql.functions._

 val relations = people.join(peopleR,
       (people("id") < peopleR("idR")) &&
         (people("country") === peopleR("countryR")) &&
         (abs(people("age") - peopleR("ageR")) < 5))

 import org.apache.spark.graphx._

 val edges = EdgeRDD.fromEdges(relations.map(row => Edge(
       row.getAs[Long]("id"), row.getAs[Long]("idR"), ())))

 // id is a Long in Person, so read it with getAs[Long]
 val users = VertexRDD.apply(people.map(row =>
   (row.getAs[Long]("id"), row.getAs[Long]("id").toInt)))

 val graph = Graph(users, edges)
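
Just to make the example concrete, listing the triplets shows which edges the join actually produced (printed in no particular order):

 graph.triplets.map(t => (t.srcId, t.dstId)).collect.foreach(println)
 // with the data above: (1,3), (4,8), (4,9), (5,6), (6,7), (8,9)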

Then say you have a DataFrame like:

case class Person(id:Long, gender:Int, income:Int)
val properties = Seq(
  Person(1, 0, 321),
  Person(2, 1, 212),
  Person(3, 0, 212),
  Person(4, 0, 122),
  Person(5, 1, 898),
  Person(6, 1, 212),
  Person(7, 1, 22),
  Person(8, 0, 8),
  Person(9, 0, 212)
)

val people = sqlc.createDataFrame(properties)

I want to compute, for each vertex, the average gender of its neighbours and the average income of its neighbours, returned as a DataFrame.
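
Worked out by hand from the data above (the edges come out as 1-3, 5-6, 6-7, 4-8, 4-9 and 8-9; vertex 2 has no neighbours, so it gets no row), the result I expect looks like:

 id | avg(gender) | avg(income)
 ---|-------------|------------
  1 |         0.0 |       212.0
  3 |         0.0 |       321.0
  4 |         0.0 |       110.0
  5 |         1.0 |       212.0
  6 |         1.0 |       460.0
  7 |         1.0 |       212.0
  8 |         0.0 |       167.0
  9 |         0.0 |        65.0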


1 Answer


Generally speaking, you should use graph operators rather than converting everything to a DataFrame, but something like this should do the trick:

import org.apache.spark.sql.functions.{explode, avg}
import sqlc.implicits._ // for the $"..." column syntax

val statsDF = graph.collectNeighbors(EdgeDirection.Either)
  .toDF("ID", "neighbours")
  // Flatten the neighbours column
  .withColumn("neighbour", explode($"neighbours"))
  // and extract the neighbour id
  .select($"ID".alias("this_id"), $"neighbour._1".alias("other_id"))
  // join with people to pull in each neighbour's properties
  .join(people, people("id") === $"other_id")
  .groupBy($"this_id")
  .agg(avg($"gender"), avg($"income"))
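
With the sample data, statsDF.show() should reproduce the table sketched in the question. Note that vertex 2 drops out, because explode produces no rows for its empty neighbours array.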

What if, instead of the average, I want a count, e.g. the number of neighbours whose gender equals my own, and then take the average over all connections?

For that you'd need two separate joins - one on this_id and one on other_id. Then you can simply aggregate using an expression like this:

avg((this_gender === other_gender).cast("integer"))
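
A minimal sketch of that, assuming the implicits import from above is in scope; peopleThis, peopleOther and sameGenderDF are names made up here (they are just the properties DataFrame with columns renamed so the two joins don't collide):

import org.apache.spark.sql.functions.{explode, avg}

val peopleThis = people.select(
  $"id".alias("this_id"), $"gender".alias("this_gender"))
val peopleOther = people.select(
  $"id".alias("other_id"), $"gender".alias("other_gender"))

val sameGenderDF = graph.collectNeighbors(EdgeDirection.Either)
  .toDF("ID", "neighbours")
  .withColumn("neighbour", explode($"neighbours"))
  .select($"ID".alias("this_id"), $"neighbour._1".alias("other_id"))
  // first join: this vertex's own attributes
  .join(peopleThis, "this_id")
  // second join: the neighbour's attributes
  .join(peopleOther, "other_id")
  .groupBy($"this_id")
  // average of a 0/1 indicator = fraction of same-gender neighbours
  .agg(avg(($"this_gender" === $"other_gender").cast("integer"))
    .alias("frac_same_gender"))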

As for the graph operators, there are a few operations you can use. For starters, you can use a join operation to add properties to the vertices:

import org.apache.spark.rdd.RDD

val properties: RDD[(VertexId, (Int, Int))] = sc.parallelize(Seq(
  (1L, (0, 321)), (2L, (1, 212)), (3L, (0, 212)),
  (4L, (0, 122)), (5L, (1, 898)), (6L, (1, 212)),
  (7L, (1, 22)), (8L, (0, 8)), (9L, (0, 212))
))

val graphWithProperties = graph
  .outerJoinVertices(properties)((_, _, prop) => prop)
  // For simplicity this assumes no missing values 
  .mapVertices((_, props) => props.get) 
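
If some vertices might be missing from properties, a default can be supplied instead of calling .get (a sketch, assuming (0, 0) is an acceptable stand-in for your data; graphWithDefaults is a name invented here):

val graphWithDefaults = graph
  .outerJoinVertices(properties)((_, _, prop) => prop.getOrElse((0, 0)))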

Next, we can aggregate messages to create a new VertexRDD:

val neighboursAggregated = graphWithProperties
  .aggregateMessages[(Int, (Int, Int))](
    triplet => {
      // send each endpoint's (gender, income) to the opposite
      // endpoint, together with a count of 1
      triplet.sendToDst((1, triplet.srcAttr))
      triplet.sendToSrc((1, triplet.dstAttr))
    },
    // sum the counts, the genders and the incomes
    { case ((cnt1, (gender1, inc1)), (cnt2, (gender2, inc2))) =>
      (cnt1 + cnt2, (gender1 + gender2, inc1 + inc2)) }
  )
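
For the sample graph this yields, for every non-isolated vertex, the neighbour count together with the gender and income sums, which is easy to check (printed in no particular order):

neighboursAggregated.collect.foreach(println)
// e.g. (1,(1,(0,212))), (4,(2,(0,220))), (6,(2,(2,920))), ...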

Finally, we can replace the existing properties:

graphWithProperties.outerJoinVertices(neighboursAggregated)(
  (_, oldProps, newProps) => newProps match {
    // fraction of neighbours with the same gender as this vertex,
    // and the neighbours' average income
    case Some((cnt, (gender, inc))) => Some((
      if (oldProps._1 == 1) gender.toDouble / cnt
      else 1 - gender.toDouble / cnt,
      inc.toDouble / cnt
    ))
    case _ => None
  })

If you're interested only in the values, you can pass all the required values to aggregateMessages and omit the second outerJoinVertices:
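
A sketch of that shortcut, assuming import sqlc.implicits._ is in scope for toDF (sameGenderStats and resultDF are names invented here): each endpoint sends a count of 1, a same-gender indicator computed from both endpoints' attributes, and its income, so only a final division is needed:

val sameGenderStats = graphWithProperties
  .aggregateMessages[(Int, (Int, Int))](
    triplet => {
      // 1 = the two endpoints have the same gender
      val same = if (triplet.srcAttr._1 == triplet.dstAttr._1) 1 else 0
      // each endpoint receives (count, (same-gender flag, other's income))
      triplet.sendToDst((1, (same, triplet.srcAttr._2)))
      triplet.sendToSrc((1, (same, triplet.dstAttr._2)))
    },
    { case ((c1, (s1, i1)), (c2, (s2, i2))) => (c1 + c2, (s1 + s2, i1 + i2)) }
  )

val resultDF = sameGenderStats
  .map { case (id, (cnt, (same, inc))) =>
    (id, same.toDouble / cnt, inc.toDouble / cnt) }
  .toDF("id", "frac_same_gender", "avg_income")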

answered Dec 9, 2015 at 23:24