4

我正在使用 spark 2.0.1 并希望用列中最后一个众所周知的值填充 nan 值。

我能找到 Spark / Scala 的唯一参考火花:使用最后一次观察进行前向填充或使用似乎使用 RDD的 pyspark 使用先前已知的良好值填充 null 。

我宁愿呆在数据框/数据集世界中,并可能处理多个 nan 值。这可能吗?

我的假设是数据(最初从例如 CSV 文件加载按时间排序,并且此顺序保留在分布式设置中,例如按关闭/最后一个已知值填充是正确的。也许用以前的值填充对于大多数人来说就足够了记录连续没有2个或更多的nan记录。这真的成立吗?关键是a

myDf.sort("foo").show

会破坏任何订单,例如所有null值都将排在第一位。

一个小例子:

import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
         .toDF("foo","bar")
         .withColumn("foo", 'foo.cast("Date"))
         .as[FooBar]

结果是

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

我想用最后一个已知的值来修复这个值。我怎样才能做到这一点?

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-02|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

编辑

就我而言,填充上面行中的值就足够了,因为只有非常有限的错误值。

编辑2

我尝试添加一个索引列

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
    .withColumn("rowId", monotonically_increasing_id())

然后填充最后一个值。

myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show

但这会显示以下警告: 没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。我怎样才能引入有意义的分区?

+----------+--------------------+-----+----------+
|       foo|                 bar|rowId|    fooLag|
+----------+--------------------+-----+----------+
|2016-01-01|               first|    0|      null|
|2016-01-02|              second|    1|2016-01-01|
|      null|       noValidFormat|    2|2016-01-02|
|2016-01-04|lastAssumingSameDate|    3|      null|
+----------+--------------------+-----+----------+
4

2 回答 2

2

//用我试过的最后一个未知的空值填充空字段, 这确实有效!

val dftxt1 = spark.read.option("header","true").option("sep","\t").csv("/sdata/ph/com/r/ph_com_r_ita_javelin/inbound/abc.txt").toDF("line_name", "merge_key", "line_id")
dftxt2.select("line_name","merge_key","line_id").write.mode("overwrite").insertInto("dbname.tablename")

val df = spark.sql("select * from dbname.tablename")

val Df1 = df.withColumn("rowId", monotonically_increasing_id())

import org.apache.spark.sql.expressions.Window

val partitionWindow = Window.orderBy("rowId")

val Df2 = Df1.withColumn("line_id", last("line_id", true) over (partitionWindow))

Df2.show
于 2019-06-11T13:36:45.547 回答
1

这是一个中间答案。但是,它不是很好,因为没有分区/只使用了一个分区。我仍在寻找更好的方法来解决问题

df
    .withColumn("rowId", monotonically_increasing_id())
    .withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
    .withColumn("columnWithNullReplaced",
      when($"columnWithNull" isNull, "replacement").otherwise($"columnWithNull")

    )

编辑

我正在使用mapPartitionsWithIndex https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2构建更好的解决方案尚未完成。

编辑2

添加

if (i == 0) {
          lastNotNullRow = toCarryBd.value.get(i + 1).get
        } else {
          lastNotNullRow = toCarryBd.value.get(i - 1).get
        }

将导致预期的结果。

于 2016-11-17T20:04:59.957 回答