1

我是 Rust 新手,我正在使用默认的 MongoDB 驱动程序 https://docs.rs/mongodb/2.0.0/mongodb/

我记得在使用 Node.js 编码时,有可能使用一些 Promise.all() 发送事务,以便同时执行所有事务以进行优化,如果没有错误,则提交到交易。(此处为 Node.js 示例:https ://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74 )

我现在正在尝试使用 try_join 在 Rust 中实现相同的逻辑!但我一直反对这个问题:

错误:一次不能多次借用sessionmutable;标签:第一个可变借用发生在这里

use mongodb::{bson::oid::ObjectId, Client, Database, options};
use async_graphql::{
    validators::{Email, StringMaxLength, StringMinLength},
    Context, ErrorExtensions, Object, Result,
};
use futures::try_join;
//use tokio::try_join; -> same thing

#[derive(Default)]
pub struct UserMutations;

#[Object]
impl UserMutations {


async fn user_followed<'ctx>(
        &self,
        ctx: &Context<'ctx>,
        other_user_id: ObjectId,
        current_user_id: ObjectId,
    ) -> Result<bool> {

    let mut session = Client::with_uri_str(dotenv!("URI"))
        .await
        .expect("DB not accessible!")
        .start_session(Some(session_options))
        .await?;

    session.start_transaction(Some(options::TransactionOptions::builder()
            .read_concern(Some(options::ReadConcern::majority()))
            .write_concern(Some(
                options::WriteConcern::builder()
                    .w(Some(options::Acknowledgment::Majority))
                    .w_timeout(Some(Duration::new(3, 0)))
                    .journal(Some(false))
                    .build(),
            ))
            .selection_criteria(Some(options::SelectionCriteria::ReadPreference(
                options::ReadPreference::Primary
            )))
            .max_commit_time(Some(Duration::new(3, 0)))
            .build())).await?; 
    
   
    let db = Client::with_uri_str(dotenv!("URI"))
        .await
        .expect("DB not accessible!").database("database").collection::<Document>("collection");

             try_join!(
                db.update_one_with_session(
                    doc! {
                        "_id": other_user_id
                    },
                    doc! {
                        "$inc": { "following_number": -1 }
                    },
                    None,
                    &mut session,
                ),
                db.update_one_with_session(
                    doc! {
                        "_id": current_user_id
                    },
                    doc! {
                        "$inc": { "followers_number": -1 }
                    },
                    None,
                    &mut session,
                )
            )?;
    
    Ok(true)
  }
}

849 | |                     &mut session,
    | |                     ------------ first mutable borrow occurs here
...   |
859 | |                     &mut session,
    | |                     ^^^^^^^^^^^^ second mutable borrow occurs here
860 | |                 )
861 | |             )?;
    | |_____________- first borrow later captured here by closure

有什么方法可以同步发送事务函数,以免在独立突变上浪费任何时间?有没有人有任何想法?提前致谢!

4

2 回答 2

3

谢谢,Patrick 和 Zeppi 的回答,我对这个主题做了更多的研究,也做了我自己的测试。那么,让我们开始吧。

首先,我希望尽可能优化事务性写入,因为我想要代码逻辑所需的完全回滚可能性。

如果您错过了我对帕特里克的评论,我将在这里重述它们,以更好地反映我对此的思考方式:

我理解为什么这会限制多次读取,但是如果所有操作都在单独的集合上(或者是对具有不同有效负载的多个文档的独立原子写入),我不明白为什么在同时执行它们时不可能保持随意的一致性。这种事务不应该产生竞争条件/冲突/奇怪的锁定行为,并且在发生错误的情况下,整个事务在被提交之前会回滚。

与 Git 进行类比(这可能是错误的),更新单独的文件/文件夹时不会产生合并冲突。抱歉,我很抱歉,这听起来像是一个重要的提速机会。

但是,查找后我反对这个文档: https ://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#why-does-a-network-error-cause-the -serversession-to-be-discarded-from-the-pool

恰好使用同一服务器会话的其他不相关操作可能会阻塞等待前一个操作完成。例如,事务性写入将阻塞后续的事务性写入。

