diff --git a/Cargo.lock b/Cargo.lock index 5d82d317b84..8519f2cbf2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1673,11 +1673,14 @@ name = "cumulus-client-consensus-common" version = "0.1.0" dependencies = [ "async-trait", + "cumulus-client-pov-recovery", + "cumulus-primitives-core", "cumulus-relay-chain-interface", "cumulus-test-client", "dyn-clone", "futures", "futures-timer", + "log", "parity-scale-codec", "polkadot-primitives", "sc-client-api", @@ -1786,6 +1789,7 @@ dependencies = [ "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", "cumulus-relay-chain-minimal-node", + "futures", "parking_lot 0.12.1", "polkadot-primitives", "sc-client-api", diff --git a/client/consensus/aura/src/import_queue.rs b/client/consensus/aura/src/import_queue.rs index 5cc7e9e0659..862abfb349a 100644 --- a/client/consensus/aura/src/import_queue.rs +++ b/client/consensus/aura/src/import_queue.rs @@ -17,7 +17,7 @@ //! Parachain specific wrapper for the AuRa import queue. use codec::Codec; -use cumulus_client_consensus_common::ParachainBlockImport; +use cumulus_client_consensus_common::ParachainBlockImportMarker; use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider}; use sc_consensus::{import_queue::DefaultImportQueue, BlockImport}; use sc_consensus_aura::{AuraVerifier, CompatibilityMode}; @@ -37,7 +37,7 @@ use substrate_prometheus_endpoint::Registry; /// Parameters for [`import_queue`]. pub struct ImportQueueParams<'a, I, C, CIDP, S> { /// The block import to use. - pub block_import: ParachainBlockImport, + pub block_import: I, /// The client to interact with the chain. pub client: Arc, /// The inherent data providers, to create the inherent data. @@ -73,6 +73,7 @@ where + UsageProvider + HeaderBackend, I: BlockImport> + + ParachainBlockImportMarker + Send + Sync + 'static, diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 93b475f6c74..965f8fe3baa 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -24,7 +24,7 @@ use codec::{Decode, Encode}; use cumulus_client_consensus_common::{ - ParachainBlockImport, ParachainCandidate, ParachainConsensus, + ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus, }; use cumulus_primitives_core::{relay_chain::v2::Hash as PHash, PersistedValidationData}; @@ -75,7 +75,7 @@ impl Clone for AuraConsensus { pub struct BuildAuraConsensusParams { pub proposer_factory: PF, pub create_inherent_data_providers: CIDP, - pub block_import: ParachainBlockImport, + pub block_import: BI, pub para_client: Arc, pub backoff_authoring_blocks: Option, pub sync_oracle: SO, @@ -114,7 +114,11 @@ where Client: ProvideRuntimeApi + BlockOf + AuxStore + HeaderBackend + Send + Sync + 'static, Client::Api: AuraApi, - BI: BlockImport> + Send + Sync + 'static, + BI: BlockImport> + + ParachainBlockImportMarker + + Send + + Sync + + 'static, SO: SyncOracle + Send + Sync + Clone + 'static, BS: BackoffAuthoringBlocksStrategy> + Send + Sync + 'static, PF: Environment + Send + Sync + 'static, diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index 84b73f0fef5..c06e28c8d05 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -10,6 +10,7 @@ async-trait = "0.1.59" codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] } dyn-clone = "1.0.10" futures = "0.3.25" +log = "0.4.17" tracing = "0.1.37" # Substrate @@ -24,7 +25,9 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = 
"master" } polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } # Cumulus +cumulus-primitives-core = { path = "../../../primitives/core" } cumulus-relay-chain-interface = { path = "../../relay-chain-interface" } +cumulus-client-pov-recovery = { path = "../../pov-recovery" } [dev-dependencies] futures-timer = "3.0.2" diff --git a/client/consensus/common/src/level_monitor.rs b/client/consensus/common/src/level_monitor.rs new file mode 100644 index 00000000000..294527f1f9f --- /dev/null +++ b/client/consensus/common/src/level_monitor.rs @@ -0,0 +1,378 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _}; +use sp_blockchain::{HashAndNumber, TreeRoute}; +use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +/// Value good enough to be used with parachains using the current backend implementation +/// that ships with Substrate. This value may change in the future. +pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32; + +// Counter threshold after which we are going to eventually cleanup our internal data. +const CLEANUP_THRESHOLD: u32 = 32; + +/// Upper bound to the number of leaves allowed for each level of the blockchain. +/// +/// If the limit is set and more leaves are detected on block import, then the older ones are +/// dropped to make space for the fresh blocks. +/// +/// In environments where blocks confirmations from the relay chain may be "slow", then +/// setting an upper bound helps keeping the chain health by dropping old (presumably) stale +/// leaves and prevents discarding new blocks because we've reached the backend max value. +pub enum LevelLimit { + /// Limit set to [`MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`]. + Default, + /// No explicit limit, however a limit may be implicitly imposed by the backend implementation. + None, + /// Custom value. + Some(usize), +} + +/// Support structure to constrain the number of leaves at each level. +pub struct LevelMonitor { + // Max number of leaves for each level. + level_limit: usize, + // Monotonic counter used to keep track of block freshness. + pub(crate) import_counter: NumberFor, + // Map between blocks hashes and freshness. + pub(crate) freshness: HashMap>, + // Blockchain levels cache. + pub(crate) levels: HashMap, HashSet>, + // Lower level number stored by the levels map. + lowest_level: NumberFor, + // Backend reference to remove blocks on level saturation. + backend: Arc, +} + +/// Contains information about the target scheduled for removal. +struct TargetInfo { + /// Index of freshest leaf in the leaves array. + freshest_leaf_idx: usize, + /// Route from target to its freshest leaf. 
+    freshest_route: TreeRoute<Block>,
+}
+impl<Block, BE> LevelMonitor<Block, BE>
+where
+    Block: BlockT,
+    BE: Backend<Block>,
+{
+    /// Instantiate a new monitor structure.
+    pub fn new(level_limit: usize, backend: Arc<BE>) -> Self {
+        let mut monitor = LevelMonitor {
+            level_limit,
+            import_counter: Zero::zero(),
+            freshness: HashMap::new(),
+            levels: HashMap::new(),
+            lowest_level: Zero::zero(),
+            backend,
+        };
+        monitor.restore();
+        monitor
+    }
+    /// Restore the structure using the backend.
+    ///
+    /// Block freshness values are inferred from the height and not from the effective import
+    /// moment. This is not accurate, but is a "good-enough" best-effort solution.
+    ///
+    /// Level limits are not enforced during this phase.
+    fn restore(&mut self) {
+        let info = self.backend.blockchain().info();
+        log::debug!(
+            target: "parachain",
+            "Restoring chain level monitor from last finalized block: {} {}",
+            info.finalized_number, info.finalized_hash
+        );
+        self.lowest_level = info.finalized_number;
+        self.import_counter = info.finalized_number;
+        self.block_imported(info.finalized_number, info.finalized_hash);
+        let mut counter_max = info.finalized_number;
+        for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
+            let route =
+                sp_blockchain::tree_route(self.backend.blockchain(), info.finalized_hash, leaf)
+                    .expect("Route from finalized to leaf should be available; qed");
+            if !route.retracted().is_empty() {
+                continue
+            }
+            route.enacted().iter().for_each(|elem| {
+                if !self.freshness.contains_key(&elem.hash) {
+                    // Use the block height value as the freshness.
+                    self.import_counter = elem.number;
+                    self.block_imported(elem.number, elem.hash);
+                }
+            });
+            counter_max = std::cmp::max(self.import_counter, counter_max);
+        }
+        log::debug!(target: "parachain", "Restored chain level monitor up to height {}", counter_max);
+        self.import_counter = counter_max;
+    }
+    /// Check and enforce the limit bound at the given height.
+    ///
+    /// In practice this ensures that the given height ends up with a number of blocks below
+    /// the limit passed to the constructor.
+    ///
+    /// If the given level is found to have a number of blocks greater than or equal to the
+    /// limit, then the limit is enforced by choosing one (or more) blocks to remove.
+    ///
+    /// The removal strategy is driven by the block freshness.
+    ///
+    /// A block's freshness is determined by the freshness of the most recent leaf descending
+    /// from the block itself. In other words, its freshness is equal to that of its freshest
+    /// descendant.
+    ///
+    /// The least "fresh" blocks are eventually removed.
+    pub fn enforce_limit(&mut self, number: NumberFor<Block>) {
+        let level_len = self.levels.get(&number).map(|l| l.len()).unwrap_or_default();
+        if level_len < self.level_limit {
+            return
+        }
+        // Sort leaves by freshness only once (less fresh first) and keep track of
+        // leaves that were invalidated on removal.
+        let mut leaves = self.backend.blockchain().leaves().unwrap_or_default();
+        leaves.sort_unstable_by(|a, b| self.freshness.get(a).cmp(&self.freshness.get(b)));
+        let mut invalidated_leaves = HashSet::new();
+        // This may not be the most efficient way to remove **multiple** entries, but it is the
+        // easy one :-). Keep in mind that under "normal" conditions the number of blocks to
+        // remove is 0 or 1, so it is not worth complicating the code too much. One condition
+        // that may trigger multiple removals (2+) is restarting the node with an existing db
+        // and a smaller limit than the one previously used.
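+        // Example: with a level limit of 3 and a level that already holds 3 blocks,
+        // remove_count = 3 - 3 + 1 = 1, i.e. a single stale block is dropped to make room.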
+        let remove_count = level_len - self.level_limit + 1;
+        log::debug!(
+            target: "parachain",
+            "Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks",
+        );
+        (0..remove_count).all(|_| {
+            self.find_target(number, &leaves, &invalidated_leaves).map_or(false, |target| {
+                self.remove_target(target, number, &leaves, &mut invalidated_leaves);
+                true
+            })
+        });
+    }
+    // Helper function to find the best candidate to be removed.
+    //
+    // Given a set of blocks with height equal to `number` (the potential candidates):
+    // 1. For each candidate, fetch all the leaves descending from it.
+    // 2. Set the candidate's freshness equal to that of its freshest descending leaf.
+    // 3. The target is the least fresh candidate.
+    //
+    // The input `leaves` is assumed to be already ordered by "freshness" (less fresh first).
+    //
+    // Returns the index of the target's freshest leaf within `leaves` and the route from the
+    // target to that leaf.
+    fn find_target(
+        &self,
+        number: NumberFor<Block>,
+        leaves: &[Block::Hash],
+        invalidated_leaves: &HashSet<usize>,
+    ) -> Option<TargetInfo<Block>> {
+        let mut target_info: Option<TargetInfo<Block>> = None;
+        let blockchain = self.backend.blockchain();
+        let best_hash = blockchain.info().best_hash;
+        // Leaves that were already assigned to some node and thus can be skipped
+        // during the search.
+        let mut assigned_leaves = HashSet::new();
+        let level = self.levels.get(&number)?;
+        for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
+            // Search for the freshest leaf information for this block.
+            let candidate_info = leaves
+                .iter()
+                .enumerate()
+                .filter(|(leaf_idx, _)| {
+                    !assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx)
+                })
+                .rev()
+                .find_map(|(leaf_idx, leaf_hash)| {
+                    if blk_hash == leaf_hash {
+                        let entry = HashAndNumber { number, hash: *blk_hash };
+                        TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo {
+                            freshest_leaf_idx: leaf_idx,
+                            freshest_route,
+                        })
+                    } else {
+                        match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) {
+                            Ok(route) if route.retracted().is_empty() => Some(TargetInfo {
+                                freshest_leaf_idx: leaf_idx,
+                                freshest_route: route,
+                            }),
+                            Err(err) => {
+                                log::warn!(
+                                    target: "parachain",
+                                    "(Lookup) Unable to get route from {:?} to {:?}: {}",
+                                    blk_hash, leaf_hash, err,
+                                );
+                                None
+                            },
+                            _ => None,
+                        }
+                    }
+                });
+            let candidate_info = match candidate_info {
+                Some(candidate_info) => {
+                    assigned_leaves.insert(candidate_info.freshest_leaf_idx);
+                    candidate_info
+                },
+                None => {
+                    // This should never happen.
+                    log::error!(
+                        target: "parachain",
+                        "Unable to get route to any leaf from {:?} (this is a bug)",
+                        blk_hash,
+                    );
+                    continue
+                },
+            };
+            // Found the freshest leaf for this candidate.
+            // This candidate is set as the new target if:
+            // 1. its freshest leaf is less fresh than the previous target's freshest leaf, AND
+            // 2. the best block is not in its route.
+            let is_less_fresh = || {
+                target_info
+                    .as_ref()
+                    .map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx)
+                    .unwrap_or(true)
+            };
+            let not_contains_best = || {
+                candidate_info
+                    .freshest_route
+                    .enacted()
+                    .iter()
+                    .all(|entry| entry.hash != best_hash)
+            };
+            if is_less_fresh() && not_contains_best() {
+                let early_stop = candidate_info.freshest_leaf_idx == 0;
+                target_info = Some(candidate_info);
+                if early_stop {
+                    // We will never find a candidate with a less fresh freshest leaf than this.
+                    break
+                }
+            }
+        }
+        target_info
+    }
+    // Remove the target block and all its descendants.
+ // + // Leaves should have already been ordered by "freshness" (less fresh first). + fn remove_target( + &mut self, + target: TargetInfo, + number: NumberFor, + leaves: &[Block::Hash], + invalidated_leaves: &mut HashSet, + ) { + let mut remove_leaf = |number, hash| { + log::debug!(target: "parachain", "Removing block (@{}) {:?}", number, hash); + if let Err(err) = self.backend.remove_leaf_block(hash) { + log::debug!(target: "parachain", "Remove not possible for {}: {}", hash, err); + return false + } + self.levels.get_mut(&number).map(|level| level.remove(&hash)); + self.freshness.remove(&hash); + true + }; + + invalidated_leaves.insert(target.freshest_leaf_idx); + + // Takes care of route removal. Starts from the leaf and stops as soon as an error is + // encountered. In this case an error is interpreted as the block being not a leaf + // and it will be removed while removing another route from the same block but to a + // different leaf. + let mut remove_route = |route: TreeRoute| { + route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash)); + }; + + let target_hash = target.freshest_route.common_block().hash; + debug_assert_eq!( + target.freshest_route.common_block().number, + number, + "This is a bug in LevelMonitor::find_target() or the Backend is corrupted" + ); + + // Remove freshest (cached) route first. + remove_route(target.freshest_route); + + // Don't bother trying with leaves we already found to not be our descendants. + let to_skip = leaves.len() - target.freshest_leaf_idx; + leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| { + if invalidated_leaves.contains(&leaf_idx) { + return + } + match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) { + Ok(route) if route.retracted().is_empty() => { + invalidated_leaves.insert(leaf_idx); + remove_route(route); + }, + Err(err) => { + log::warn!( + target: "parachain", + "(Removal) unable getting route from {:?} to {:?}: {}", + target_hash, leaf_hash, err, + ); + }, + _ => (), + }; + }); + + remove_leaf(number, target_hash); + } + + /// Add a new imported block information to the monitor. + pub fn block_imported(&mut self, number: NumberFor, hash: Block::Hash) { + self.freshness.insert(hash, self.import_counter); + self.levels.entry(number).or_default().insert(hash); + self.import_counter += One::one(); + + // Do cleanup once in a while, we are allowed to have some obsolete information. + let finalized_num = self.backend.blockchain().info().finalized_number; + let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into(); + if delta >= CLEANUP_THRESHOLD { + for i in 0..delta { + let number = self.lowest_level + i.unique_saturated_into(); + self.levels.remove(&number).map(|level| { + level.iter().for_each(|hash| { + self.freshness.remove(hash); + }) + }); + } + + self.lowest_level = finalized_num; + } + } +} diff --git a/client/consensus/common/src/lib.rs b/client/consensus/common/src/lib.rs index d5d33585439..39119f345c2 100644 --- a/client/consensus/common/src/lib.rs +++ b/client/consensus/common/src/lib.rs @@ -15,14 +15,23 @@ // along with Cumulus. If not, see . 
use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData}; -use sc_consensus::BlockImport; -use sp_runtime::traits::Block as BlockT; +use sc_client_api::Backend; +use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; + +use std::sync::Arc; + +mod level_monitor; mod parachain_consensus; #[cfg(test)] mod tests; + pub use parachain_consensus::run_parachain_consensus; +use level_monitor::LevelMonitor; +pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT}; + /// The result of [`ParachainConsensus::produce_candidate`]. pub struct ParachainCandidate { /// The block that was built for this candidate. @@ -74,47 +83,93 @@ impl ParachainConsensus for Box + Send + /// This is used to set `block_import_params.fork_choice` to `false` as long as the block origin is /// not `NetworkInitialSync`. The best block for parachains is determined by the relay chain. Meaning /// we will update the best block, as it is included by the relay-chain. -pub struct ParachainBlockImport(I); +pub struct ParachainBlockImport { + inner: BI, + monitor: Option>>, +} -impl ParachainBlockImport { +impl> ParachainBlockImport { /// Create a new instance. - pub fn new(inner: I) -> Self { - Self(inner) + /// + /// The number of leaves per level limit is set to `LevelLimit::Default`. + pub fn new(inner: BI, backend: Arc) -> Self { + Self::new_with_limit(inner, backend, LevelLimit::Default) + } + + /// Create a new instance with an explicit limit to the number of leaves per level. + /// + /// This function alone doesn't enforce the limit on levels for old imported blocks, + /// the limit is eventually enforced only when new blocks are imported. + pub fn new_with_limit(inner: BI, backend: Arc, level_leaves_max: LevelLimit) -> Self { + let level_limit = match level_leaves_max { + LevelLimit::None => None, + LevelLimit::Some(limit) => Some(limit), + LevelLimit::Default => Some(MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT), + }; + + let monitor = + level_limit.map(|level_limit| SharedData::new(LevelMonitor::new(level_limit, backend))); + + Self { inner, monitor } } } -impl Clone for ParachainBlockImport { +impl Clone for ParachainBlockImport { fn clone(&self) -> Self { - ParachainBlockImport(self.0.clone()) + ParachainBlockImport { inner: self.inner.clone(), monitor: self.monitor.clone() } } } #[async_trait::async_trait] -impl BlockImport for ParachainBlockImport +impl BlockImport for ParachainBlockImport where Block: BlockT, - I: BlockImport + Send, + BI: BlockImport + Send, + BE: Backend, { - type Error = I::Error; - type Transaction = I::Transaction; + type Error = BI::Error; + type Transaction = BI::Transaction; async fn check_block( &mut self, block: sc_consensus::BlockCheckParams, ) -> Result { - self.0.check_block(block).await + self.inner.check_block(block).await } async fn import_block( &mut self, - mut block_import_params: sc_consensus::BlockImportParams, + mut params: sc_consensus::BlockImportParams, cache: std::collections::HashMap>, ) -> Result { + // Blocks are stored within the backend by using POST hash. + let hash = params.post_hash(); + let number = *params.header.number(); + // Best block is determined by the relay chain, or if we are doing the initial sync // we import all blocks as new best. 
- block_import_params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom( - block_import_params.origin == sp_consensus::BlockOrigin::NetworkInitialSync, + params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom( + params.origin == sp_consensus::BlockOrigin::NetworkInitialSync, )); - self.0.import_block(block_import_params, cache).await + + let maybe_lock = self.monitor.as_ref().map(|monitor_lock| { + let mut monitor = monitor_lock.shared_data_locked(); + monitor.enforce_limit(number); + monitor.release_mutex() + }); + + let res = self.inner.import_block(params, cache).await?; + + if let (Some(mut monitor_lock), ImportResult::Imported(_)) = (maybe_lock, &res) { + let mut monitor = monitor_lock.upgrade(); + monitor.block_imported(number, hash); + } + + Ok(res) } } + +/// Marker trait denoting a block import type that fits the parachain requirements. +pub trait ParachainBlockImportMarker {} + +impl ParachainBlockImportMarker for ParachainBlockImport {} diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index 860eb552c87..ffbbab5a200 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -15,7 +15,6 @@ // along with Cumulus. If not, see . use async_trait::async_trait; -use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use sc_client_api::{ Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider, }; @@ -27,15 +26,25 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT}, }; +use cumulus_client_pov_recovery::{RecoveryDelay, RecoveryKind, RecoveryRequest}; +use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; + use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; use codec::Decode; -use futures::{select, FutureExt, Stream, StreamExt}; +use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt}; -use std::{pin::Pin, sync::Arc}; +use std::{pin::Pin, sync::Arc, time::Duration}; const LOG_TARGET: &str = "cumulus-consensus"; +// Delay range to trigger explicit requests. +// The chosen value doesn't have any special meaning, a random delay within the order of +// seconds in practice should be a good enough to allow a quick recovery without DOSing +// the relay chain. +const RECOVERY_DELAY: RecoveryDelay = + RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) }; + /// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`. 
#[async_trait] pub trait RelaychainClient: Clone + 'static { @@ -82,7 +91,7 @@ where let finalized_head = if let Some(h) = finalized_heads.next().await { h } else { - tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head."); + tracing::debug!(target: LOG_TARGET, "Stopping following finalized head."); return }; @@ -90,7 +99,7 @@ where Ok(header) => header, Err(err) => { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, error = ?err, "Could not decode parachain header while following finalized heads.", ); @@ -105,12 +114,12 @@ where if let Err(e) = parachain.finalize_block(hash, None, true) { match e { ClientError::UnknownBlock(_) => tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, block_hash = ?hash, "Could not finalize block because it is unknown.", ), _ => tracing::warn!( - target: "cumulus-consensus", + target: LOG_TARGET, error = ?e, block_hash = ?hash, "Failed to finalize block", @@ -136,6 +145,7 @@ pub async fn run_parachain_consensus( parachain: Arc
<P>
, relay_chain: R, announce_block: Arc>) + Send + Sync>, + recovery_chan_tx: Option>>, ) where Block: BlockT, P: Finalizer @@ -148,8 +158,13 @@ pub async fn run_parachain_consensus( R: RelaychainClient, B: Backend, { - let follow_new_best = - follow_new_best(para_id, parachain.clone(), relay_chain.clone(), announce_block); + let follow_new_best = follow_new_best( + para_id, + parachain.clone(), + relay_chain.clone(), + announce_block, + recovery_chan_tx, + ); let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain); select! { _ = follow_new_best.fuse() => {}, @@ -163,6 +178,7 @@ async fn follow_new_best( parachain: Arc
<P>
, relay_chain: R, announce_block: Arc>) + Send + Sync>, + recovery_chan_tx: Option>>, ) where Block: BlockT, P: Finalizer @@ -197,10 +213,11 @@ async fn follow_new_best( h, &*parachain, &mut unset_best_header, + recovery_chan_tx.clone(), ).await, None => { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, "Stopping following new best.", ); return @@ -217,7 +234,7 @@ async fn follow_new_best( ).await, None => { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, "Stopping following imported blocks.", ); return @@ -276,7 +293,7 @@ async fn handle_new_block_imported( import_block_as_new_best(unset_hash, unset_best_header, parachain).await; }, state => tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, ?unset_best_header, ?notification.header, ?state, @@ -290,6 +307,7 @@ async fn handle_new_best_parachain_head( head: Vec, parachain: &P, unset_best_header: &mut Option, + mut recovery_chan_tx: Option>>, ) where Block: BlockT, P: UsageProvider + Send + Sync + BlockBackend, @@ -299,7 +317,7 @@ async fn handle_new_best_parachain_head( Ok(header) => header, Err(err) => { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, error = ?err, "Could not decode Parachain header while following best heads.", ); @@ -311,7 +329,7 @@ async fn handle_new_best_parachain_head( if parachain.usage_info().chain.best_hash == hash { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, block_hash = ?hash, "Skipping set new best block, because block is already the best.", ) @@ -325,7 +343,7 @@ async fn handle_new_best_parachain_head( }, Ok(BlockStatus::InChainPruned) => { tracing::error!( - target: "cumulus-collator", + target: LOG_TARGET, block_hash = ?hash, "Trying to set pruned block as new best!", ); @@ -334,14 +352,30 @@ async fn handle_new_best_parachain_head( *unset_best_header = Some(parachain_head); tracing::debug!( - target: "cumulus-collator", + target: LOG_TARGET, block_hash = ?hash, "Parachain block not yet imported, waiting for import to enact as best block.", ); + + if let Some(ref mut recovery_chan_tx) = recovery_chan_tx { + // Best effort channel to actively encourage block recovery. + // An error here is not fatal; the relay chain continuously re-announces + // the best block, thus we will have other opportunities to retry. 
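+				// A `RecoveryKind::Full` request also covers any missing ancestors of `hash`,
+				// not just this block.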
+ let req = + RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full }; + if let Err(err) = recovery_chan_tx.try_send(req) { + tracing::warn!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?err, + "Unable to notify block recovery subsystem" + ) + } + } }, Err(e) => { tracing::error!( - target: "cumulus-collator", + target: LOG_TARGET, block_hash = ?hash, error = ?e, "Failed to get block status of block.", @@ -361,7 +395,7 @@ where let best_number = parachain.usage_info().chain.best_number; if *header.number() < best_number { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, %best_number, block_number = %header.number(), "Skipping importing block as new best block, because there already exists a \ @@ -377,7 +411,7 @@ where if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()).await { tracing::warn!( - target: "cumulus-consensus", + target: LOG_TARGET, block_hash = ?hash, error = ?err, "Failed to set new best block.", diff --git a/client/consensus/common/src/tests.rs b/client/consensus/common/src/tests.rs index 23729abebb4..92cecc37d29 100644 --- a/client/consensus/common/src/tests.rs +++ b/client/consensus/common/src/tests.rs @@ -18,6 +18,7 @@ use crate::*; use async_trait::async_trait; use codec::Encode; +use cumulus_client_pov_recovery::RecoveryKind; use cumulus_relay_chain_interface::RelayChainResult; use cumulus_test_client::{ runtime::{Block, Header}, @@ -26,10 +27,10 @@ use cumulus_test_client::{ use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt}; use futures_timer::Delay; use polkadot_primitives::v2::Id as ParaId; -use sc_client_api::UsageProvider; +use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider}; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; use sp_blockchain::Error as ClientError; -use sp_consensus::BlockOrigin; +use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::generic::BlockId; use std::{ sync::{Arc, Mutex}, @@ -103,21 +104,82 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain { } } -fn build_and_import_block(mut client: Arc, import_as_best: bool) -> Block { - let builder = client.init_block_builder(None, Default::default()); +fn build_block( + builder: &B, + at: Option>, + timestamp: Option, +) -> Block { + let builder = match at { + Some(at) => match timestamp { + Some(ts) => + builder.init_block_builder_with_timestamp(&at, None, Default::default(), ts), + None => builder.init_block_builder_at(&at, None, Default::default()), + }, + None => builder.init_block_builder(None, Default::default()), + }; + + let mut block = builder.build().unwrap().block; - let block = builder.build().unwrap().block; - let (header, body) = block.clone().deconstruct(); + // Simulate some form of post activity (like a Seal or Other generic things). + // This is mostly used to excercise the `LevelMonitor` correct behavior. 
+ // (in practice we want that header post-hash != pre-hash) + block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3])); - let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header); + block +} + +async fn import_block>( + importer: &mut I, + block: Block, + origin: BlockOrigin, + import_as_best: bool, +) { + let (mut header, body) = block.deconstruct(); + + let post_digest = + header.digest.pop().expect("post digested is present in manually crafted block"); + + let mut block_import_params = BlockImportParams::new(origin, header); block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best)); block_import_params.body = Some(body); + block_import_params.post_digests.push(post_digest); - block_on(client.import_block(block_import_params, Default::default())).unwrap(); + importer.import_block(block_import_params, Default::default()).await.unwrap(); +} +fn import_block_sync>( + importer: &mut I, + block: Block, + origin: BlockOrigin, + import_as_best: bool, +) { + block_on(import_block(importer, block, origin, import_as_best)); +} + +fn build_and_import_block_ext>( + builder: &B, + origin: BlockOrigin, + import_as_best: bool, + importer: &mut I, + at: Option>, + timestamp: Option, +) -> Block { + let block = build_block(builder, at, timestamp); + import_block_sync(importer, block.clone(), origin, import_as_best); block } +fn build_and_import_block(mut client: Arc, import_as_best: bool) -> Block { + build_and_import_block_ext( + &*client.clone(), + BlockOrigin::Own, + import_as_best, + &mut client, + None, + None, + ) +} + #[test] fn follow_new_best_works() { sp_tracing::try_init_simple(); @@ -129,7 +191,7 @@ fn follow_new_best_works() { let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone(); let consensus = - run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); let work = async move { new_best_heads_sender.unbounded_send(block.header().clone()).unwrap(); @@ -152,6 +214,68 @@ fn follow_new_best_works() { }); } +#[test] +fn follow_new_best_with_dummy_recovery_works() { + sp_tracing::try_init_simple(); + + let client = Arc::new(TestClientBuilder::default().build()); + + let relay_chain = Relaychain::new(); + let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone(); + + let (recovery_chan_tx, mut recovery_chan_rx) = futures::channel::mpsc::channel(3); + + let consensus = run_parachain_consensus( + 100.into(), + client.clone(), + relay_chain, + Arc::new(|_, _| {}), + Some(recovery_chan_tx), + ); + + let block = build_block(&*client.clone(), None, None); + let block_clone = block.clone(); + let client_clone = client.clone(); + + let work = async move { + new_best_heads_sender.unbounded_send(block.header().clone()).unwrap(); + loop { + Delay::new(Duration::from_millis(100)).await; + match client.block_status(&BlockId::Hash(block.hash())).unwrap() { + BlockStatus::Unknown => {}, + status => { + assert_eq!(block.hash(), client.usage_info().chain.best_hash); + assert_eq!(status, BlockStatus::InChainWithState); + break + }, + } + } + }; + + let dummy_block_recovery = async move { + loop { + if let Some(req) = recovery_chan_rx.next().await { + assert_eq!(req.hash, block_clone.hash()); + assert_eq!(req.kind, RecoveryKind::Full); + Delay::new(Duration::from_millis(500)).await; + import_block(&mut &*client_clone, block_clone.clone(), BlockOrigin::Own, true) 
+ .await; + } + } + }; + + block_on(async move { + futures::pin_mut!(consensus); + futures::pin_mut!(work); + + select! { + r = consensus.fuse() => panic!("Consensus should not end: {:?}", r), + _ = dummy_block_recovery.fuse() => {}, + _ = work.fuse() => {}, + } + }); +} + #[test] fn follow_finalized_works() { sp_tracing::try_init_simple(); @@ -163,7 +287,7 @@ fn follow_finalized_works() { let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone(); let consensus = - run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); let work = async move { finalized_sender.unbounded_send(block.header().clone()).unwrap(); @@ -204,7 +328,7 @@ fn follow_finalized_does_not_stop_on_unknown_block() { let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone(); let consensus = - run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); let work = async move { for _ in 0..3usize { @@ -254,7 +378,7 @@ fn follow_new_best_sets_best_after_it_is_imported() { let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone(); let consensus = - run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); let work = async move { new_best_heads_sender.unbounded_send(block.header().clone()).unwrap(); @@ -331,7 +455,7 @@ fn do_not_set_best_block_to_older_block() { let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone(); let consensus = - run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {})); + run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None); let client2 = client.clone(); let work = async move { @@ -355,3 +479,216 @@ fn do_not_set_best_block_to_older_block() { // Build and import a new best block. build_and_import_block(client2.clone(), true); } + +#[test] +fn prune_blocks_on_level_overflow() { + // Here we are using the timestamp value to generate blocks with different hashes. 
+ const LEVEL_LIMIT: usize = 3; + const TIMESTAMP_MULTIPLIER: u64 = 60000; + + let backend = Arc::new(Backend::new_test(1000, 3)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let mut para_import = ParachainBlockImport::new_with_limit( + client.clone(), + backend.clone(), + LevelLimit::Some(LEVEL_LIMIT), + ); + + let block0 = build_and_import_block_ext( + &*client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + ); + let id0 = BlockId::Hash(block0.header.hash()); + + let blocks1 = (0..LEVEL_LIMIT) + .into_iter() + .map(|i| { + build_and_import_block_ext( + &*client, + if i == 1 { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own }, + i == 1, + &mut para_import, + Some(id0), + Some(i as u64 * TIMESTAMP_MULTIPLIER), + ) + }) + .collect::>(); + let id10 = BlockId::Hash(blocks1[0].header.hash()); + + let blocks2 = (0..2) + .into_iter() + .map(|i| { + build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id10), + Some(i as u64 * TIMESTAMP_MULTIPLIER), + ) + }) + .collect::>(); + + // Initial scenario (with B11 imported as best) + // + // B0 --+-- B10 --+-- B20 + // +-- B11 +-- B21 + // +-- B12 + + let leaves = backend.blockchain().leaves().unwrap(); + let mut expected = vec![ + blocks2[0].header.hash(), + blocks2[1].header.hash(), + blocks1[1].header.hash(), + blocks1[2].header.hash(), + ]; + assert_eq!(leaves, expected); + let best = client.usage_info().chain.best_hash; + assert_eq!(best, blocks1[1].header.hash()); + + let block13 = build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id0), + Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER), + ); + + // Expected scenario + // + // B0 --+-- B10 --+-- B20 + // +-- B11 +-- B21 + // +--(B13) <-- B12 has been replaced + + let leaves = backend.blockchain().leaves().unwrap(); + expected[3] = block13.header.hash(); + assert_eq!(leaves, expected); + + let block14 = build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id0), + Some(2 * LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER), + ); + + // Expected scenario + // + // B0 --+--(B14) <-- B10 has been replaced + // +-- B11 + // +--(B13) + + let leaves = backend.blockchain().leaves().unwrap(); + expected.remove(0); + expected.remove(0); + expected.push(block14.header.hash()); + assert_eq!(leaves, expected); +} + +#[test] +fn restore_limit_monitor() { + // Here we are using the timestamp value to generate blocks with different hashes. + const LEVEL_LIMIT: usize = 2; + const TIMESTAMP_MULTIPLIER: u64 = 60000; + + let backend = Arc::new(Backend::new_test(1000, 3)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + + // Start with a block import not enforcing any limit... 
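+	// `LevelLimit::Some(usize::MAX)` means the per-level limit is effectively never reached,
+	// so the whole fork tree survives until the monitor is re-created below.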
+ let mut para_import = ParachainBlockImport::new_with_limit( + client.clone(), + backend.clone(), + LevelLimit::Some(usize::MAX), + ); + + let block00 = build_and_import_block_ext( + &*client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + ); + let id00 = BlockId::Hash(block00.header.hash()); + + let blocks1 = (0..LEVEL_LIMIT + 1) + .into_iter() + .map(|i| { + build_and_import_block_ext( + &*client, + if i == 1 { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own }, + i == 1, + &mut para_import, + Some(id00), + Some(i as u64 * TIMESTAMP_MULTIPLIER), + ) + }) + .collect::>(); + let id10 = BlockId::Hash(blocks1[0].header.hash()); + + let _ = (0..LEVEL_LIMIT) + .into_iter() + .map(|i| { + build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id10), + Some(i as u64 * TIMESTAMP_MULTIPLIER), + ) + }) + .collect::>(); + + // Scenario before limit application (with B11 imported as best) + // Import order (freshess): B00, B10, B11, B12, B20, B21 + // + // B00 --+-- B10 --+-- B20 + // | +-- B21 + // +-- B11 + // | + // +-- B12 + + // Simulate a restart by forcing a new monitor structure instance + + let mut para_import = ParachainBlockImport::new_with_limit( + client.clone(), + backend.clone(), + LevelLimit::Some(LEVEL_LIMIT), + ); + + let block13 = build_and_import_block_ext( + &*client, + BlockOrigin::Own, + false, + &mut para_import, + Some(id00), + Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER), + ); + + // Expected scenario + // + // B0 --+-- B11 + // +--(B13) + + let leaves = backend.blockchain().leaves().unwrap(); + let expected = vec![blocks1[1].header.hash(), block13.header.hash()]; + assert_eq!(leaves, expected); + + let monitor = para_import.monitor.unwrap(); + let monitor = monitor.shared_data(); + assert_eq!(monitor.import_counter, 5); + assert!(monitor.levels.iter().all(|(number, hashes)| { + hashes + .iter() + .filter(|hash| **hash != block13.header.hash()) + .all(|hash| *number == *monitor.freshness.get(hash).unwrap()) + })); + assert_eq!(*monitor.freshness.get(&block13.header.hash()).unwrap(), monitor.import_counter - 1); +} diff --git a/client/consensus/relay-chain/src/import_queue.rs b/client/consensus/relay-chain/src/import_queue.rs index 0460ab8d582..31004c0005e 100644 --- a/client/consensus/relay-chain/src/import_queue.rs +++ b/client/consensus/relay-chain/src/import_queue.rs @@ -16,7 +16,7 @@ use std::{marker::PhantomData, sync::Arc}; -use cumulus_client_consensus_common::ParachainBlockImport; +use cumulus_client_consensus_common::ParachainBlockImportMarker; use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, @@ -107,13 +107,17 @@ where /// Start an import queue for a Cumulus collator that does not uses any special authoring logic. pub fn import_queue( client: Arc, - block_import: ParachainBlockImport, + block_import: I, create_inherent_data_providers: CIDP, spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&substrate_prometheus_endpoint::Registry>, ) -> ClientResult> where - I: BlockImport + Send + Sync + 'static, + I: BlockImport + + ParachainBlockImportMarker + + Send + + Sync + + 'static, I::Transaction: Send, Client: ProvideRuntimeApi + Send + Sync + 'static, >::Api: BlockBuilderApi, diff --git a/client/consensus/relay-chain/src/lib.rs b/client/consensus/relay-chain/src/lib.rs index efcdc1e4c3b..4cd0ab24beb 100644 --- a/client/consensus/relay-chain/src/lib.rs +++ b/client/consensus/relay-chain/src/lib.rs @@ -34,11 +34,10 @@ //! 5. 
After the parachain candidate got backed and included, all collators start at 1. use cumulus_client_consensus_common::{ - ParachainBlockImport, ParachainCandidate, ParachainConsensus, + ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus, }; use cumulus_primitives_core::{relay_chain::v2::Hash as PHash, ParaId, PersistedValidationData}; use cumulus_relay_chain_interface::RelayChainInterface; -use parking_lot::Mutex; use sc_consensus::{BlockImport, BlockImportParams}; use sp_consensus::{ @@ -46,6 +45,8 @@ use sp_consensus::{ }; use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; + +use parking_lot::Mutex; use std::{marker::PhantomData, sync::Arc, time::Duration}; mod import_queue; @@ -56,11 +57,11 @@ const LOG_TARGET: &str = "cumulus-consensus-relay-chain"; /// The implementation of the relay-chain provided consensus for parachains. pub struct RelayChainConsensus { para_id: ParaId, - _phantom: PhantomData, proposer_factory: Arc>, create_inherent_data_providers: Arc, - block_import: Arc>>, + block_import: Arc>, relay_chain_interface: RCInterface, + _phantom: PhantomData, } impl Clone for RelayChainConsensus @@ -70,11 +71,11 @@ where fn clone(&self) -> Self { Self { para_id: self.para_id, - _phantom: PhantomData, proposer_factory: self.proposer_factory.clone(), create_inherent_data_providers: self.create_inherent_data_providers.clone(), block_import: self.block_import.clone(), relay_chain_interface: self.relay_chain_interface.clone(), + _phantom: PhantomData, } } } @@ -82,6 +83,7 @@ where impl RelayChainConsensus where B: BlockT, + BI: ParachainBlockImportMarker, RCInterface: RelayChainInterface, CIDP: CreateInherentDataProviders, { @@ -90,7 +92,7 @@ where para_id: ParaId, proposer_factory: PF, create_inherent_data_providers: CIDP, - block_import: ParachainBlockImport, + block_import: BI, relay_chain_interface: RCInterface, ) -> Self { Self { @@ -143,7 +145,7 @@ impl ParachainConsensus where B: BlockT, RCInterface: RelayChainInterface + Clone, - BI: BlockImport + Send + Sync, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync, PF: Environment + Send + Sync, PF::Proposer: Proposer< B, @@ -221,7 +223,7 @@ pub struct BuildRelayChainConsensusParams { pub para_id: ParaId, pub proposer_factory: PF, pub create_inherent_data_providers: CIDP, - pub block_import: ParachainBlockImport, + pub block_import: BI, pub relay_chain_interface: RCInterface, } @@ -246,7 +248,7 @@ where ProofRecording = EnableProofRecording, Proof = ::Proof, >, - BI: BlockImport + Send + Sync + 'static, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, CIDP: CreateInherentDataProviders + 'static, RCInterface: RelayChainInterface + Clone + 'static, { diff --git a/client/pov-recovery/src/active_candidate_recovery.rs b/client/pov-recovery/src/active_candidate_recovery.rs index a269a26f821..caae3615a85 100644 --- a/client/pov-recovery/src/active_candidate_recovery.rs +++ b/client/pov-recovery/src/active_candidate_recovery.rs @@ -42,19 +42,19 @@ impl ActiveCandidateRecovery { Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle } } - /// Recover the given `pending_candidate`. + /// Recover the given `candidate`. 
pub async fn recover_candidate( &mut self, block_hash: Block::Hash, - pending_candidate: crate::PendingCandidate, + candidate: &crate::Candidate, ) { let (tx, rx) = oneshot::channel(); self.overseer_handle .send_msg( AvailabilityRecoveryMessage::RecoverAvailableData( - pending_candidate.receipt, - pending_candidate.session_index, + candidate.receipt.clone(), + candidate.session_index, None, tx, ), diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index e1f59677423..90c0a853214 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -59,7 +59,9 @@ use cumulus_primitives_core::ParachainBlockData; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use codec::Decode; -use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; +use futures::{ + channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt, +}; use futures_timer::Delay; use rand::{thread_rng, Rng}; @@ -75,38 +77,52 @@ use active_candidate_recovery::ActiveCandidateRecovery; const LOG_TARGET: &str = "cumulus-pov-recovery"; -/// Represents a pending candidate. -struct PendingCandidate { - receipt: CandidateReceipt, - session_index: SessionIndex, - block_number: NumberFor, +/// Type of recovery to trigger. +#[derive(Debug, PartialEq)] +pub enum RecoveryKind { + /// Single block recovery. + Simple, + /// Full ancestry recovery. + Full, } -/// The delay between observing an unknown block and recovering this block. +/// Structure used to trigger an explicit recovery request via `PoVRecovery`. +pub struct RecoveryRequest { + /// Hash of the last block to recover. + pub hash: Block::Hash, + /// Recovery delay range. Randomizing the start of the recovery within this interval + /// can be used to prevent self-DOSing if the recovery request is part of a + /// distributed protocol and there is the possibility that multiple actors are + /// requiring to perform the recovery action at approximately the same time. + pub delay: RecoveryDelay, + /// Recovery type. + pub kind: RecoveryKind, +} + +/// The delay between observing an unknown block and triggering the recovery of a block. #[derive(Clone, Copy)] -pub enum RecoveryDelay { - /// Start recovering the block in maximum of the given delay. - WithMax { max: Duration }, - /// Start recovering the block after at least `min` delay and in maximum `max` delay. - WithMinAndMax { min: Duration, max: Duration }, +pub struct RecoveryDelay { + /// Start recovering after `min` delay. + pub min: Duration, + /// Start recovering before `max` delay. + pub max: Duration, } -impl RecoveryDelay { - /// Return as [`Delay`]. - fn as_delay(self) -> Delay { - match self { - Self::WithMax { max } => Delay::new(max.mul_f64(thread_rng().gen())), - Self::WithMinAndMax { min, max } => - Delay::new(min + max.saturating_sub(min).mul_f64(thread_rng().gen())), - } - } +/// Represents an outstanding block candidate. +struct Candidate { + receipt: CandidateReceipt, + session_index: SessionIndex, + block_number: NumberFor, + parent_hash: Block::Hash, + // Lazy recovery has been submitted. + waiting_recovery: bool, } /// Encapsulates the logic of the pov recovery. pub struct PoVRecovery { /// All the pending candidates that we are waiting for to be imported or that need to be /// recovered when `next_candidate_to_recover` tells us to do so. - pending_candidates: HashMap>, + candidates: HashMap>, /// A stream of futures that resolve to hashes of candidates that need to be recovered. 
/// /// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not @@ -122,6 +138,8 @@ pub struct PoVRecovery { parachain_import_queue: Box>, relay_chain_interface: RC, para_id: ParaId, + /// Explicit block recovery requests channel. + recovery_chan_rx: Receiver>, } impl PoVRecovery @@ -137,9 +155,10 @@ where parachain_import_queue: Box>, relay_chain_interface: RCInterface, para_id: ParaId, + recovery_chan_rx: Receiver>, ) -> Self { Self { - pending_candidates: HashMap::new(), + candidates: HashMap::new(), next_candidate_to_recover: Default::default(), active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle), recovery_delay, @@ -148,6 +167,7 @@ where parachain_import_queue, relay_chain_interface, para_id, + recovery_chan_rx, } } @@ -174,69 +194,54 @@ where } let hash = header.hash(); - match self.parachain_client.block_status(&BlockId::Hash(hash)) { - Ok(BlockStatus::Unknown) => (), - // Any other state means, we should ignore it. - Ok(_) => return, - Err(e) => { - tracing::debug!( - target: LOG_TARGET, - error = ?e, - block_hash = ?hash, - "Failed to get block status", - ); - return - }, - } - tracing::debug!(target: LOG_TARGET, ?hash, "Adding pending candidate"); - if self - .pending_candidates - .insert( - hash, - PendingCandidate { - block_number: *header.number(), - receipt: receipt.to_plain(), - session_index, - }, - ) - .is_some() - { + if self.candidates.contains_key(&hash) { return } - // Delay the recovery by some random time to not spam the relay chain. - let delay = self.recovery_delay.as_delay(); - self.next_candidate_to_recover.push( - async move { - delay.await; - hash - } - .boxed(), + tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate"); + self.candidates.insert( + hash, + Candidate { + block_number: *header.number(), + receipt: receipt.to_plain(), + session_index, + parent_hash: *header.parent_hash(), + waiting_recovery: false, + }, ); + + // If required, triggers a lazy recovery request that will eventually be blocked + // if in the meantime the block is imported. + self.recover(RecoveryRequest { + hash, + delay: self.recovery_delay, + kind: RecoveryKind::Simple, + }); } /// Handle an imported block. - fn handle_block_imported(&mut self, hash: &Block::Hash) { - self.pending_candidates.remove(hash); + fn handle_block_imported(&mut self, block_hash: &Block::Hash) { + self.candidates.get_mut(block_hash).map(|candidate| { + // Prevents triggering an already enqueued recovery request + candidate.waiting_recovery = false; + }); } /// Handle a finalized block with the given `block_number`. fn handle_block_finalized(&mut self, block_number: NumberFor) { - self.pending_candidates.retain(|_, pc| pc.block_number > block_number); + self.candidates.retain(|_, pc| pc.block_number > block_number); } /// Recover the candidate for the given `block_hash`. 
async fn recover_candidate(&mut self, block_hash: Block::Hash) { - let pending_candidate = match self.pending_candidates.remove(&block_hash) { - Some(pending_candidate) => pending_candidate, - None => return, - }; - - tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request"); - self.active_candidate_recovery - .recover_candidate(block_hash, pending_candidate) - .await; + match self.candidates.get(&block_hash) { + Some(candidate) if candidate.waiting_recovery => { + tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request"); + self.active_candidate_recovery.recover_candidate(block_hash, candidate).await; + }, + _ => (), + } } /// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child @@ -348,7 +353,7 @@ where async fn import_block(&mut self, block: Block) { let mut blocks = VecDeque::new(); - tracing::debug!(target: LOG_TARGET, hash = ?block.hash(), "Importing block retrieved using pov_recovery"); + tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery"); blocks.push_back(block); let mut incoming_blocks = Vec::new(); @@ -379,6 +384,70 @@ where .import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks); } + /// Attempts an explicit recovery of one or more blocks. + pub fn recover(&mut self, req: RecoveryRequest) { + let RecoveryRequest { mut hash, delay, kind } = req; + let mut to_recover = Vec::new(); + + let do_recover = loop { + let candidate = match self.candidates.get_mut(&hash) { + Some(candidate) => candidate, + None => { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Cound not recover. Block was never announced as candidate" + ); + break false + }, + }; + + match self.parachain_client.block_status(&BlockId::Hash(hash)) { + Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => { + candidate.waiting_recovery = true; + to_recover.push(hash); + }, + Ok(_) => break true, + Err(e) => { + tracing::error!( + target: LOG_TARGET, + error = ?e, + block_hash = ?hash, + "Failed to get block status", + ); + break false + }, + } + + if kind == RecoveryKind::Simple { + break true + } + + hash = candidate.parent_hash; + }; + + if do_recover { + for hash in to_recover.into_iter().rev() { + let delay = + delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen()); + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Starting {:?} block recovery in {:?} sec", + kind, + delay.as_secs(), + ); + self.next_candidate_to_recover.push( + async move { + Delay::new(delay).await; + hash + } + .boxed(), + ); + } + } + } + /// Run the pov-recovery. 
pub async fn run(mut self) { let mut imported_blocks = self.parachain_client.import_notification_stream().fuse(); @@ -400,10 +469,15 @@ where if let Some((receipt, session_index)) = pending_candidate { self.handle_pending_candidate(receipt, session_index); } else { - tracing::debug!( - target: LOG_TARGET, - "Pending candidates stream ended", - ); + tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended"); + return; + } + }, + recovery_req = self.recovery_chan_rx.next() => { + if let Some(req) = recovery_req { + self.recover(req); + } else { + tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended"); return; } }, @@ -411,10 +485,7 @@ where if let Some(imported) = imported { self.handle_block_imported(&imported.hash); } else { - tracing::debug!( - target: LOG_TARGET, - "Imported blocks stream ended", - ); + tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended"); return; } }, @@ -422,10 +493,7 @@ where if let Some(finalized) = finalized { self.handle_block_finalized(*finalized.header.number()); } else { - tracing::debug!( - target: LOG_TARGET, - "Finalized blocks stream ended", - ); + tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended"); return; } }, diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 6642deef06e..3c9108c8c71 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] parking_lot = "0.12.1" +futures = "0.3.24" # Substrate sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 82ce3ce4872..4e091c033a7 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -20,11 +20,13 @@ use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_common::ParachainConsensus; +use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelay}; use cumulus_primitives_core::{CollectCollationInfo, ParaId}; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node; use polkadot_primitives::v2::CollatorPair; + use sc_client_api::{ Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, UsageProvider, }; @@ -35,8 +37,15 @@ use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_core::traits::SpawnNamed; use sp_runtime::traits::Block as BlockT; + +use futures::channel::mpsc; use std::{sync::Arc, time::Duration}; +// Given the sporadic nature of the explicit recovery operation and the +// possibility to retry infinite times this value is more than enough. +// In practice here we expect no more than one queued messages. +const RECOVERY_CHAN_SIZE: usize = 8; + /// Parameters given to [`start_collator`]. 
 /// Parameters given to [`start_collator`].
 pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> {
 	pub block_status: Arc,
@@ -90,11 +99,14 @@ where
 	RCInterface: RelayChainInterface + Clone + 'static,
 	Backend: BackendT + 'static,
 {
+	let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
+
 	let consensus = cumulus_client_consensus_common::run_parachain_consensus(
 		para_id,
 		client.clone(),
 		relay_chain_interface.clone(),
 		announce_block.clone(),
+		Some(recovery_chan_tx),
 	);

 	task_manager
@@ -105,15 +117,16 @@ where
 		.overseer_handle()
 		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

-	let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
+	let pov_recovery = PoVRecovery::new(
 		overseer_handle.clone(),
 		// Collators should wait at most the relay chain slot duration before starting
 		// to recover blocks.
-		cumulus_client_pov_recovery::RecoveryDelay::WithMax { max: relay_chain_slot_duration },
+		RecoveryDelay { min: core::time::Duration::ZERO, max: relay_chain_slot_duration },
 		client.clone(),
 		import_queue,
 		relay_chain_interface.clone(),
 		para_id,
+		recovery_chan_rx,
 	);

 	task_manager
@@ -173,11 +186,14 @@ where
 	Backend: BackendT + 'static,
 	RCInterface: RelayChainInterface + Clone + 'static,
 {
+	let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
+
 	let consensus = cumulus_client_consensus_common::run_parachain_consensus(
 		para_id,
 		client.clone(),
 		relay_chain_interface.clone(),
 		announce_block,
+		Some(recovery_chan_tx),
 	);

 	task_manager
@@ -188,21 +204,19 @@ where
 		.overseer_handle()
 		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

-	let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
+	let pov_recovery = PoVRecovery::new(
 		overseer_handle,
 		// Full nodes should wait at least 2.5 minutes (assuming a 6-second slot duration) and
 		// at most 5 minutes before starting to recover blocks. Collators should already have
 		// started the recovery well before full nodes try to recover a certain block and then
 		// shared the block with the network in the usual way. Full nodes are just the
 		// "last resort" for block recovery.
-		cumulus_client_pov_recovery::RecoveryDelay::WithMinAndMax {
-			min: relay_chain_slot_duration * 25,
-			max: relay_chain_slot_duration * 50,
-		},
+		RecoveryDelay { min: relay_chain_slot_duration * 25, max: relay_chain_slot_duration * 50 },
 		client,
 		import_queue,
 		relay_chain_interface,
 		para_id,
+		recovery_chan_rx,
 	);

 	task_manager
diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs
index 2df17e2eb89..a8ee57cadbf 100644
--- a/parachain-template/node/src/service.rs
+++ b/parachain-template/node/src/service.rs
@@ -51,7 +51,7 @@ type ParachainClient = TFullClient;

 type ParachainBackend = TFullBackend;

-type ParachainBlockImport = TParachainBlockImport>;
+type ParachainBlockImport = TParachainBlockImport, ParachainBackend>;

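A quick, purely illustrative check of the full-node recovery window configured in `client/service/src/lib.rs` above: with the 6-second relay chain slot assumed in the comment, the 25x and 50x multipliers give exactly the stated 2.5 and 5 minutes.

use std::time::Duration;

fn main() {
	let relay_chain_slot_duration = Duration::from_secs(6);
	let min = relay_chain_slot_duration * 25;
	let max = relay_chain_slot_duration * 50;
	assert_eq!(min, Duration::from_secs(150)); // 2.5 minutes
	assert_eq!(max, Duration::from_secs(300)); // 5 minutes
}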
 /// Starts a `ServiceBuilder` for a full service.
 ///
@@ -111,7 +111,7 @@ pub fn new_partial(
 		client.clone(),
 	);

-	let block_import = ParachainBlockImport::new(client.clone());
+	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());

 	let import_queue = build_import_queue(
 		client.clone(),
@@ -141,7 +141,7 @@ async fn start_node_impl(
 	parachain_config: Configuration,
 	polkadot_config: Configuration,
 	collator_options: CollatorOptions,
-	id: ParaId,
+	para_id: ParaId,
 	hwbench: Option,
 ) -> sc_service::error::Result<(TaskManager, Arc)> {
 	let parachain_config = prepare_node_config(parachain_config);
@@ -167,7 +167,8 @@ async fn start_node_impl(
 		s => s.to_string().into(),
 	})?;

-	let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id);
+	let block_announce_validator =
+		BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);

 	let force_authoring = parachain_config.force_authoring;
 	let validator = parachain_config.role.is_authority();
@@ -219,7 +220,7 @@ async fn start_node_impl(
 		task_manager: &mut task_manager,
 		config: parachain_config,
 		keystore: params.keystore_container.sync_keystore(),
-		backend: backend.clone(),
+		backend,
 		network: network.clone(),
 		system_rpc_tx,
 		tx_handler_controller,
@@ -258,12 +259,12 @@ async fn start_node_impl(
 			network,
 			params.keystore_container.sync_keystore(),
 			force_authoring,
-			id,
+			para_id,
 		)?;

 		let spawner = task_manager.spawn_handle();
 		let params = StartCollatorParams {
-			para_id: id,
+			para_id,
 			block_status: client.clone(),
 			announce_block,
 			client: client.clone(),
@@ -282,7 +283,7 @@ async fn start_node_impl(
 			client: client.clone(),
 			announce_block,
 			task_manager: &mut task_manager,
-			para_id: id,
+			para_id,
 			relay_chain_interface,
 			relay_chain_slot_duration,
 			import_queue: import_queue_service,
@@ -345,7 +346,7 @@ fn build_consensus(
 	sync_oracle: Arc>,
 	keystore: SyncCryptoStorePtr,
 	force_authoring: bool,
-	id: ParaId,
+	para_id: ParaId,
 ) -> Result>, sc_service::Error> {
 	let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)?;

@@ -367,7 +368,7 @@ fn build_consensus(
 						relay_parent,
 						&relay_chain_interface,
 						&validation_data,
-						id,
+						para_id,
 					)
 					.await;
 					let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
@@ -408,8 +409,8 @@ pub async fn start_parachain_node(
 	parachain_config: Configuration,
 	polkadot_config: Configuration,
 	collator_options: CollatorOptions,
-	id: ParaId,
+	para_id: ParaId,
 	hwbench: Option,
 ) -> sc_service::error::Result<(TaskManager, Arc)> {
-	start_node_impl(parachain_config, polkadot_config, collator_options, id, hwbench).await
+	start_node_impl(parachain_config, polkadot_config, collator_options, para_id, hwbench).await
 }
diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs
index b38858c8f3e..7fab9065e3a 100644
--- a/polkadot-parachain/src/service.rs
+++ b/polkadot-parachain/src/service.rs
@@ -71,7 +71,8 @@ type ParachainClient = TFullClient;

-type ParachainBlockImport = TParachainBlockImport>>;
+type ParachainBlockImport =
+	TParachainBlockImport>, ParachainBackend>;

 /// Native executor instance.
 pub struct ShellRuntimeExecutor;

@@ -275,7 +276,7 @@ where
 		client.clone(),
 	);

-	let block_import = ParachainBlockImport::new(client.clone());
+	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());

 	let import_queue = build_import_queue(
 		client.clone(),
@@ -1142,6 +1143,7 @@ where
 	let telemetry2 = telemetry.clone();
 	let prometheus_registry2 = prometheus_registry.map(|r| (*r).clone());
 	let relay_chain_for_aura = relay_chain_interface.clone();
+
 	let aura_consensus = BuildOnAccess::Uninitialized(Some(Box::new(move || {
 		let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client2).unwrap();
diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs
index 76d568ffeba..2b7f17ce14b 100644
--- a/test/service/src/lib.rs
+++ b/test/service/src/lib.rs
@@ -117,11 +117,15 @@ pub type Client = TFullClient<
 	sc_executor::NativeElseWasmExecutor,
 >;

+/// The backend type being used by the test service.
+pub type Backend = TFullBackend;
+
+/// The block-import type being used by the test service.
+pub type ParachainBlockImport = TParachainBlockImport, Backend>;
+
 /// Transaction pool type used by the test service
 pub type TransactionPool = Arc>;

-type ParachainBlockImport = TParachainBlockImport>;
-
 /// Starts a `ServiceBuilder` for a full service.
 ///
 /// Use this macro if you don't actually need the full service, but just the builder in order to
@@ -131,7 +135,7 @@ pub fn new_partial(
 ) -> Result<
 	PartialComponents<
 		Client,
-		TFullBackend,
+		Backend,
 		(),
 		sc_consensus::import_queue::BasicQueue>,
 		sc_transaction_pool::FullPool,
@@ -150,7 +154,7 @@ pub fn new_partial(
 		sc_service::new_full_parts::(config, None, executor)?;
 	let client = Arc::new(client);

-	let block_import = ParachainBlockImport::new(client.clone());
+	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());

 	let registry = config.prometheus_registry();

@@ -299,7 +303,7 @@ where
 		task_manager: &mut task_manager,
 		config: parachain_config,
 		keystore: params.keystore_container.sync_keystore(),
-		backend,
+		backend: backend.clone(),
 		network: network.clone(),
 		system_rpc_tx,
 		tx_handler_controller,