diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index 106fba75aa727..aaa669fb3ba6c 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -146,6 +146,13 @@ pub struct NetworkParams { verbatim_doc_comment )] pub sync: SyncMode, + + /// Maximum number of blocks per request. + /// + /// Try reducing this number from the default value if you have a slow network connection + /// and observe block requests timing out. + #[arg(long, value_name = "COUNT", default_value_t = 64)] + pub max_blocks_per_request: u32, } impl NetworkParams { @@ -235,6 +242,7 @@ impl NetworkParams { allow_private_ip, }, max_parallel_downloads: self.max_parallel_downloads, + max_blocks_per_request: self.max_blocks_per_request, enable_dht_random_walk: !self.reserved_only, allow_non_globals_in_dht, kademlia_disjoint_query_paths: self.kademlia_disjoint_query_paths, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 781ae9c786694..e00bfac79f650 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -566,6 +566,7 @@ pub struct NetworkConfiguration { /// List of request-response protocols that the node supports. pub request_response_protocols: Vec, + /// Configuration for the default set of nodes used for block syncing and transactions. pub default_peers_set: SetConfig, @@ -590,6 +591,9 @@ pub struct NetworkConfiguration { /// Maximum number of peers to ask the same blocks in parallel. pub max_parallel_downloads: u32, + /// Maximum number of blocks per request. + pub max_blocks_per_request: u32, + /// Initial syncing mode. pub sync_mode: SyncMode, @@ -653,6 +657,7 @@ impl NetworkConfiguration { node_name: node_name.into(), transport: TransportConfig::Normal { enable_mdns: false, allow_private_ip: true }, max_parallel_downloads: 5, + max_blocks_per_request: 64, sync_mode: SyncMode::Full, enable_dht_random_walk: true, allow_non_globals_in_dht: false, diff --git a/client/network/sync/src/block_request_handler.rs b/client/network/sync/src/block_request_handler.rs index 921efd7def622..ece565aad4b09 100644 --- a/client/network/sync/src/block_request_handler.rs +++ b/client/network/sync/src/block_request_handler.rs @@ -17,7 +17,10 @@ //! Helper for handling (i.e. answering) block requests from a remote peer via the //! `crate::request_responses::RequestResponsesBehaviour`. -use crate::schema::v1::{block_request::FromBlock, BlockResponse, Direction}; +use crate::{ + schema::v1::{block_request::FromBlock, BlockResponse, Direction}, + MAX_BLOCKS_IN_RESPONSE, +}; use codec::{Decode, Encode}; use futures::{ @@ -50,7 +53,6 @@ use std::{ }; const LOG_TARGET: &str = "sync"; -const MAX_BLOCKS_IN_RESPONSE: usize = 128; const MAX_BODY_BYTES: usize = 8 * 1024 * 1024; const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; diff --git a/client/network/sync/src/blocks.rs b/client/network/sync/src/blocks.rs index 4cc1ca080d177..3c76238be1b5f 100644 --- a/client/network/sync/src/blocks.rs +++ b/client/network/sync/src/blocks.rs @@ -109,7 +109,7 @@ impl BlockCollection { pub fn needed_blocks( &mut self, who: PeerId, - count: usize, + count: u32, peer_best: NumberFor, common: NumberFor, max_parallel: u32, diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index e3d45a980a0b4..6fb618a571c25 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -264,6 +264,14 @@ where SyncOperationMode::Warp => SyncMode::Warp, }; let max_parallel_downloads = network_config.max_parallel_downloads; + let max_blocks_per_request = if network_config.max_blocks_per_request > + crate::MAX_BLOCKS_IN_RESPONSE as u32 + { + log::info!(target: "sync", "clamping maximum blocks per request to {}", crate::MAX_BLOCKS_IN_RESPONSE); + crate::MAX_BLOCKS_IN_RESPONSE as u32 + } else { + network_config.max_blocks_per_request + }; let cache_capacity = NonZeroUsize::new( (network_config.default_peers_set.in_peers as usize + network_config.default_peers_set.out_peers as usize) @@ -318,6 +326,7 @@ where roles, block_announce_validator, max_parallel_downloads, + max_blocks_per_request, warp_sync_params, metrics_registry, network_service.clone(), diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 28959e7f9c886..e112a1715ced1 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -107,9 +107,6 @@ pub mod state_request_handler; pub mod warp; pub mod warp_request_handler; -/// Maximum blocks to request in a single packet. -const MAX_BLOCKS_TO_REQUEST: usize = 64; - /// Maximum blocks to store in the import queue. const MAX_IMPORTING_BLOCKS: usize = 2048; @@ -147,6 +144,9 @@ const MIN_PEERS_TO_START_WARP_SYNC: usize = 3; /// Maximum allowed size for a block announce. const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024; +/// Maximum blocks per response. +pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128; + mod rep { use sc_peerset::ReputationChange as Rep; /// Reputation change when a peer sent us a message that led to a @@ -311,6 +311,8 @@ pub struct ChainSync { block_announce_validator: Box + Send>, /// Maximum number of peers to ask the same blocks in parallel. max_parallel_downloads: u32, + /// Maximum blocks per request. + max_blocks_per_request: u32, /// Total number of downloaded blocks. downloaded_blocks: usize, /// All block announcement that are currently being validated. @@ -1403,6 +1405,7 @@ where roles: Roles, block_announce_validator: Box + Send>, max_parallel_downloads: u32, + max_blocks_per_request: u32, warp_sync_params: Option>, metrics_registry: Option<&Registry>, network_service: service::network::NetworkServiceHandle, @@ -1437,6 +1440,7 @@ where allowed_requests: Default::default(), block_announce_validator, max_parallel_downloads, + max_blocks_per_request, downloaded_blocks: 0, block_announce_validation: Default::default(), block_announce_validation_per_peer_stats: Default::default(), @@ -2365,6 +2369,7 @@ where let queue = &self.queue_blocks; let allowed_requests = self.allowed_requests.take(); let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads }; + let max_blocks_per_request = self.max_blocks_per_request; let gap_sync = &mut self.gap_sync; self.peers .iter_mut() @@ -2404,6 +2409,7 @@ where blocks, attrs, max_parallel, + max_blocks_per_request, last_finalized, best_queued, ) { @@ -2430,6 +2436,7 @@ where client.block_status(*hash).unwrap_or(BlockStatus::Unknown) } }, + max_blocks_per_request, ) { trace!(target: "sync", "Downloading fork {:?} from {}", hash, id); peer.state = PeerSyncState::DownloadingStale(hash); @@ -2442,6 +2449,7 @@ where attrs, sync.target, sync.best_queued_number, + max_blocks_per_request, ) }) { peer.state = PeerSyncState::DownloadingGap(range.start); @@ -2910,6 +2918,7 @@ fn peer_block_request( blocks: &mut BlockCollection, attrs: BlockAttributes, max_parallel_downloads: u32, + max_blocks_per_request: u32, finalized: NumberFor, best_num: NumberFor, ) -> Option<(Range>, BlockRequest)> { @@ -2925,7 +2934,7 @@ fn peer_block_request( } let range = blocks.needed_blocks( *id, - MAX_BLOCKS_TO_REQUEST, + max_blocks_per_request, peer.best_number, peer.common_number, max_parallel_downloads, @@ -2960,10 +2969,11 @@ fn peer_gap_block_request( attrs: BlockAttributes, target: NumberFor, common_number: NumberFor, + max_blocks_per_request: u32, ) -> Option<(Range>, BlockRequest)> { let range = blocks.needed_blocks( *id, - MAX_BLOCKS_TO_REQUEST, + max_blocks_per_request, std::cmp::min(peer.best_number, target), common_number, 1, @@ -2992,6 +3002,7 @@ fn fork_sync_request( finalized: NumberFor, attributes: BlockAttributes, check_block: impl Fn(&B::Hash) -> BlockStatus, + max_blocks_per_request: u32, ) -> Option<(B::Hash, BlockRequest)> { targets.retain(|hash, r| { if r.number <= finalized { @@ -3011,7 +3022,7 @@ fn fork_sync_request( // Download the fork only if it is behind or not too far ahead our tip of the chain // Otherwise it should be downloaded in full sync mode. if r.number <= best_num || - (r.number - best_num).saturated_into::() < MAX_BLOCKS_TO_REQUEST as u32 + (r.number - best_num).saturated_into::() < max_blocks_per_request as u32 { let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block); let count = if parent_status == BlockStatus::Unknown { @@ -3199,6 +3210,7 @@ mod test { Roles::from(&Role::Full), block_announce_validator, 1, + 64, None, None, chain_sync_network_handle, @@ -3265,6 +3277,7 @@ mod test { Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 1, + 64, None, None, chain_sync_network_handle, @@ -3446,6 +3459,7 @@ mod test { Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 5, + 64, None, None, chain_sync_network_handle, @@ -3572,6 +3586,7 @@ mod test { Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 5, + 64, None, None, chain_sync_network_handle, @@ -3586,6 +3601,7 @@ mod test { let peer_id2 = PeerId::random(); let best_block = blocks.last().unwrap().clone(); + let max_blocks_to_request = sync.max_blocks_per_request; // Connect the node we will sync from sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()) .unwrap(); @@ -3595,8 +3611,8 @@ mod test { while best_block_num < MAX_DOWNLOAD_AHEAD { let request = get_block_request( &mut sync, - FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64), - MAX_BLOCKS_TO_REQUEST as u32, + FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64), + max_blocks_to_request as u32, &peer_id1, ); @@ -3610,14 +3626,14 @@ mod test { let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); assert!(matches!( res, - OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST + OnBlockData::Import(_, blocks) if blocks.len() == max_blocks_to_request as usize ),); - best_block_num += MAX_BLOCKS_TO_REQUEST as u32; + best_block_num += max_blocks_to_request as u32; let _ = sync.on_blocks_processed( - MAX_BLOCKS_TO_REQUEST as usize, - MAX_BLOCKS_TO_REQUEST as usize, + max_blocks_to_request as usize, + max_blocks_to_request as usize, resp_blocks .iter() .rev() @@ -3675,8 +3691,8 @@ mod test { // peer 2 as well. get_block_request( &mut sync, - FromBlock::Number(peer1_from + MAX_BLOCKS_TO_REQUEST as u64), - MAX_BLOCKS_TO_REQUEST as u32, + FromBlock::Number(peer1_from + max_blocks_to_request as u64), + max_blocks_to_request as u32, &peer_id2, ); } @@ -3728,6 +3744,7 @@ mod test { Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 5, + 64, None, None, chain_sync_network_handle, @@ -3773,11 +3790,12 @@ mod test { // Now request and import the fork. let mut best_block_num = *finalized_block.header().number() as u32; + let max_blocks_to_request = sync.max_blocks_per_request; while best_block_num < *fork_blocks.last().unwrap().header().number() as u32 - 1 { let request = get_block_request( &mut sync, - FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64), - MAX_BLOCKS_TO_REQUEST as u32, + FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64), + max_blocks_to_request as u32, &peer_id1, ); @@ -3791,14 +3809,14 @@ mod test { let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); assert!(matches!( res, - OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST + OnBlockData::Import(_, blocks) if blocks.len() == sync.max_blocks_per_request as usize ),); - best_block_num += MAX_BLOCKS_TO_REQUEST as u32; + best_block_num += sync.max_blocks_per_request as u32; let _ = sync.on_blocks_processed( - MAX_BLOCKS_TO_REQUEST as usize, - MAX_BLOCKS_TO_REQUEST as usize, + max_blocks_to_request as usize, + max_blocks_to_request as usize, resp_blocks .iter() .rev() @@ -3869,6 +3887,7 @@ mod test { Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 5, + 64, None, None, chain_sync_network_handle, @@ -3914,10 +3933,12 @@ mod test { // Now request and import the fork. let mut best_block_num = *finalized_block.header().number() as u32; + let max_blocks_to_request = sync.max_blocks_per_request; + let mut request = get_block_request( &mut sync, - FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64), - MAX_BLOCKS_TO_REQUEST as u32, + FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64), + max_blocks_to_request as u32, &peer_id1, ); let last_block_num = *fork_blocks.last().unwrap().header().number() as u32 - 1; @@ -3932,18 +3953,18 @@ mod test { let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap(); assert!(matches!( res, - OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST + OnBlockData::Import(_, blocks) if blocks.len() == max_blocks_to_request as usize ),); - best_block_num += MAX_BLOCKS_TO_REQUEST as u32; + best_block_num += max_blocks_to_request as u32; if best_block_num < last_block_num { // make sure we're not getting a duplicate request in the time before the blocks are // processed request = get_block_request( &mut sync, - FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64), - MAX_BLOCKS_TO_REQUEST as u32, + FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64), + max_blocks_to_request as u32, &peer_id1, ); } @@ -3965,16 +3986,17 @@ mod test { // The import queue may send notifications in batches of varying size. So we simulate // this here by splitting the batch into 2 notifications. + let max_blocks_to_request = sync.max_blocks_per_request; let second_batch = notify_imported.split_off(notify_imported.len() / 2); let _ = sync.on_blocks_processed( - MAX_BLOCKS_TO_REQUEST as usize, - MAX_BLOCKS_TO_REQUEST as usize, + max_blocks_to_request as usize, + max_blocks_to_request as usize, notify_imported, ); let _ = sync.on_blocks_processed( - MAX_BLOCKS_TO_REQUEST as usize, - MAX_BLOCKS_TO_REQUEST as usize, + max_blocks_to_request as usize, + max_blocks_to_request as usize, second_batch, ); @@ -4010,6 +4032,7 @@ mod test { Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 1, + 64, None, None, chain_sync_network_handle, @@ -4055,6 +4078,7 @@ mod test { Roles::from(&Role::Full), Box::new(DefaultBlockAnnounceValidator), 1, + 64, None, None, chain_sync_network_handle,