基本上,这意味着即使您将并发发送事务写入,您也不会获得太多效率,因为 MongoDB 本身就是一个阻塞器。我决定检查这是否属实,并且由于 NodeJS 驱动程序设置允许同时发送事务(根据:https ://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74 )我做到了使用 NodeJS 快速设置,指向免费层中由 Atlas 托管的同一数据库。

其次,统计数据和代码:这是我将用于测试的 NodeJS 突变(每个测试有 4 个事务写入)。我启用了 GraphQL 跟踪来对此进行基准测试,这是我的测试结果......

export const testMutFollowUser = async (_parent, _args, _context, _info) => {
  try {

    const { user, dbClient } = _context;
    isLoggedIn(user);
    const { _id } = _args;


    const session = dbClient.startSession();
    const db = dbClient.db("DB");

    await verifyObjectId().required().validateAsync(_id);

    //making sure asked user exists
    const otherUser = await db.collection("users").findOne(
      { _id: _id },
      {
        projection: { _id: 1 }
      });


    if (!otherUser)
      throw new Error("User was not found");
    

    const transactionResult = session.withTransaction(async () => {
        
        //-----using this part when doing concurrency test------

        await Promise.all([
          await createObjectIdLink({ db_name: 'links', from: user._id, to: _id, db }),
          await db.collection('users').updateOne(
            { _id: user._id },
            { $inc: { following_number: 1 } },

          ),
          await db.collection('users').updateOne(
            { _id },
            {
              $inc: { followers_number: 1, unread_notifications_number: 1 }
            },

          ),

          await createNotification({
            action: 'USER_FOLLOWED',
            to: _id
          }, _context)

        ]);
        //-----------end of concurrency part--------------------
        

        
        //------using this part when doing sync test--------

        //this as a helper for db.insertOne(...)
        const insertedId = await createObjectIdLink({ db_name: 'links', from: user._id, to: _id, db });


        const updDocMe = await db.collection('users').updateOne(
          { _id: user._id },
          { $inc: { following_number: 1 } },

        );

        const updDocOther = await db.collection('users').updateOne(
          { _id },
          {
            $inc: { followers_number: 1, unread_notifications_number: 1 }
          },

        );
        
        //this as another helper for db.insertOne(...)
        await createNotification({
          action: 'USER_FOLLOWED',
          to: _id
        }, _context);
        //-----------end of sync part---------------------------


        return true;


      }, transactionOptions);

      if (transactionResult) {
        console.log("The reservation was successfully created.");
      } else {
        console.log("The transaction was intentionally aborted.");
      }

      await session.endSession();

      return true;


    }

及相关性能结果:

format: 
Request/Mutation/Response = Total (all in ms)

1) For sync writes in the transaction:

4/91/32 = 127
4/77/30 = 111
7/71/7 = 85
6/66/8 = 80
2/74/9 = 85
4/70/8 = 82
4/70/11 = 85
--waiting more time (~10secs)
9/73/34 = 116

totals/8 = **96.375 ms in average**

//---------------------------------

2) For concurrent writes in transaction:

3/85/7 = 95
2/81/14 = 97
2/70/10 = 82
5/81/11 = 97
5/73/15 = 93
2/82/27 = 111
5/69/7 = 81
--waiting more time (~10secs)
6/80/32 = 118

totals/8 = ** 96.75 ms ms in average **

结论:两者之间的差异在误差范围内(但仍然在同步方面)。

我的假设是使用同步方式,您要花时间等待数据库请求/响应,而在并发方式中,您正在等待 MongoDB 对请求进行排序,然后全部执行,这在结束时一天将花费相同的时间。

因此,我想,对于当前的 MongoDB 策略,我的问题的答案将是“不需要并发,因为它无论如何都不会影响性能。” 但是,如果 MongoDB 允许在未来版本中使用文档级别的锁(至少对于 WiredTiger 引擎)而不是数据库级别的锁来并行化事务中的写入,那将是令人难以置信的,因为它目前用于事务(因为您正在等待整个写完直到下一个)。

如果我错过/误解了某些内容,请随时纠正我。谢谢!

于 2021-10-09T06:42:40.127 回答
2

这种限制实际上是设计使然。在 MongoDB 中,客户端会话不能同时使用(参见此处此处),因此 Rust 驱动程序接受它们&mut以防止在编译时发生这种情况。Node 示例只是偶然工作,绝对不推荐或支持的行为。如果您希望将这两个更新作为事务的一部分执行,则必须在另一个更新之后运行一个更新。如果您想同时运行它们,则需要在没有会话或事务的情况下执行它们。

作为旁注,客户端会话只能与创建它的客户端一起使用。在提供的示例中,会话正在与另一个会话一起使用,这将导致错误。

于 2021-10-07T18:10:19.413 回答