4

我想通过 Java 中的 Spark 更新 MongoDb 中的特定集合。我正在使用MongoDB Connector for HadoopApache Spark检索信息并将其保存到 Java 中的 MongoDb。

在关注 Sampo Niskanen 关于通过 Spark 检索和保存集合到 MongoDb 的优秀帖子之后,我陷入了更新集合的困境。

MongoOutputFormat.java包含一个采用 String[] updateKeys 的构造函数,我猜它是指一个可能的键列表,用于在现有集合上进行比较并执行更新。saveAsNewApiHadoopFile()但是,使用带有参数的 Spark方法MongoOutputFormat.class,我想知道如何使用该更新构造函数。

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

在此之前,MongoUpdateWritable.java被用于执行集合更新。从我在 Hadoop 上看到的示例来看,这通常设置为mongo.job.output.value,在 Spark 中可能是这样的:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);

但是,我仍然想知道如何在MongoUpdateWritable.java.

诚然,作为一种 hacky 方式,我已将对象的“_id”设置为我的文档的 KeyValue,以便在执行保存时,集合将覆盖具有相同 KeyValue 的文档_id

JavaPairRDD<BSONObject,?> analyticsResult; //JavaPairRdd of (mongoObject,result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
    BSONObject o = (BSONObject) s._1;

    //for all keys, set _id to key:value_
    String id = "";
    for (String key : o.keySet()){
        id += key + ":" + (String) o.get(key) + "_";
    }
    o.put("_id", id);

    o.put("result", s._2);
    return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

我想通过 Spark 使用MongoOutputFormatorMongoUpdateWritable或执行 mongodb 集合更新Configuration,最好使用该saveAsNewAPIHadoopFile()方法。可能吗?如果没有,是否有任何其他方式不涉及专门将 _id 设置为我要更新的键值?

4

1 回答 1

7

I tried several combination of config.set("mongo.job.output.value","....") and several combination of

.saveAsNewAPIHadoopFile(
        "file:///bogus",
        classOf[Any],
        classOf[Any],
        classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
        mongo_config
      )

and none of them worked.

I made it to work by using MongoUpdateWritable class as output of my map method:

items.map(row => {
      val mongo_id = new ObjectId(row("id").toString)
      val query = new BasicBSONObject()
      query.append("_id", mongo_id)
      val update = new BasicBSONObject()

      update.append("$set", new BasicBSONObject().append("field_name", row("new_value")))
      val muw = new MongoUpdateWritable(query,update,false,true)
      (null, muw)
    })
     .saveAsNewAPIHadoopFile(
       "file:///bogus",
       classOf[Any],
       classOf[Any],
       classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
       mongo_config
     )

The raw query executed in mongo is then something like this:

2014-11-09T13:32:11.609-0800 [conn438] update db.users query: { _id: ObjectId('5436edd3e4b051de6a505af9') } update: { $set: { value: 10 } } nMatched:1 nModified:0 keyUpdates:0 numYields:0 locks(micros) w:24 3ms
于 2014-11-09T21:45:02.430 回答