From 326c8f66ffcb628fd4941d35198b86161ce0ffda Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Sat, 6 Jan 2024 23:23:53 +0000 Subject: [PATCH] rpc: use our custom runtime to spawn blocking tasks Pass the custom runtime to JsonRpcRequestProcessor and use it to spawn blocking tasks from rpc methods. --- rpc/src/rpc.rs | 256 ++++++++++++++++++++++++++--------------- rpc/src/rpc_service.rs | 62 +++++----- 2 files changed, 201 insertions(+), 117 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 3a1f5d8debaeea..cb7d9e93d1db39 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -116,7 +116,7 @@ use { }, time::Duration, }, - tokio::task, + tokio::runtime::Runtime, }; #[cfg(test)] use { @@ -155,7 +155,7 @@ fn is_finalized( && (blockstore.is_root(slot) || bank.status_cache_ancestors().contains(&slot)) } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct JsonRpcConfig { pub enable_rpc_transaction_history: bool, pub enable_extended_tx_metadata_storage: bool, @@ -175,6 +175,28 @@ pub struct JsonRpcConfig { pub disable_health_check: bool, } +impl Default for JsonRpcConfig { + fn default() -> Self { + Self { + enable_rpc_transaction_history: Default::default(), + enable_extended_tx_metadata_storage: Default::default(), + faucet_addr: Option::default(), + health_check_slot_distance: Default::default(), + skip_preflight_health_check: bool::default(), + rpc_bigtable_config: Option::default(), + max_multiple_accounts: Option::default(), + account_indexes: AccountSecondaryIndexes::default(), + rpc_threads: 1, + rpc_blocking_threads: 1, + rpc_niceness_adj: Default::default(), + full_api: Default::default(), + rpc_scan_and_fix_roots: Default::default(), + max_request_body_size: Option::default(), + disable_health_check: Default::default(), + } + } +} + impl JsonRpcConfig { pub fn default_for_test() -> Self { Self { @@ -229,6 +251,7 @@ pub struct JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, + runtime: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -335,6 +358,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, + runtime: Arc, ) -> (Self, Receiver) { let (transaction_sender, transaction_receiver) = unbounded(); ( @@ -357,6 +381,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache, + runtime, }, transaction_receiver, ) @@ -368,6 +393,8 @@ impl JsonRpcRequestProcessor { socket_addr_space: SocketAddrSpace, connection_cache: Arc, ) -> Self { + use crate::rpc_service::service_runtime; + let genesis_hash = bank.hash(); let bank_forks = BankForks::new_rw_arc(bank); let bank = bank_forks.read().unwrap().root_bank(); @@ -407,8 +434,15 @@ impl JsonRpcRequestProcessor { let slot = bank.slot(); let optimistically_confirmed_bank = Arc::new(RwLock::new(OptimisticallyConfirmedBank { bank })); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; Self { - config: JsonRpcConfig::default(), + config, snapshot_config: None, bank_forks, block_commitment_cache: Arc::new(RwLock::new(BlockCommitmentCache::new( @@ -436,6 +470,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc::new(AtomicU64::default()), max_complete_rewards_slot: Arc::new(AtomicU64::default()), prioritization_fee_cache: Arc::new(PrioritizationFeeCache::default()), + runtime: service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), } } @@ -456,12 +491,14 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); - let response = task::spawn_blocking({ - let bank = Arc::clone(&bank); - move || get_encoded_account(&bank, &pubkey, encoding, data_slice, None) - }) - .await - .expect("rpc: get_encoded_account panicked")?; + let response = self + .runtime + .spawn_blocking({ + let bank = Arc::clone(&bank); + move || get_encoded_account(&bank, &pubkey, encoding, data_slice, None) + }) + .await + .expect("rpc: get_encoded_account panicked")?; Ok(new_response(&bank, response)) } @@ -486,11 +523,12 @@ impl JsonRpcRequestProcessor { for pubkey in pubkeys { let bank = Arc::clone(&bank); accounts.push( - task::spawn_blocking(move || { - get_encoded_account(&bank, &pubkey, encoding, data_slice, None) - }) - .await - .expect("rpc: get_encoded_account panicked")?, + self.runtime + .spawn_blocking(move || { + get_encoded_account(&bank, &pubkey, encoding, data_slice, None) + }) + .await + .expect("rpc: get_encoded_account panicked")?, ); } Ok(new_response(&bank, accounts)) @@ -2112,40 +2150,42 @@ impl JsonRpcRequestProcessor { index_key: program_id.to_string(), }); } - task::spawn_blocking(move || { - bank.get_filtered_indexed_accounts( - &IndexKey::ProgramId(program_id), - |account| { - // The program-id account index checks for Account owner on inclusion. However, due - // to the current AccountsDb implementation, an account may remain in storage as a - // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later - // updates. We include the redundant filters here to avoid returning these - // accounts. - account.owner() == &program_id && filter_closure(account) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::ProgramId(program_id), + |account| { + // The program-id account index checks for Account owner on inclusion. However, due + // to the current AccountsDb implementation, an account may remain in storage as a + // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later + // updates. We include the redundant filters here to avoid returning these + // accounts. + account.owner() == &program_id && filter_closure(account) + }, + &ScanConfig::new(!sort_results), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) }) - }) - .await - .expect("Failed to spawn blocking task") + .await + .expect("Failed to spawn blocking task") } else { // this path does not need to provide a mb limit because we only want to support secondary indexes - task::spawn_blocking(move || { - bank.get_filtered_program_accounts( - &program_id, - filter_closure, - &ScanConfig::new(!sort_results), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), + self.runtime + .spawn_blocking(move || { + bank.get_filtered_program_accounts( + &program_id, + filter_closure, + &ScanConfig::new(!sort_results), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) }) - }) - .await - .expect("Failed to spawn blocking task") + .await + .expect("Failed to spawn blocking task") } } @@ -2181,24 +2221,25 @@ impl JsonRpcRequestProcessor { index_key: owner_key.to_string(), }); } - task::spawn_blocking(move || { - bank.get_filtered_indexed_accounts( - &IndexKey::SplTokenOwner(owner_key), - |account| { - account.owner() == &program_id - && filters - .iter() - .all(|filter_type| filter_allows(filter_type, account)) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::SplTokenOwner(owner_key), + |account| { + account.owner() == &program_id + && filters + .iter() + .all(|filter_type| filter_allows(filter_type, account)) + }, + &ScanConfig::new(!sort_results), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) }) - }) - .await - .expect("rpc: get_filtered_indexed_account panicked") + .await + .expect("rpc: get_filtered_indexed_account panicked") } else { self.get_filtered_program_accounts(bank, program_id, filters, sort_results) .await @@ -2236,24 +2277,25 @@ impl JsonRpcRequestProcessor { index_key: mint_key.to_string(), }); } - task::spawn_blocking(move || { - bank.get_filtered_indexed_accounts( - &IndexKey::SplTokenMint(mint_key), - |account| { - account.owner() == &program_id - && filters - .iter() - .all(|filter_type| filter_allows(filter_type, account)) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::SplTokenMint(mint_key), + |account| { + account.owner() == &program_id + && filters + .iter() + .all(|filter_type| filter_allows(filter_type, account)) + }, + &ScanConfig::new(!sort_results), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) }) - }) - .await - .expect("rpc: get_filtered_indexed_account panicked") + .await + .expect("rpc: get_filtered_indexed_account panicked") } else { self.get_filtered_program_accounts(bank, program_id, filters, sort_results) .await @@ -4411,6 +4453,7 @@ pub mod tests { optimistically_confirmed_bank_tracker::{ BankNotification, OptimisticallyConfirmedBankTracker, }, + rpc_service::service_runtime, rpc_subscriptions::RpcSubscriptions, }, bincode::deserialize, @@ -4589,6 +4632,12 @@ pub mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let meta = JsonRpcRequestProcessor::new( config, None, @@ -4607,6 +4656,7 @@ pub mod tests { max_complete_transaction_status_slot.clone(), max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ) .0; @@ -5316,8 +5366,8 @@ pub mod tests { assert_eq!(response, expected); } - #[tokio::test] - async fn test_rpc_get_account_info() { + #[test] + fn test_rpc_get_account_info() { let rpc = RpcHandler::start(); let bank = rpc.working_bank(); @@ -5465,8 +5515,8 @@ pub mod tests { assert!(result.is_ok()); } - #[tokio::test] - async fn test_rpc_get_multiple_accounts() { + #[test] + fn test_rpc_get_multiple_accounts() { let rpc = RpcHandler::start(); let bank = rpc.working_bank(); @@ -5579,8 +5629,8 @@ pub mod tests { ); } - #[tokio::test] - async fn test_rpc_get_program_accounts() { + #[test] + fn test_rpc_get_program_accounts() { let rpc = RpcHandler::start(); let bank = rpc.working_bank(); @@ -6555,8 +6605,15 @@ pub mod tests { .my_contact_info() .tpu(connection_cache.protocol()) .unwrap(); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (meta, receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -6573,6 +6630,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); let client = ConnectionCacheClient::::new( connection_cache.clone(), @@ -6827,8 +6885,15 @@ pub mod tests { .unwrap(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (request_processor, receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -6845,6 +6910,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); let client = ConnectionCacheClient::::new( connection_cache.clone(), @@ -7542,8 +7608,8 @@ pub mod tests { )); } - #[tokio::test] - async fn test_token_rpcs() { + #[test] + fn test_token_rpcs() { for program_id in solana_account_decoder::parse_token::spl_token_ids() { let rpc = RpcHandler::start(); let bank = rpc.working_bank(); @@ -8037,8 +8103,8 @@ pub mod tests { } } - #[tokio::test] - async fn test_token_parsing() { + #[test] + fn test_token_parsing() { for program_id in solana_account_decoder::parse_token::spl_token_ids() { let rpc = RpcHandler::start(); let bank = rpc.working_bank(); @@ -8476,8 +8542,15 @@ pub mod tests { optimistically_confirmed_bank.clone(), )); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (meta, _receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -8494,6 +8567,7 @@ pub mod tests { max_complete_transaction_status_slot, max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); let mut io = MetaIoHandler::default(); diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 3dc027fc9ec34f..78dab869ecaee6 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -382,32 +382,7 @@ impl JsonRpcService { .tpu(connection_cache.protocol()) .map_err(|err| format!("{err}"))?; - // The jsonrpc_http_server crate supports two execution models: - // - // - By default, it spawns a number of threads - configured with .threads(N) - and runs a - // single-threaded futures executor in each thread. - // - Alternatively when configured with .event_loop_executor(executor) and .threads(1), - // it executes all the tasks on the given executor, not spawning any extra internal threads. - // - // We use the latter configuration, using a multi threaded tokio runtime as the executor. We - // do this so we can configure the number of worker threads, the number of blocking threads - // and then use tokio::task::spawn_blocking() to avoid blocking the worker threads on CPU - // bound operations like getMultipleAccounts. This results in reduced latency, since fast - // rpc calls (the majority) are not blocked by slow CPU bound ones. - // - // NB: `rpc_blocking_threads` shouldn't be set too high (defaults to num_cpus / 2). Too many - // (busy) blocking threads could compete with CPU time with other validator threads and - // negatively impact performance. - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .worker_threads(rpc_threads) - .max_blocking_threads(rpc_blocking_threads) - .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap()) - .thread_name("solRpcEl") - .enable_all() - .build() - .expect("Runtime"), - ); + let runtime = service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj); let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); @@ -485,6 +460,7 @@ impl JsonRpcService { max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache, + Arc::clone(&runtime), ); let leader_info = @@ -598,6 +574,40 @@ impl JsonRpcService { } } +pub fn service_runtime( + rpc_threads: usize, + rpc_blocking_threads: usize, + rpc_niceness_adj: i8, +) -> Arc { + // The jsonrpc_http_server crate supports two execution models: + // + // - By default, it spawns a number of threads - configured with .threads(N) - and runs a + // single-threaded futures executor in each thread. + // - Alternatively when configured with .event_loop_executor(executor) and .threads(1), + // it executes all the tasks on the given executor, not spawning any extra internal threads. + // + // We use the latter configuration, using a multi threaded tokio runtime as the executor. We + // do this so we can configure the number of worker threads, the number of blocking threads + // and then use tokio::task::spawn_blocking() to avoid blocking the worker threads on CPU + // bound operations like getMultipleAccounts. This results in reduced latency, since fast + // rpc calls (the majority) are not blocked by slow CPU bound ones. + // + // NB: `rpc_blocking_threads` shouldn't be set too high (defaults to num_cpus / 2). Too many + // (busy) blocking threads could compete with CPU time with other validator threads and + // negatively impact performance. + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .worker_threads(rpc_threads) + .max_blocking_threads(rpc_blocking_threads) + .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap()) + .thread_name("solRpcEl") + .enable_all() + .build() + .expect("Runtime"), + ); + runtime +} + #[cfg(test)] mod tests { use {