0

我正在使用java中的Spark。我有一个名为Out1的JavaPairRDD

输出1:

IDCLIENT|INFO|    
1|A|    
1|C|    
1|H|    
5|R|    
2|B|

我想创建一个名为Out2的新JavaPairRDD ,它与Out1相同,但没有第一行:

输出2:

IDCLIENT2|INFO|    
1|C|    
1|H|    
5|R|    
2|B|

之后,我想像这样组合这两个JavaPairRDD

输出3:

IDCLIENT|INFO|IDCLIENT2|
1|A,C|1|   
1|C,H|1|   
1|H,R|5|    
5|R,B|2|    
2|B| |

注意:我们不能使用groupByKey,因为我们可以在多行中拥有相同的键。

4

1 回答 1

1

RDD 中没有“删除”操作,因此删除一行比预期的要困难一些。

我会使用zipWithIndex和加入索引来处理它。它有点重,但会完成工作:

val indexed1 = out1.zipWithIndex
val indexed2 = indexed1.map{(k,v) => ((k-1),v)}
val joined = indexed1 join indexed2
val out3 = joined.map{case (k,(v1,v2)) => format(v1,v2)}
// where format gets the values in the desired output layout

如果数据集适合内存,我会做一个普通的 Scala 'oneliner':

out1.zip(out1.drop(1)).map{case (o1, o2) => format(o1,o2)}
于 2014-10-28T16:20:30.287 回答