4

对于自定义 Estimator 的 transformSchema 方法,我需要能够将输入数据框的模式与案例类中定义的模式进行比较。通常这可以像从案例类中生成 Spark StructType / Schema一样执行,如下所述。但是,使用了错误的可空性:

由 df 推断的真实模式spark.read.csv().as[MyClass]可能如下所示:

root
 |-- CUSTOMER_ID: integer (nullable = false)

案例类:

case class MySchema(CUSTOMER_ID: Int)

为了比较我使用:

val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]
  if (!rawSchema.equals(rawDf.schema))

不幸的是,这总是产生false,因为从案例类手动推断的新模式设置为可空true(因为 ja java.Integer 实际上可能为空)

root
 |-- CUSTOMER_ID: integer (nullable = true)

nullable = false创建架构时如何指定?

4

1 回答 1

10

可以说,您正在混合实际上不属于同一空间的事物。ML Pipelines 本质上是动态的,引入静态类型的对象并没有真正改变这一点。

此外,一个类的模式定义为:

case class MySchema(CUSTOMER_ID: Int)

将不能为空CUSTOMER_IDscala.Int不一样java.lang.Integer

scala> import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor

scala> case class MySchema(CUSTOMER_ID: Int)
defined class MySchema

scala> schemaFor[MySchema].dataType
res0: org.apache.spark.sql.types.DataType = StructType(StructField(CUSTOMER_ID,IntegerType,false))

话虽如此,如果您想要nullable字段Option[Int]

case class MySchema(CUSTOMER_ID: Option[Int])

如果你不想像Int上面那样使用 null 。

您在这里遇到的另一个问题是,对于csv每个字段,根据定义都可以为空,并且此状态由编码的Dataset. 所以在实践中:

spark.read.csv(...)

总是会导致:

root
 |-- CUSTOMER_ID: integer (nullable = true)

这就是为什么你会得到架构不匹配的原因。不幸的是,无法覆盖nullable不强制为空性约束的源的字段,例如csvor json

如果没有可空架构是一项硬性要求,您可以尝试:

spark.createDataFrame(
  spark.read.csv(...).rdd,
  schemaFor[MySchema].dataType.asInstanceOf[StructType]
).as[MySchema]

仅当您知道数据实际上是null免费的时,此方法才有效。任何null值都会导致运行时异常。

于 2016-11-27T15:20:09.300 回答