
I am trying to return an RDD[(String, String, String)], but I am not able to do it using flatMap. I tried returning (tweetId, tweetBody, gender), but it gives me a type mismatch error. Can you guide me on how to return an RDD[(String, String, String)] from flatMap?

override def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]], config: UserTransformConfig, logger: PhaseLogger): DataFrame = {
    val idColumnName = config.getConfigString("column_name").getOrElse("id")
    val bodyColumnName = config.getConfigString("column_name").getOrElse("body")
    val genderColumnName = config.getConfigString("column_name").getOrElse("gender")

    // convert each input element to a JsonValue
    val jsonRDD = rdd.map(r => byteUtils.bytesToUTF8String(r))

    val hashtagsRDD: RDD[(String,String, String)] = jsonRDD.mapPartitions(r => {
      // register jackson mapper (this needs to be instantiated per partition
      // since it is not serializable)
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)

      r.flatMap(tweet => tweet match {
        case _ :: tweet =>
        val rootNode = mapper.readTree(tweet)
        val tweetId = rootNode.path("id").asText.split(":")(2)
        val tweetBody = rootNode.path("body").asText
        val tweetVector =  new HashingTF().transform(tweetBody.split(" "))
        val result =genderModel.predict(tweetVector)
        val gender = if(result == 1.0){"Male"}else{"Female"}

        (tweetId, tweetBody, gender)
       // Array(1).map(x => (tweetId, tweetBody, gender))

      })

    })

    val rowRDD: RDD[Row] = hashtagsRDD.map(x => Row(x._1,x._2,x._3))
    val schema = StructType(Array(StructField(idColumnName,StringType, true),StructField(bodyColumnName, StringType, true),StructField(genderColumnName,StringType, true)))
    sqlContext.createDataFrame(rowRDD, schema)
  }
}

1 Answer


Try using map instead of flatMap. flatMap is used when the result type of the parameter function is itself a collection or an RDD.

That is, flatMap is used when each element of the current collection is mapped to zero or more elements, while map is used when each element is mapped to exactly one element.

map with a function of type A => B exchanges A for B, i.e. it transforms an RDD[A] into an RDD[B].

flatMap can be read as map then flatten for monadic types. For example, if you have an RDD[A] and a parameter function of type A => RDD[B], a plain map would produce an RDD[RDD[B]], which can then be simplified to just RDD[B] via flatten.
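As a minimal sketch of that difference (shown on plain Scala Lists here; RDD.map and RDD.flatMap behave the same way):

import scala.collection.immutable.List

object MapVsFlatMapDemo extends App {
  val xs = List("a b", "c", "d e f")

  // map: each element produces exactly one result -> List[Int]
  val lengths: List[Int] = xs.map(s => s.length)

  // flatMap: each element produces zero or more results -> List[String]
  val words: List[String] = xs.flatMap(s => s.split(" "))

  // flatMap is equivalent to map followed by flatten
  val nested: List[List[String]] = xs.map(s => s.split(" ").toList)
  val flattened: List[String] = nested.flatten // same as words

  println(lengths)   // List(3, 1, 5)
  println(words)     // List(a, b, c, d, e, f)
  println(flattened) // List(a, b, c, d, e, f)
}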

Here is an example of the code that compiles successfully.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class UserTransformConfig {
  def getConfigString(name: String): Option[String] = ???
}

class PhaseLogger
object byteUtils {
  def bytesToUTF8String(r: Array[Byte]): String = ???
}

class HashingTF {
  def transform(strs: Array[String]): Array[Double] = ???
}

object genderModel {
  def predict(v: Array[Double]): Double = ???
}

def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]], config: UserTransformConfig, logger: PhaseLogger): DataFrame = {
  val idColumnName = config.getConfigString("column_name").getOrElse("id")
  val bodyColumnName = config.getConfigString("column_name").getOrElse("body")
  val genderColumnName = config.getConfigString("column_name").getOrElse("gender")

  // convert each input element to a JsonValue
  val jsonRDD = rdd.map(r => byteUtils.bytesToUTF8String(r))

  val hashtagsRDD: RDD[(String, String, String)] = jsonRDD.mapPartitions(r => {
    // register jackson mapper (this needs to be instantiated per partition
    // since it is not serializable)
    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)

    r.map { tweet =>
      val rootNode = mapper.readTree(tweet)
      val tweetId = rootNode.path("id").asText.split(":")(2)
      val tweetBody = rootNode.path("body").asText
      val tweetVector = new HashingTF().transform(tweetBody.split(" "))
      val result = genderModel.predict(tweetVector)
      val gender = if (result == 1.0) {"Male"} else {"Female"}

      (tweetId, tweetBody, gender)

    }
  })

  val rowRDD: RDD[Row] = hashtagsRDD.map(x => Row(x._1, x._2, x._3))
  val schema = StructType(Array(StructField(idColumnName, StringType, true), StructField(bodyColumnName, StringType, true), StructField(genderColumnName, StringType, true)))
  sqlContext.createDataFrame(rowRDD, schema)
}
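If you really do want to keep flatMap, the same reasoning applies: the function you pass must return a collection, so you can wrap the tuple in a one-element Seq. A minimal sketch of the inner call (it would replace the r.map { ... } block inside mapPartitions above and reuses the same stubs; this is an illustration, not the recommended approach):

r.flatMap { tweet =>
  val rootNode = mapper.readTree(tweet)
  val tweetId = rootNode.path("id").asText.split(":")(2)
  val tweetBody = rootNode.path("body").asText
  val tweetVector = new HashingTF().transform(tweetBody.split(" "))
  val gender = if (genderModel.predict(tweetVector) == 1.0) "Male" else "Female"
  // returning a one-element Seq makes the function fit flatMap's signature
  Seq((tweetId, tweetBody, gender))
}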

Please note how much I had to invent from my own imagination, since you did not provide a minimal example. In general, questions like this are not worth answering.

answered 2015-11-16T14:58:45.703