0

对不起,我需要再问一个问题。我希望这个不要重复。我编辑了最后一个,但我认为没有人看到编辑后的版本。这是问题的一个简短示例:

val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()

val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)

val new_df = //Add hundred of new columns

//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)

错误:

error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.

我想要做的是,修改每一行。在这种情况下,我知道只有 1 列,我可以在尝试将数据帧行映射到更新的行时像编码器错误一样处理它。但是,如果我有数百列,我该如何解决这个问题?如果某些行不满足条件,我想删除它们。目前我使用:

val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)

但我不认为这是最好的解决方案。我也在StackoverflowError中运行:

Exception in thread "main" java.lang.StackOverflowError
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)

TY寻求帮助:)

4

1 回答 1

0

添加新列的 withColumn() 选项将适用于整个数据集。如果你有更多的列,它会让事情变得更糟。您可以使用 Spark SQL 并使用 SQL 样式的查询来添加新列。这将需要更多的 sql 技能而不仅仅是 spark。并且有 100 列,可能维护会很困难。

您可以采用另一种方法。

您可以将 rdd 转换为数据框。然后在数据框上使用 map 并根据需要处理每一行。内部地图方法,

一个。您可以根据计算收集新值

湾。如下将这些新列值添加到主 rdd

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)

这里的row,是map方法中row的引用

C。如下创建新架构

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType))

d。添加到旧架构

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e. 使用新列创建新数据框

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)
于 2016-12-05T16:08:51.983 回答