我想通过 Java 中的 Spark 更新 MongoDb 中的特定集合。我正在使用MongoDB Connector for Hadoop从Apache Spark检索信息并将其保存到 Java 中的 MongoDb。
在关注 Sampo Niskanen 关于通过 Spark 检索和保存集合到 MongoDb 的优秀帖子之后,我陷入了更新集合的困境。
MongoOutputFormat.java包含一个采用 String[] updateKeys 的构造函数,我猜它是指一个可能的键列表,用于在现有集合上进行比较并执行更新。saveAsNewApiHadoopFile()
但是,使用带有参数的 Spark方法MongoOutputFormat.class
,我想知道如何使用该更新构造函数。
save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
在此之前,MongoUpdateWritable.java被用于执行集合更新。从我在 Hadoop 上看到的示例来看,这通常设置为mongo.job.output.value
,在 Spark 中可能是这样的:
save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);
但是,我仍然想知道如何在MongoUpdateWritable.java
.
诚然,作为一种 hacky 方式,我已将对象的“_id”设置为我的文档的 KeyValue,以便在执行保存时,集合将覆盖具有相同 KeyValue 的文档_id
。
JavaPairRDD<BSONObject,?> analyticsResult; //JavaPairRdd of (mongoObject,result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
BSONObject o = (BSONObject) s._1;
//for all keys, set _id to key:value_
String id = "";
for (String key : o.keySet()){
id += key + ":" + (String) o.get(key) + "_";
}
o.put("_id", id);
o.put("result", s._2);
return new Tuple2<>(null, o);
});
save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
我想通过 Spark 使用MongoOutputFormat
orMongoUpdateWritable
或执行 mongodb 集合更新Configuration
,最好使用该saveAsNewAPIHadoopFile()
方法。可能吗?如果没有,是否有任何其他方式不涉及专门将 _id 设置为我要更新的键值?