
Explicit recovery for announced candidate blocks #1891

Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
@@ -25,6 +25,7 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" }

# Cumulus
cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }

[dev-dependencies]
43 changes: 28 additions & 15 deletions client/consensus/common/src/parachain_consensus.rs
@@ -15,7 +15,6 @@
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use async_trait::async_trait;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
@@ -27,15 +26,25 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
};

use cumulus_primitives_core::{RecoveryDelay, RecoveryKind, RecoveryRequest};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};

use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};

use codec::Decode;
use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt};

use std::{pin::Pin, sync::Arc};
use std::{pin::Pin, sync::Arc, time::Duration};

const LOG_TARGET: &str = "cumulus-consensus";

// Delay range within which to trigger explicit recovery requests.
// The chosen value doesn't have any special meaning; a random delay on the order of
// seconds should, in practice, be good enough to allow a quick recovery without DoSing
// the relay chain.
const RECOVERY_DELAY: RecoveryDelay =
RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
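As an aside (not part of this diff): a minimal sketch of how a consumer of `RecoveryDelay` could turn the `[min, max]` range into a concrete wait time before firing a recovery request. The `sample_recovery_delay` helper and the use of the `rand` crate (0.8 API) are assumptions for illustration, not Cumulus code.

use rand::Rng;
use std::time::Duration;

/// Hypothetical helper: pick a uniformly random delay within `[min, max]`.
fn sample_recovery_delay(min: Duration, max: Duration) -> Duration {
	// Work in milliseconds to keep the sampling simple.
	let span_ms = max.saturating_sub(min).as_millis() as u64;
	let offset_ms = rand::thread_rng().gen_range(0..=span_ms);
	min + Duration::from_millis(offset_ms)
}

// With RECOVERY_DELAY above this yields a wait between 0 and 30 seconds, spreading
// recovery requests out instead of having every node hit the relay chain at once.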

/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
@@ -136,7 +145,7 @@ pub async fn run_parachain_consensus<P, R, Block, B>(
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Sender<Block::Hash>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: Finalizer<Block, B>
@@ -169,7 +178,7 @@ async fn follow_new_best<P, R, Block, B>(
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Sender<Block::Hash>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: Finalizer<Block, B>
@@ -298,7 +307,7 @@ async fn handle_new_best_parachain_head<Block, P>(
head: Vec<u8>,
parachain: &P,
unset_best_header: &mut Option<Block::Header>,
mut recovery_chan_tx: Sender<Block::Hash>,
mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
@@ -348,16 +357,20 @@ async fn handle_new_best_parachain_head<Block, P>(
"Parachain block not yet imported, waiting for import to enact as best block.",
);

// Best effort to trigger a recovery.
// Error is not fatal. The relay chain will re-announce the best block anyway,
// thus we will have other opportunities to retry.
if let Err(err) = recovery_chan_tx.try_send(hash) {
tracing::warn!(
target: LOG_TARGET,
block_hash = ?hash,
error = ?err,
"Unable to notify block recovery subsystem"
)
if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
// Best effort channel to actively encourage block recovery.
// An error here is not fatal; the relay chain continuously re-announces
// the best block, thus we will have other opportunities to retry.
let req =
RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full };
if let Err(err) = recovery_chan_tx.try_send(req) {
tracing::warn!(
target: LOG_TARGET,
block_hash = ?hash,
error = ?err,
"Unable to notify block recovery subsystem"
)
}
}
},
Err(e) => {
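For context, a hedged sketch of the receiving side of `recovery_chan_tx` — roughly what the dummy recovery task in the tests below (and, in production, the PoV recovery subsystem) does with a `RecoveryRequest`. The `recover` callback and the use of `futures_timer` are placeholders/assumptions, not actual Cumulus APIs.

use cumulus_primitives_core::{RecoveryKind, RecoveryRequest};
use futures::{channel::mpsc::Receiver, StreamExt};
use sp_runtime::traits::Block as BlockT;

async fn recovery_loop<Block: BlockT>(
	mut rx: Receiver<RecoveryRequest<Block>>,
	recover: impl Fn(Block::Hash, RecoveryKind),
) {
	while let Some(req) = rx.next().await {
		// A real implementation would pick a random point inside
		// `[req.delay.min, req.delay.max]` (see the sampling sketch above);
		// here we simply wait for the upper bound.
		futures_timer::Delay::new(req.delay.max).await;
		recover(req.hash, req.kind);
	}
}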
149 changes: 119 additions & 30 deletions client/consensus/common/src/tests.rs
@@ -18,6 +18,7 @@ use crate::*;

use async_trait::async_trait;
use codec::Encode;
use cumulus_primitives_core::RecoveryKind;
use cumulus_relay_chain_interface::RelayChainResult;
use cumulus_test_client::{
runtime::{Block, Header},
@@ -29,7 +30,7 @@ use polkadot_primitives::v2::Id as ParaId;
use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::Error as ClientError;
use sp_consensus::BlockOrigin;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::generic::BlockId;
use std::{
sync::{Arc, Mutex},
@@ -103,22 +104,8 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
}
}

fn build_and_import_block(mut client: Arc<Client>, import_as_best: bool) -> Block {
build_and_import_block_ext(
&*client.clone(),
BlockOrigin::Own,
import_as_best,
&mut client,
None,
None,
)
}

fn build_and_import_block_ext<B: InitBlockBuilder, I: BlockImport<Block>>(
fn build_block<B: InitBlockBuilder>(
builder: &B,
origin: BlockOrigin,
import_as_best: bool,
importer: &mut I,
at: Option<BlockId<Block>>,
timestamp: Option<u64>,
) -> Block {
@@ -132,27 +119,67 @@ fn build_and_import_block_ext<B: InitBlockBuilder, I: BlockImport<Block>>(
};

let mut block = builder.build().unwrap().block;
let (header, body) = block.clone().deconstruct();

// Simulate some form of post activity.
// Simulate some form of post activity (like a `Seal` or an `Other` digest item).
// This is mostly used to exercise the `LevelMonitor`'s correct behavior.
// (In practice we want the header's post-hash != pre-hash.)
let post_digest = sp_runtime::DigestItem::Other(vec![1, 2, 3]);
block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3]));

block
}

async fn import_block<I: BlockImport<Block>>(
importer: &mut I,
block: Block,
origin: BlockOrigin,
import_as_best: bool,
) {
let (mut header, body) = block.deconstruct();

let post_digest =
header.digest.pop().expect("post digest is present in manually crafted block");

let mut block_import_params = BlockImportParams::new(origin, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best));
block_import_params.body = Some(body);
block_import_params.post_digests.push(post_digest.clone());
block_import_params.post_digests.push(post_digest);

block_on(importer.import_block(block_import_params, Default::default())).unwrap();
importer.import_block(block_import_params, Default::default()).await.unwrap();
}

// In order to get a header hash compatible with block import params containing some
// form of `post_digest`, we need to manually push the post digest within the header
// digest logs.
block.header.digest.push(post_digest);
fn import_block_sync<I: BlockImport<Block>>(
importer: &mut I,
block: Block,
origin: BlockOrigin,
import_as_best: bool,
) {
block_on(import_block(importer, block, origin, import_as_best));
}

fn build_and_import_block_ext<B: InitBlockBuilder, I: BlockImport<Block>>(
builder: &B,
origin: BlockOrigin,
import_as_best: bool,
importer: &mut I,
at: Option<BlockId<Block>>,
timestamp: Option<u64>,
) -> Block {
let block = build_block(builder, at, timestamp);
import_block_sync(importer, block.clone(), origin, import_as_best);
block
}

fn build_and_import_block(mut client: Arc<Client>, import_as_best: bool) -> Block {
build_and_import_block_ext(
&*client.clone(),
BlockOrigin::Own,
import_as_best,
&mut client,
None,
None,
)
}

#[test]
fn follow_new_best_works() {
sp_tracing::try_init_simple();
Expand All @@ -164,7 +191,7 @@ fn follow_new_best_works() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();

let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);

let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -187,6 +214,68 @@
});
}

#[test]
fn follow_new_best_with_dummy_recovery_works() {
The author commented on this test:
Test to exercise explicit best block recovery.

Even though we currently use the PoVRecovery subsystem as the recovery mechanism, the motivation of this test is also to show that the recovery can be generic.

sp_tracing::try_init_simple();

let client = Arc::new(TestClientBuilder::default().build());

let relay_chain = Relaychain::new();
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();

let (recovery_chan_tx, mut recovery_chan_rx) = futures::channel::mpsc::channel(3);

let consensus = run_parachain_consensus(
100.into(),
client.clone(),
relay_chain,
Arc::new(|_, _| {}),
Some(recovery_chan_tx),
);

let block = build_block(&*client.clone(), None, None);
let block_clone = block.clone();
let client_clone = client.clone();

let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
loop {
Delay::new(Duration::from_millis(100)).await;
match client.block_status(&BlockId::Hash(block.hash())).unwrap() {
BlockStatus::Unknown => {},
status => {
assert_eq!(block.hash(), client.usage_info().chain.best_hash);
assert_eq!(status, BlockStatus::InChainWithState);
break
},
}
}
};

let dummy_block_recovery = async move {
loop {
if let Some(req) = recovery_chan_rx.next().await {
assert_eq!(req.hash, block_clone.hash());
assert_eq!(req.kind, RecoveryKind::Full);
Delay::new(Duration::from_millis(500)).await;
import_block(&mut &*client_clone, block_clone.clone(), BlockOrigin::Own, true)
.await;
}
}
};

block_on(async move {
futures::pin_mut!(consensus);
futures::pin_mut!(work);

select! {
r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
_ = dummy_block_recovery.fuse() => {},
_ = work.fuse() => {},
}
});
}

#[test]
fn follow_finalized_works() {
sp_tracing::try_init_simple();
Expand All @@ -198,7 +287,7 @@ fn follow_finalized_works() {
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();

let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);

let work = async move {
finalized_sender.unbounded_send(block.header().clone()).unwrap();
@@ -239,7 +328,7 @@ fn follow_finalized_does_not_stop_on_unknown_block() {
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();

let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);

let work = async move {
for _ in 0..3usize {
@@ -289,7 +378,7 @@ fn follow_new_best_sets_best_after_it_is_imported() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();

let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);

let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -366,7 +455,7 @@ fn do_not_set_best_block_to_older_block() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();

let consensus =
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);

let client2 = client.clone();
let work = async move {
8 changes: 4 additions & 4 deletions client/pov-recovery/src/active_candidate_recovery.rs
@@ -42,19 +42,19 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
}

/// Recover the given `pending_candidate`.
/// Recover the given `candidate`.
pub async fn recover_candidate(
&mut self,
block_hash: Block::Hash,
pending_candidate: crate::PendingCandidate<Block>,
candidate: &crate::Candidate<Block>,
) {
let (tx, rx) = oneshot::channel();

self.overseer_handle
.send_msg(
AvailabilityRecoveryMessage::RecoverAvailableData(
pending_candidate.receipt,
pending_candidate.session_index,
candidate.receipt.clone(),
candidate.session_index,
None,
tx,
),