不确定这是否是一个好问题,但我希望除了我的主要问题之外,如果嵌套并行化是否不好,答案也可以涵盖。太感谢了!
假设我有任务 a、b、c 和 d。这些任务每个都有其单独的子任务,a1、a2 等等。
有时,a1 可能还有一堆子任务要完成。任务嵌套在 a、b、c 和 d 中,因为它们依赖于作为一个整体一起处理,因此没有办法将它们解耦并创建任务 e、f 和 g(等等..)。
我目前正在利用 Futures 和 Threadpool以并发、并行的方式处理这些任务(例如 a、b.. 并行运行,a1、a2 并行运行)。
然而,我受到了欢迎
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" }', src/my_folder/my_file.rs:123:123
上面失败的代码是
let thread_pool = ThreadPool::new().unwrap();
所以问题是:
- 在这种情况下嵌套并行化是否正确?(如果不是,我该怎么办?如果是,我该怎么办?)
- 如何确保安全地创建线程池,以便在资源不足时不会失败展开?
- 如果 (2) 是错误的做法,我应该如何实现我的代码?
let tasks: Vec<_> = block_transactions.into_iter()
.map(|encoded_tx| {
let pool = core_pool.clone();
let rpc_url = core_rpc_url.clone();
let block = core_block.clone();
tokio::spawn(async move {
let sol_client = RpcClient::new(rpc_url.clone());
let mut tx = crate::models::transactions::Transaction {
hash: "".to_string(),
block: block.number.clone(),
is_confirmed: true,
status: TransactionStatus::Success,
fee: 0,
timestamp: block.timestamp.clone()
};
if let Some(decoded_tx) = (&encoded_tx.transaction).decode() {
// Each item in the signatures array is a digital signature
// of the given message
// Additional safety net
if decoded_tx.signatures.len() >= 1 {
// Signatures should be Vec<Signature>.
// The first one, signatures[0], is the hash that is used to
// identify the transaction (eg. in the explorer), and you can
// get the base58-encoded string using the .to_string() method.
// Seed first signature
tx.hash = decoded_tx.signatures[0].to_string();
// Seed the transaction
let tx_res = crate::actions::transactions::create_or_ignore_transaction(
&*pool.get().unwrap(),
&tx,
);
// Seed everything that depends ONLY on the created transaction
if let Ok(created_tx) = tx_res {
// Spawn the futures threader first
let thread_pool = ThreadPool::new().unwrap();
// Seed subsequents
let tst_pool = pool.clone();
let tst_dtx = decoded_tx.clone();
let tst_tx = tx.clone();
let transaction_signatures_task = async move {
let transaction_signatures: Vec<_> = tst_dtx
.signatures
.as_slice()
.into_par_iter()
.filter(|signature| &signature.to_string() != &tst_tx.hash)
.map(|signature| {
// Subsequent signature are lead to the same tx, just
// signed from a different key pair.
InsertableTransactionSignature {
transaction_hash: tst_tx.hash.clone(),
signature: signature.to_string().clone(),
timestamp: tst_tx.timestamp.clone(),
}
})
.collect();
let cts_result =
crate::actions::transaction_signatures::create_transaction_signatures(
&*tst_pool.get().unwrap(),
transaction_signatures.as_slice(),
);
if let Ok(created_ts) = cts_result {
if created_ts.len() != (tst_dtx.signatures.len() - 1) {
eprintln!("[processors/transaction] WARN: Looks like there's a signature \
creation count mismatch for tx {}", tst_tx.hash.clone());
}
} else {
let cts_result_err = cts_result.err().unwrap();
eprintln!(
"[processors/transaction] FATAL: signature seeding error for \
tx {} due to: {}",
tst_tx.hash.clone(),
cts_result_err.to_string()
);
}
};
let tst_handle = thread_pool
.spawn_with_handle(transaction_signatures_task)
.unwrap();
// A message contains a header,
// The message header contains three unsigned 8-bit values.
// The first value is the number of required signatures in the
// containing transaction. The second value is the number of those
// corresponding account addresses that are read-only.
// The third value in the message header is the number of read-only
// account addresses not requiring signatures.
// identify which are required addresses, addresses requesting write
// access, then address with readonly access
let req_sig_count =
(decoded_tx.message.header.num_required_signatures.clone()) as usize;
let rw_sig_count = (decoded_tx
.message
.header
.num_readonly_signed_accounts
.clone()) as usize;
let ro_sig_count = (decoded_tx
.message
.header
.num_readonly_unsigned_accounts
.clone()) as usize;
// tx.message.account_keys is a compact-array of account addresses,
// The addresses that require signatures appear at the beginning of the
// account address array, with addresses requesting write access first
// and read-only accounts following. The addresses that do not require
// signatures follow the addresses that do, again with read-write
// accounts first and read-only accounts following.
let at_pool = pool.clone();
let at_tx = tx.clone();
let at_dtx = decoded_tx.clone();
let at_block = block.clone();
let accounts_task = async move {
crate::processors::account::process_account_data(
&at_pool,
&sol_client,
&at_tx.hash,
&req_sig_count,
&rw_sig_count,
&ro_sig_count,
&at_dtx.message.account_keys,
&at_block.timestamp)
.await;
};
let at_handle = thread_pool
.spawn_with_handle(accounts_task)
.unwrap();
// Each instruction specifies a single program, a subset of
// the transaction's accounts that should be passed to the program,
// and a data byte array that is passed to the program. The program
// interprets the data array and operates on the accounts specified
// by the instructions. The program can return successfully, or
// with an error code. An error return causes the entire
// transaction to fail immediately.
let it_pool = pool.clone();
let it_ctx = created_tx.clone();
let it_dtx = decoded_tx.clone();
let instruction_task = async move {
crate::processors::instruction::
process_instructions(&it_pool, &it_dtx,
&it_ctx,
it_dtx.message.instructions.as_slice(),
it_dtx.message.account_keys.as_slice())
.await;
};
let it_handle = thread_pool
.spawn_with_handle(instruction_task)
.unwrap();
// Ensure we have the tx meta as well
let tmt_pool = pool.clone();
let tmt_block = block.clone();
let tmt_meta = encoded_tx.meta.clone();
let tx_meta_task = async move {
if let Some(tx_meta) = tmt_meta {
let tm_thread_pool = ThreadPool::new().unwrap();
// Process the transaction's meta and validate the various
// data structures. Seed the accounts, alternate tx hashes,
// logs, balance inputs first
// pub log_messages: Option<Vec<String>>,
let tlt_pool = tmt_pool.clone();
let tlt_txm = tx_meta.clone();
let tlt_tx = tx.clone();
let transaction_log_task = async move {
if let Some(log_messages) = &tlt_txm.log_messages {
let transaction_logs =
log_messages.into_par_iter().enumerate()
.map(|(idx, log_msg)| {
InsertableTransactionLog {
transaction_hash: tlt_tx.hash.clone(),
data: log_msg.clone(),
line: idx as i32,
timestamp: tlt_tx.timestamp.clone()
}
})
.collect::<Vec<InsertableTransactionLog>>();
let tl_result = crate::actions::transaction_logs::batch_create(
&*tlt_pool.get().unwrap(),
transaction_logs.as_slice()
);
if let Err(err) = tl_result {
eprintln!("[processors/transaction] WARN: Problem pushing \
transaction logs for tx {} due to {}", tlt_tx.hash.clone(),
err.to_string());
}
}
};
let tlt_handle = tm_thread_pool
.spawn_with_handle(transaction_log_task)
.unwrap();
// Gather and seed all account inputs
let ait_pool = tmt_pool.clone();
let ait_txm = tx_meta.clone();
let ait_tx = tx.clone();
let ait_dtx = decoded_tx.clone();
let account_inputs_task = async move {
let account_inputs: Vec<InsertableAccountInput> = (0..ait_dtx.message.account_keys.len())
.into_par_iter()
.map(|i| {
let current_account_hash =
ait_dtx.message.account_keys[i].clone().to_string();
InsertableAccountInput {
transaction_hash: ait_tx.hash.clone(),
account: current_account_hash.to_string(),
token_id: "".to_string(),
pre_balance: (ait_txm.pre_balances[i] as i64),
post_balance: Option::from(ait_txm.post_balances[i] as i64),
timestamp: block.timestamp.clone()
}
}).collect();
let result = crate::actions::account_inputs::batch_create(
&*ait_pool.get().unwrap(),
account_inputs);
if let Err(error) = result {
eprintln!("[processors/transaction] FATAL: Problem indexing \
account inputs for tx {} due to {}", ait_tx.hash.clone(),
error.to_string());
}
};
let ait_handle = tm_thread_pool
.spawn_with_handle(account_inputs_task)
.unwrap();
// If there are token balances for this transaction
// pub pre_token_balances: Option<Vec<UiTransactionTokenBalance>>,
// pub post_token_balances: Option<Vec<UiTransactionTokenBalance>>,
let tai_pool = tmt_pool.clone();
let tai_txm = tx_meta.clone();
let tai_tx = tx.clone();
let tai_block = tmt_block.clone();
let tai_dtx = decoded_tx.clone();
let token_account_inputs_task = async move {
if let (Some(pre_token_balances),
Some(post_token_balances)) =
(tai_txm.pre_token_balances, tai_txm.post_token_balances)
{
super::account_input::process_token_account_inputs(
&tai_pool,
&tai_tx.hash,
&pre_token_balances,
&post_token_balances,
&tai_dtx.message.account_keys,
&tai_block.timestamp,
).await;
}
};
let tai_handle = tm_thread_pool
.spawn_with_handle(token_account_inputs_task)
.unwrap();
let iit_pool = tmt_pool.clone();
let iit_dtx = decoded_tx.clone();
let iit_txm = tx_meta.clone();
let iit_tx = tx.clone();
let inner_instructions_task = async move {
if let Some(inner_instructions) = iit_txm.inner_instructions {
crate::processors::inner_instruction::process(&iit_pool,
inner_instructions.as_slice(),
&iit_dtx, &iit_tx,
iit_dtx.message.account_keys.as_slice())
.await;
}
};
let iit_handle = tm_thread_pool
.spawn_with_handle(inner_instructions_task)
.unwrap();
let tasks_future = future::join_all(vec![tlt_handle, ait_handle, tai_handle, iit_handle]);
tasks_future.await;
// Update the tx's metadata and proceed with instructions processing
let update_result =
crate::actions::transactions::update(&*tmt_pool.get().unwrap(), &tx);
if let Err(update_err) = update_result {
eprintln!(
"[blockchain_syncer] FATAL: Problem updating tx: {}",
update_err
);
}
} else {
eprintln!(
"[processors/transaction] WARN: tx {} has no metadata!",
tx.hash
);
}
};
let tmt_handle = thread_pool
.spawn_with_handle(tx_meta_task)
.unwrap();
let future_batch =
future::join_all(vec![at_handle, it_handle, tst_handle, tmt_handle]);
future_batch.await;
} else {
let tx_err = tx_res.err();
if let Some(err) = tx_err {
eprintln!(
"[blockchain_syncer] WARN: Problem pushing tx {} to DB \
due to: {}",
tx.hash, err
)
} else {
eprintln!(
"[blockchain_syncer] FATAL: Problem pushing tx {} to DB \
due to an unknown error",
tx.hash
);
}
}
} else {
eprintln!(
"[blockchain_syncer] FATAL: a transaction in block {} has no hashes!",
&block.number
);
}
} else {
eprintln!(
"[blockchain_syncer] FATAL: Unable to obtain \
account information vec from chain for tx {}",
&tx.hash
);
}
})
})
.collect();
future::join_all(tasks).await;