我正在对缓存在内存中的 rdd 执行 mapPartitions,然后执行 reduce。这是我的代码片段
// myRdd is an rdd consisting of Tuple2[Int,Long]
myRdd.mapPartitions(rangify).reduce( (x,y) => (x._1+y._1,x._2 ++ y._2))
//The rangify function- For each partition, it's adding the first element of the tuples & constructing ranges from the second element of the tuples
def rangify(l: Iterator[ Tuple2[Int,Long] ]) : Iterator[ Tuple2[Long, List [ ArrayBuffer[ Tuple2[Long,Long] ] ] ] ]= {
var sum=0L
val mylist=ArrayBuffer[ Tuple2[Long,Long] ]()
if(l.isEmpty)
return List( (0L,List [ ArrayBuffer[ Tuple2[Long,Long] ] ] ())).toIterator
var prev= -1000L
var begin= -1000L
for (x <- l){
sum+=x._1
if(prev<0){
prev=x._2
begin=x._2
}
else if(x._2==prev+1)
prev=x._2
else {
mylist+=((begin,prev))
prev=x._2
begin=x._2
}
}
mylist+= ((begin,prev))
List((sum, List(mylist) ) ).toIterator
}
rdd 缓存在内存中。我使用 20 个执行器,每个执行器有 1 个核心。缓存的 rdd 有 60 个块。问题是每运行 2-3 次作业,就会有一个反序列化时间异常长的任务。附上截图
这种行为的原因可能是什么?
PS - 1. 在所有情况下我都没有得到这种行为。我做了很多相同的工作,我在大约 40% 的情况下得到了这种行为
运行的 Spark 日志 - http://pastebin.com/jnqTzPXS