
I am processing geospatial data with Spark 2.0 DataFrames that have the following schema:

root
 |-- date: timestamp (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- accuracy: double (nullable = true)
 |-- track_id: long (nullable = true)

I have observed location signals that jump to a completely different place. Strangely, the signal stays at the remote location for a while, roughly 25 seconds or 5 samples, and then jumps back to where I actually am.

I want to eliminate these outliers by computing the speed between the current row and the last valid row. If the speed is above a given threshold, the current row is dropped and the last valid row stays unchanged. If the speed is below the threshold, the current row is added to the result DataFrame and becomes the new last valid row.

I tried to implement this with the mapPartitions function:

import com.esri.core.geometry.{GeometryEngine, Point}

case class OutlierFilterTrackEntry(dateSec: Long, track_id: Long, lat: Double, lon: Double, accuracy: Double)

def filterOutlier(iter: Iterator[OutlierFilterTrackEntry]): Iterator[OutlierFilterTrackEntry] = {
    val maxSpeed = 1000 / 3.6 // 1000 km/h in m/s
    val buf = scala.collection.mutable.ListBuffer.empty[OutlierFilterTrackEntry]
    var lastValid: OutlierFilterTrackEntry = null
    while (iter.hasNext) {
        val cur = iter.next()
        if (lastValid == null) {
            // the first row of a partition seeds the filter and is kept
            lastValid = cur
            buf.append(cur)
        } else {
            // geodesic distance in meters between the current and the last valid point
            //val distance = calculateDistance(cur.lat, cur.lon, lastValid.lat, lastValid.lon)
            val distance = GeometryEngine.geodesicDistanceOnWGS84(new Point(cur.lon, cur.lat), new Point(lastValid.lon, lastValid.lat))
            val diffTime = cur.dateSec - lastValid.dateSec // seconds
            val speed = distance / diffTime                // m/s
            println(speed) // debug output
            if (speed < maxSpeed) {
                lastValid = cur
                buf.append(cur)
            }
        }
    }
    buf.toList.iterator
}

val numPartitions = tracks6.select($"track_id").distinct().count.toInt
println(numPartitions)
val partitionedTracks = tracks6.select(col("date").cast("bigint").alias("dateSec"), $"track_id", $"lat", $"lon", $"accuracy")
                               .repartition(numPartitions, $"track_id")
                               .sortWithinPartitions($"dateSec") // sort after the shuffle, since repartition does not preserve a prior orderBy

val tracksWithoutOutliers = partitionedTracks
                                    .as[OutlierFilterTrackEntry]
                                    .mapPartitions(filterOutlier)

println(tracksWithoutOutliers.count)
tracksWithoutOutliers.toDF.show(400)

But I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@711a5138)
    - field (class: linebb5d1438a8a14914aa58d2465d24fa2a59.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: windowSpecAllPrev, type: class org.apache.spark.sql.expressions.WindowSpec)
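
From the serialization stack, the closure passed to mapPartitions appears to capture the enclosing notebook wrapper object, and that wrapper holds a non-serializable WindowSpec field (windowSpecAllPrev) from an earlier cell, even though filterOutlier never uses it. Here is a minimal sketch of the usual workaround, assuming that accidental capture is the cause: isolate the function in a standalone object, so that only the object method reference is serialized.

import com.esri.core.geometry.{GeometryEngine, Point}

// Sketch under the assumption above: TrackCleaning holds no WindowSpec field,
// so serializing the mapPartitions closure never touches windowSpecAllPrev.
object TrackCleaning extends Serializable {
    val maxSpeed = 1000 / 3.6 // 1000 km/h in m/s

    def filterOutlier(iter: Iterator[OutlierFilterTrackEntry]): Iterator[OutlierFilterTrackEntry] = {
        var lastValid: OutlierFilterTrackEntry = null
        iter.filter { cur =>
            if (lastValid == null) { lastValid = cur; true } // first row seeds the filter
            else {
                val distance = GeometryEngine.geodesicDistanceOnWGS84(
                    new Point(cur.lon, cur.lat), new Point(lastValid.lon, lastValid.lat))
                val speed = distance / (cur.dateSec - lastValid.dateSec)
                val keep = speed < maxSpeed
                if (keep) lastValid = cur // only kept rows advance the reference point
                keep
            }
        }
    }
}

val tracksWithoutOutliers = partitionedTracks
    .as[OutlierFilterTrackEntry]
    .mapPartitions(TrackCleaning.filterOutlier _)

Annotating the unused field with @transient (e.g. @transient val windowSpecAllPrev = ...) should also keep it out of the serialized closure without restructuring anything.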

Any other approaches are welcome.
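
For reference, a window-based variant I sketched (untested; note that it measures speed against the direct predecessor rather than the last valid row, so a run of consecutive outliers is handled differently than in the sequential version, and the factor of 111320 m per degree is a rough equirectangular approximation rather than a geodesic distance):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy($"track_id").orderBy($"dateSec")

val withSpeed = partitionedTracks
    .withColumn("prevLat", lag($"lat", 1).over(w))
    .withColumn("prevLon", lag($"lon", 1).over(w))
    .withColumn("prevSec", lag($"dateSec", 1).over(w))
    // rough planar speed in m/s; a geodesic UDF could replace the distance term
    .withColumn("speed",
        sqrt(pow(($"lat" - $"prevLat") * lit(111320.0), 2) +
             pow(($"lon" - $"prevLon") * lit(111320.0) * cos(radians($"lat")), 2)) /
        ($"dateSec" - $"prevSec"))

// the first point of each track has no predecessor, so its speed is null and it is kept
val tracksWithoutOutliers2 = withSpeed
    .filter($"speed".isNull || $"speed" < 1000 / 3.6)
    .drop("prevLat", "prevLon", "prevSec", "speed")

Since everything here stays in DataFrame expressions, the WindowSpec is only used on the driver and never enters a serialized closure.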
