1

我正在尝试从 a 插入 java 对象Flux并为每个对象生成响应,无论成功还是失败。基本上代码如下所示:

private ReactiveMongoTemplate mongoTemplate;

public <T extends MyData, U extends MyResult> Flux<U> doTransfer(Flux<T> input, String collectionName) {
    return input.buffer(100)
        .map(lst -> mongoTemplate
                .insertAll(lst)
                .map(mydata -> generateResult(mydata))
                .onErrorResume(DataAccessException.class, ex -> fluxForErrorCase(ex, lst))
            )
       .flatMap(Function.identity());
}

private <T extends MyData, U Extends MyResult> Flux<U> fluxForErrorCase(DataAccessException ex, List<T> input) {
   // I only have #inserted, #updated and error message here.
   // My input is not mutated to have non-null _ids, either
}

通过定义唯一性约束并发送适当的数据,我可以为提交的批次获得一个异常(一个有MongoBulkWriteException原因的),但我想知道哪些行到达了我的数据库以及我遇到了哪些问题。此外,一些对象被写入数据库,但它们没有被变异为生成 id。

我知道可以通过在多文档事务中插入(需要用于简单集成测试的副本集)或自己生成_id(需要查询插入的值,或假设错误导致不写入后续项目)来替代解决方案但我想知道这里是否有更简洁的错误处理方法。

4

1 回答 1

0

为了提供“一个”解决方案,我想分享我的方法:

MongoDB 批量写入有两种操作模式:有序和无序。有序模式意味着一旦遇到错误就不会执行后续操作。

浏览源代码后ReactiveMongoTemplate,可以看到ReactiveMongoTemplate.insertAll()call 最终归结为MongoCollection.insertMany()带有 default的 a InsertManyOptions,其中操作模式是有序的。至少,这是 2.1.9.RELEASE 的情况。

因此,_id在客户端生成值并考虑未插入的文档和以下文档(如在给定的代码片段中)是可行的。

于 2020-01-20T12:25:27.920 回答