From a58bf786367aa0500390f3c75ec593053f792b11 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sat, 7 Sep 2024 19:35:17 +0800 Subject: [PATCH 1/5] Extract `attempt_state_sync()` Pure refactoring, zero logical changes. --- .../network/sync/src/strategy/chain_sync.rs | 136 +++++++++--------- 1 file changed, 70 insertions(+), 66 deletions(-) diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index f29ed1b083e8..fb0a2ab962e0 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -497,7 +497,7 @@ where "💔 New peer {} with unknown genesis hash {} ({}).", peer_id, best_hash, best_number, ); - return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH)) + return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH)); } // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have @@ -521,7 +521,7 @@ where state: PeerSyncState::Available, }, ); - return Ok(None) + return Ok(None); } // If we are at genesis, just start downloading. @@ -644,14 +644,14 @@ where if self.is_known(hash) { debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); - return + return; } trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); for peer_id in &peers { if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::AncestorSearch { .. } = peer.state { - continue + continue; } if number > peer.best_number { @@ -748,14 +748,14 @@ where blocks } else { debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NO_BLOCK)) + return Err(BadPeer(*peer_id, rep::NO_BLOCK)); } }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; if blocks.is_empty() { debug!(target: LOG_TARGET, "Empty block response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NO_BLOCK)) + return Err(BadPeer(*peer_id, rep::NO_BLOCK)); } validate_blocks::(&blocks, peer_id, Some(request))?; blocks @@ -796,14 +796,14 @@ where target: LOG_TARGET, "Invalid response when searching for ancestor from {peer_id}", ); - return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR)) + return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR)); }, (_, Err(e)) => { info!( target: LOG_TARGET, "❌ Error answering legitimate blockchain query: {e}", ); - return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR)) + return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR)); }, }; if matching_hash.is_some() { @@ -837,7 +837,7 @@ where target: LOG_TARGET, "Ancestry search: genesis mismatch for peer {peer_id}", ); - return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH)) + return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH)); } if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *current, matching_hash.is_some()) @@ -852,7 +852,7 @@ where peer_id: *peer_id, request, }); - return Ok(()) + return Ok(()); } else { // Ancestry search is complete. Check if peer is on a stale fork unknown // to us and add it to sync targets if necessary. @@ -892,7 +892,7 @@ where .insert(*peer_id); } peer.state = PeerSyncState::Available; - return Ok(()) + return Ok(()); } }, PeerSyncState::Available | @@ -925,7 +925,7 @@ where } } else { // We don't know of this peer, so we also did not request anything from it. - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); }; self.validate_and_queue_blocks(new_blocks, gap); @@ -947,7 +947,7 @@ where target: LOG_TARGET, "💔 Called on_block_justification with a peer ID of an unknown peer", ); - return Ok(()) + return Ok(()); }; self.allowed_requests.add(&peer_id); @@ -964,7 +964,7 @@ where hash, block.hash, ); - return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)) + return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)); } block @@ -990,7 +990,7 @@ where number, justifications, }); - return Ok(()) + return Ok(()); } } @@ -1014,26 +1014,7 @@ where if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode { if self.state_sync.is_none() && !self.peers.is_empty() && self.queue_blocks.is_empty() { - // Finalized a recent block. - let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); - heads.sort(); - let median = heads[heads.len() / 2]; - if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { - if let Ok(Some(header)) = self.client.header(*hash) { - log::debug!( - target: LOG_TARGET, - "Starting state sync for #{number} ({hash})", - ); - self.state_sync = Some(StateSync::new( - self.client.clone(), - header, - None, - None, - *skip_proofs, - )); - self.allowed_requests.set_all(); - } - } + self.attempt_state_sync(*hash, number, *skip_proofs) } } @@ -1045,6 +1026,28 @@ where } } + fn attempt_state_sync( + &mut self, + finalized_hash: B::Hash, + finalized_number: NumberFor, + skip_proofs: bool, + ) { + let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); + heads.sort(); + let median = heads[heads.len() / 2]; + if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { + if let Ok(Some(header)) = self.client.header(finalized_hash) { + log::debug!( + target: LOG_TARGET, + "Starting state sync for #{finalized_number} ({finalized_hash})", + ); + self.state_sync = + Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs)); + self.allowed_requests.set_all(); + } + } + } + /// Submit a validated block announcement. /// /// Returns new best hash & best number of the peer if they are updated. @@ -1067,12 +1070,12 @@ where peer } else { error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}"); - return Some((hash, number)) + return Some((hash, number)); }; if let PeerSyncState::AncestorSearch { .. } = peer.state { trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); - return None + return None; } let peer_info = is_best.then(|| { @@ -1102,7 +1105,7 @@ where if let Some(target) = self.fork_targets.get_mut(&hash) { target.peers.insert(peer_id); } - return peer_info + return peer_info; } if ancient_parent { @@ -1113,7 +1116,7 @@ where hash, announce.header, ); - return peer_info + return peer_info; } if self.status().state == SyncState::Idle { @@ -1281,7 +1284,7 @@ where for (n, peer) in self.peers.iter_mut() { if let PeerSyncState::AncestorSearch { .. } = peer.state { // Wait for ancestry search to complete first. - continue + continue; } let new_common_number = if peer.best_number >= number { number } else { peer.best_number }; @@ -1401,7 +1404,7 @@ where /// What is the status of the block corresponding to the given hash? fn block_status(&self, hash: &B::Hash) -> Result { if self.queue_blocks.contains(hash) { - return Ok(BlockStatus::Queued) + return Ok(BlockStatus::Queued); } self.client.block_status(*hash) } @@ -1521,12 +1524,12 @@ where /// Get block requests scheduled by sync to be sent out. fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { if self.allowed_requests.is_empty() || self.state_sync.is_some() { - return Vec::new() + return Vec::new(); } if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { trace!(target: LOG_TARGET, "Too many blocks in the queue."); - return Vec::new() + return Vec::new(); } let is_major_syncing = self.status().state.is_major_syncing(); let attrs = self.required_block_attributes(); @@ -1550,7 +1553,7 @@ where !allowed_requests.contains(&id) || !disconnected_peers.is_peer_available(&id) { - return None + return None; } // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from @@ -1648,17 +1651,17 @@ where /// Get a state request scheduled by sync to be sent out (if any). fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { if self.allowed_requests.is_empty() { - return None + return None; } if self.state_sync.is_some() && self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) { // Only one pending state request is allowed. - return None + return None; } if let Some(sync) = &self.state_sync { if sync.is_complete() { - return None + return None; } for (id, peer) in self.peers.iter_mut() { @@ -1670,7 +1673,7 @@ where let request = sync.next_request(); trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request); self.allowed_requests.clear(); - return Some((*id, OpaqueStateRequest(Box::new(request)))) + return Some((*id, OpaqueStateRequest(Box::new(request)))); } } } @@ -1709,7 +1712,7 @@ where sync.import(*response) } else { debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); }; match import_result { @@ -1765,16 +1768,17 @@ where } for (result, hash) in results { if has_error { - break + break; } has_error |= result.is_err(); match result { - Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => + Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => { if let Some(peer) = peer_id { self.update_peer_common_number(&peer, number); - }, + } + }, Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { if aux.clear_justification_requests { trace!( @@ -1964,7 +1968,7 @@ fn handle_ancestor_search_state( if block_hash_match && next_distance_to_tip == One::one() { // We found the ancestor in the first step so there is no need to execute binary // search. - return None + return None; } if block_hash_match { let left = curr_block_num; @@ -1983,7 +1987,7 @@ fn handle_ancestor_search_state( }, AncestorSearchState::BinarySearch(mut left, mut right) => { if left >= curr_block_num { - return None + return None; } if block_hash_match { left = curr_block_num; @@ -2014,7 +2018,7 @@ fn peer_block_request( ) -> Option<(Range>, BlockRequest)> { if best_num >= peer.best_number { // Will be downloaded as alternative fork instead. - return None + return None; } else if peer.common_number < finalized { trace!( target: LOG_TARGET, @@ -2103,7 +2107,7 @@ fn fork_sync_request( hash, r.number, ); - return false + return false; } if check_block(hash) != BlockStatus::Unknown { trace!( @@ -2112,7 +2116,7 @@ fn fork_sync_request( hash, r.number, ); - return false + return false; } true }); @@ -2121,7 +2125,7 @@ fn fork_sync_request( } for (hash, r) in fork_targets { if !r.peers.contains(&id) { - continue + continue; } // Download the fork only if it is behind or not too far ahead our tip of the chain // Otherwise it should be downloaded in full sync mode. @@ -2148,7 +2152,7 @@ fn fork_sync_request( direction: Direction::Descending, max: Some(count), }, - )) + )); } else { trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number); } @@ -2167,7 +2171,7 @@ where T: HeaderMetadata + ?Sized, { if base == block { - return Ok(false) + return Ok(false); } let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?; @@ -2194,7 +2198,7 @@ pub fn validate_blocks( blocks.len(), ); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); } let block_header = @@ -2214,7 +2218,7 @@ pub fn validate_blocks( block_header, ); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); } if request.fields.contains(BlockAttributes::HEADER) && @@ -2225,7 +2229,7 @@ pub fn validate_blocks( "Missing requested header for a block in response from {peer_id}.", ); - return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)) + return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)); } if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none()) @@ -2235,7 +2239,7 @@ pub fn validate_blocks( "Missing requested body for a block in response from {peer_id}.", ); - return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)) + return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)); } } @@ -2250,7 +2254,7 @@ pub fn validate_blocks( b.hash, hash, ); - return Err(BadPeer(*peer_id, rep::BAD_BLOCK)) + return Err(BadPeer(*peer_id, rep::BAD_BLOCK)); } } if let (Some(header), Some(body)) = (&b.header, &b.body) { @@ -2268,7 +2272,7 @@ pub fn validate_blocks( expected, got, ); - return Err(BadPeer(*peer_id, rep::BAD_BLOCK)) + return Err(BadPeer(*peer_id, rep::BAD_BLOCK)); } } } From 3672c3425c05c4729e61e6ce9910c60f69126f4b Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sat, 7 Sep 2024 22:11:01 +0800 Subject: [PATCH 2/5] Add `pending_state_sync_attempt` There is an edge case where the finalized block notification is received, but the conditions required to initiate the state sync are not fully met. In such cases, state sync would fail to start as expected and remain stalled. This fixes it by storing the pending attempt and trying to start it later. --- .../network/sync/src/strategy/chain_sync.rs | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index fb0a2ab962e0..3d665963c724 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -254,6 +254,14 @@ pub struct ChainSync { /// A set of hashes of blocks that are being downloaded or have been /// downloaded and are queued for import. queue_blocks: HashSet, + /// A pending attempt to start the state sync. + /// + /// The initiation of state sync may be deferred in cases where other conditions + /// are not yet met when the finalized block notification is received, such as + /// when `queue_blocks` is not empty or there are no peers. This field holds the + /// necessary information to attempt the state sync at a later point when + /// conditions are satisfied. + pending_state_sync_attempt: Option<(B::Hash, NumberFor, bool)>, /// Fork sync targets. fork_targets: HashMap>, /// A set of peers for which there might be potential block requests @@ -376,6 +384,7 @@ where extra_justifications: ExtraRequests::new("justification", metrics_registry), mode, queue_blocks: Default::default(), + pending_state_sync_attempt: None, fork_targets: Default::default(), allowed_requests: Default::default(), max_parallel_downloads, @@ -1013,8 +1022,12 @@ where }); if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode { - if self.state_sync.is_none() && !self.peers.is_empty() && self.queue_blocks.is_empty() { - self.attempt_state_sync(*hash, number, *skip_proofs) + if self.state_sync.is_none() { + if !self.peers.is_empty() && self.queue_blocks.is_empty() { + self.attempt_state_sync(*hash, number, *skip_proofs); + } else { + self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs)); + } } } @@ -1886,6 +1899,12 @@ where /// Get pending actions to perform. #[must_use] pub fn actions(&mut self) -> impl Iterator> { + if !self.peers.is_empty() && self.queue_blocks.is_empty() { + if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() { + self.attempt_state_sync(hash, number, skip_proofs); + } + } + let block_requests = self .block_requests() .into_iter() From 4f03545d6318c2ac60f3699f89bf672040ded8c1 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sat, 7 Sep 2024 23:27:59 +0800 Subject: [PATCH 3/5] Add prdoc --- prdoc/pr_5635.prdoc | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 prdoc/pr_5635.prdoc diff --git a/prdoc/pr_5635.prdoc b/prdoc/pr_5635.prdoc new file mode 100644 index 000000000000..168d65970c95 --- /dev/null +++ b/prdoc/pr_5635.prdoc @@ -0,0 +1,13 @@ +title: Fix edge case where state sync is not triggered + +doc: + - audience: Node Dev + description: | + There is an edge case where the finalized block notification is received, but the conditions required to initiate the + state sync are not fully met. In such cases, state sync would fail to start as expected and remain stalled. + This patch addresses it by storing the pending attempt and trying to start the state sync later when the conditions + are satisfied. + +crates: + - name: sc-network-sync + bump: patch From 6d5cd14dfffcbe7dd1ade7b7b8e2e4889090589a Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Mon, 9 Sep 2024 17:17:05 +0800 Subject: [PATCH 4/5] Update substrate/client/network/sync/src/strategy/chain_sync.rs Co-authored-by: Dmitry Markin --- substrate/client/network/sync/src/strategy/chain_sync.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 3d665963c724..aa84d9a3f20b 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -1057,6 +1057,13 @@ where self.state_sync = Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs)); self.allowed_requests.set_all(); + } else { + log::error!( + target: LOG_TARGET, + "Failed to start state sync: header for finalized block \ + #{finalized_number} ({finalized_hash}) is not available", + ); + debug_assert!(false); } } } From 03f6e42b00ca3e6ede927d785d88e16fd455b131 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Tue, 10 Sep 2024 16:11:01 +0800 Subject: [PATCH 5/5] FMT --- substrate/client/network/sync/src/strategy/chain_sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index aa84d9a3f20b..cca83a5055cb 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -1061,7 +1061,7 @@ where log::error!( target: LOG_TARGET, "Failed to start state sync: header for finalized block \ - #{finalized_number} ({finalized_hash}) is not available", + #{finalized_number} ({finalized_hash}) is not available", ); debug_assert!(false); }