0

不确定这是否是一个好问题,但我希望除了我的主要问题之外,如果嵌套并行化是否不好,答案也可以涵盖。太感谢了!

假设我有任务 a、b、c 和 d。这些任务每个都有其单独的子任务,a1、a2 等等。

有时,a1 可能还有一堆子任务要完成。任务嵌套在 a、b、c 和 d 中,因为它们依赖于作为一个整体一起处理,因此没有办法将它们解耦并创建任务 e、f 和 g(等等..)。

我目前正在利用 Futures 和 Threadpool以并发、并行的方式处理这些任务(例如 a、b.. 并行运行,a1、a2 并行运行)。

然而,我受到了欢迎

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" }', src/my_folder/my_file.rs:123:123

上面失败的代码是

let thread_pool = ThreadPool::new().unwrap();

所以问题是:

  1. 在这种情况下嵌套并行化是否正确?(如果不是,我该怎么办?如果是,我该怎么办?)
  2. 如何确保安全地创建线程池,以便在资源不足时不会失败展开?
  3. 如果 (2) 是错误的做法,我应该如何实现我的代码?
let tasks: Vec<_> = block_transactions.into_iter()
    .map(|encoded_tx| {
        let pool = core_pool.clone();
        let rpc_url = core_rpc_url.clone();
        let block = core_block.clone();

        tokio::spawn(async move {
            let sol_client = RpcClient::new(rpc_url.clone());

            let mut tx = crate::models::transactions::Transaction {
                hash: "".to_string(),
                block: block.number.clone(),
                is_confirmed: true,
                status: TransactionStatus::Success,
                fee: 0,
                timestamp: block.timestamp.clone()
            };

            if let Some(decoded_tx) = (&encoded_tx.transaction).decode() {
                // Each item in the signatures array is a digital signature
                // of the given message

                // Additional safety net
                if decoded_tx.signatures.len() >= 1 {
                    // Signatures should be Vec<Signature>.
                    // The first one, signatures[0], is the hash that is used to
                    // identify the transaction (eg. in the explorer), and you can
                    // get the base58-encoded string using the .to_string() method.
                    // Seed first signature
                    tx.hash = decoded_tx.signatures[0].to_string();

                    // Seed the transaction
                    let tx_res = crate::actions::transactions::create_or_ignore_transaction(
                        &*pool.get().unwrap(),
                        &tx,
                    );

                    // Seed everything that depends ONLY on the created transaction
                    if let Ok(created_tx) = tx_res {
                        // Spawn the futures threader first
                        let thread_pool = ThreadPool::new().unwrap();

                        // Seed subsequents
                        let tst_pool = pool.clone();
                        let tst_dtx = decoded_tx.clone();
                        let tst_tx = tx.clone();
                        let transaction_signatures_task = async move {
                            let transaction_signatures: Vec<_> = tst_dtx
                                .signatures
                                .as_slice()
                                .into_par_iter()
                                .filter(|signature| &signature.to_string() != &tst_tx.hash)
                                .map(|signature| {
                                    // Subsequent signature are lead to the same tx, just
                                    // signed from a different key pair.
                                    InsertableTransactionSignature {
                                        transaction_hash: tst_tx.hash.clone(),
                                        signature: signature.to_string().clone(),
                                        timestamp: tst_tx.timestamp.clone(),
                                    }
                                })
                                .collect();

                            let cts_result =
                                crate::actions::transaction_signatures::create_transaction_signatures(
                                    &*tst_pool.get().unwrap(),
                                    transaction_signatures.as_slice(),
                                );

                            if let Ok(created_ts) = cts_result {
                                if created_ts.len() != (tst_dtx.signatures.len() - 1) {
                                    eprintln!("[processors/transaction] WARN: Looks like there's a signature \
                        creation count mismatch for tx {}", tst_tx.hash.clone());
                                }
                            } else {
                                let cts_result_err = cts_result.err().unwrap();
                                eprintln!(
                                    "[processors/transaction] FATAL: signature seeding error for \
                    tx {} due to: {}",
                                    tst_tx.hash.clone(),
                                    cts_result_err.to_string()
                                );
                            }
                        };
                        let tst_handle = thread_pool
                            .spawn_with_handle(transaction_signatures_task)
                            .unwrap();

                        // A message contains a header,
                        // The message header contains three unsigned 8-bit values.
                        // The first value is the number of required signatures in the
                        // containing transaction. The second value is the number of those
                        // corresponding account addresses that are read-only.
                        // The third value in the message header is the number of read-only
                        // account addresses not requiring signatures.
                        // identify which are required addresses, addresses requesting write
                        // access, then address with readonly access
                        let req_sig_count =
                            (decoded_tx.message.header.num_required_signatures.clone()) as usize;
                        let rw_sig_count = (decoded_tx
                            .message
                            .header
                            .num_readonly_signed_accounts
                            .clone()) as usize;
                        let ro_sig_count = (decoded_tx
                            .message
                            .header
                            .num_readonly_unsigned_accounts
                            .clone()) as usize;

                        // tx.message.account_keys is a compact-array of account addresses,
                        // The addresses that require signatures appear at the beginning of the
                        // account address array, with addresses requesting write access first
                        // and read-only accounts following. The addresses that do not require
                        // signatures follow the addresses that do, again with read-write
                        // accounts first and read-only accounts following.
                        let at_pool = pool.clone();
                        let at_tx = tx.clone();
                        let at_dtx = decoded_tx.clone();
                        let at_block = block.clone();
                        let accounts_task = async move {
                            crate::processors::account::process_account_data(
                            &at_pool,
                            &sol_client,
                            &at_tx.hash,
                            &req_sig_count,
                            &rw_sig_count,
                            &ro_sig_count,
                            &at_dtx.message.account_keys,
                            &at_block.timestamp)
                                .await;
                        };
                        let at_handle = thread_pool
                            .spawn_with_handle(accounts_task)
                            .unwrap();

                        // Each instruction specifies a single program, a subset of
                        // the transaction's accounts that should be passed to the program,
                        // and a data byte array that is passed to the program. The program
                        // interprets the data array and operates on the accounts specified
                        // by the instructions. The program can return successfully, or
                        // with an error code. An error return causes the entire
                        // transaction to fail immediately.
                        let it_pool = pool.clone();
                        let it_ctx = created_tx.clone();
                        let it_dtx = decoded_tx.clone();
                        let instruction_task = async move {
                            crate::processors::instruction::
                            process_instructions(&it_pool, &it_dtx,
                                                 &it_ctx,
                                                 it_dtx.message.instructions.as_slice(),
                                                 it_dtx.message.account_keys.as_slice())
                                .await;
                        };
                        let it_handle = thread_pool
                            .spawn_with_handle(instruction_task)
                            .unwrap();

                        // Ensure we have the tx meta as well
                        let tmt_pool = pool.clone();
                        let tmt_block = block.clone();
                        let tmt_meta = encoded_tx.meta.clone();
                        let tx_meta_task = async move {
                            if let Some(tx_meta) = tmt_meta {
                                let tm_thread_pool = ThreadPool::new().unwrap();

                                // Process the transaction's meta and validate the various
                                // data structures. Seed the accounts, alternate tx hashes,
                                // logs, balance inputs first
                                // pub log_messages: Option<Vec<String>>,
                                let tlt_pool = tmt_pool.clone();
                                let tlt_txm = tx_meta.clone();
                                let tlt_tx = tx.clone();
                                let transaction_log_task = async move {
                                    if let Some(log_messages) = &tlt_txm.log_messages {
                                        let transaction_logs =
                                            log_messages.into_par_iter().enumerate()
                                                .map(|(idx, log_msg)| {
                                                    InsertableTransactionLog {
                                                        transaction_hash: tlt_tx.hash.clone(),
                                                        data: log_msg.clone(),
                                                        line: idx as i32,
                                                        timestamp: tlt_tx.timestamp.clone()
                                                    }
                                                })
                                                .collect::<Vec<InsertableTransactionLog>>();

                                        let tl_result = crate::actions::transaction_logs::batch_create(
                                            &*tlt_pool.get().unwrap(),
                                            transaction_logs.as_slice()
                                        );

                                        if let Err(err) = tl_result {
                                            eprintln!("[processors/transaction] WARN: Problem pushing \
                                    transaction logs for tx {} due to {}", tlt_tx.hash.clone(),
                                                      err.to_string());
                                        }
                                    }
                                };
                                let tlt_handle = tm_thread_pool
                                    .spawn_with_handle(transaction_log_task)
                                    .unwrap();

                                // Gather and seed all account inputs
                                let ait_pool = tmt_pool.clone();
                                let ait_txm = tx_meta.clone();
                                let ait_tx = tx.clone();
                                let ait_dtx = decoded_tx.clone();
                                let account_inputs_task = async move {
                                    let account_inputs: Vec<InsertableAccountInput> = (0..ait_dtx.message.account_keys.len())
                                        .into_par_iter()
                                        .map(|i| {
                                            let current_account_hash =
                                                ait_dtx.message.account_keys[i].clone().to_string();

                                            InsertableAccountInput {
                                                transaction_hash: ait_tx.hash.clone(),
                                                account: current_account_hash.to_string(),
                                                token_id: "".to_string(),
                                                pre_balance: (ait_txm.pre_balances[i] as i64),
                                                post_balance: Option::from(ait_txm.post_balances[i] as i64),
                                                timestamp: block.timestamp.clone()
                                            }
                                        }).collect();

                                    let result = crate::actions::account_inputs::batch_create(
                                        &*ait_pool.get().unwrap(),
                                        account_inputs);

                                    if let Err(error) = result {
                                        eprintln!("[processors/transaction] FATAL: Problem indexing \
                                    account inputs for tx {} due to {}", ait_tx.hash.clone(),
                                                  error.to_string());
                                    }
                                };
                                let ait_handle = tm_thread_pool
                                    .spawn_with_handle(account_inputs_task)
                                    .unwrap();

                                // If there are token balances for this transaction
                                // pub pre_token_balances: Option<Vec<UiTransactionTokenBalance>>,
                                // pub post_token_balances: Option<Vec<UiTransactionTokenBalance>>,
                                let tai_pool = tmt_pool.clone();
                                let tai_txm = tx_meta.clone();
                                let tai_tx = tx.clone();
                                let tai_block = tmt_block.clone();
                                let tai_dtx = decoded_tx.clone();
                                let token_account_inputs_task = async move {
                                    if let (Some(pre_token_balances),
                                        Some(post_token_balances)) =
                                    (tai_txm.pre_token_balances, tai_txm.post_token_balances)
                                    {
                                        super::account_input::process_token_account_inputs(
                                            &tai_pool,
                                            &tai_tx.hash,
                                            &pre_token_balances,
                                            &post_token_balances,
                                            &tai_dtx.message.account_keys,
                                            &tai_block.timestamp,
                                        ).await;
                                    }
                                };
                                let tai_handle = tm_thread_pool
                                    .spawn_with_handle(token_account_inputs_task)
                                    .unwrap();

                                let iit_pool = tmt_pool.clone();
                                let iit_dtx = decoded_tx.clone();
                                let iit_txm = tx_meta.clone();
                                let iit_tx = tx.clone();
                                let inner_instructions_task = async move {
                                    if let Some(inner_instructions) = iit_txm.inner_instructions {
                                        crate::processors::inner_instruction::process(&iit_pool,
                                                                                      inner_instructions.as_slice(),
                                                                                      &iit_dtx, &iit_tx,
                                                                                      iit_dtx.message.account_keys.as_slice())
                                            .await;
                                    }
                                };
                                let iit_handle = tm_thread_pool
                                    .spawn_with_handle(inner_instructions_task)
                                    .unwrap();

                                let tasks_future = future::join_all(vec![tlt_handle, ait_handle, tai_handle, iit_handle]);
                                tasks_future.await;

                                // Update the tx's metadata and proceed with instructions processing
                                let update_result =
                                    crate::actions::transactions::update(&*tmt_pool.get().unwrap(), &tx);

                                if let Err(update_err) = update_result {
                                        eprintln!(
                                            "[blockchain_syncer] FATAL: Problem updating tx: {}",
                                            update_err
                                        );
                                    }
                            } else {
                                eprintln!(
                                    "[processors/transaction] WARN: tx {} has no metadata!",
                                    tx.hash
                                );
                            }
                        };
                        let tmt_handle = thread_pool
                            .spawn_with_handle(tx_meta_task)
                            .unwrap();

                        let future_batch =
                            future::join_all(vec![at_handle, it_handle, tst_handle, tmt_handle]);

                        future_batch.await;
                    } else {
                        let tx_err = tx_res.err();

                        if let Some(err) = tx_err {
                            eprintln!(
                                "[blockchain_syncer] WARN: Problem pushing tx {} to DB \
                            due to: {}",
                                tx.hash, err
                            )
                        } else {
                            eprintln!(
                                "[blockchain_syncer] FATAL: Problem pushing tx {} to DB \
                            due to an unknown error",
                                tx.hash
                            );
                        }
                    }
                } else {
                    eprintln!(
                        "[blockchain_syncer] FATAL: a transaction in block {} has no hashes!",
                        &block.number
                    );
                }
            } else {
                eprintln!(
                    "[blockchain_syncer] FATAL: Unable to obtain \
                                    account information vec from chain for tx {}",
                    &tx.hash
                );
            }
        })
    })
    .collect();

future::join_all(tasks).await;
4

0 回答 0