diff --git a/Cargo.lock b/Cargo.lock index 419654736dd3..678a45946f88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7233,6 +7233,7 @@ dependencies = [ name = "polkadot-node-collation-generation" version = "0.9.41" dependencies = [ + "assert_matches", "futures", "parity-scale-codec 3.4.0", "polkadot-erasure-coding", @@ -7243,6 +7244,7 @@ dependencies = [ "polkadot-primitives", "polkadot-primitives-test-helpers", "sp-core", + "sp-keyring", "sp-maybe-compressed-blob", "thiserror", "tracing-gum", diff --git a/node/collation-generation/Cargo.toml b/node/collation-generation/Cargo.toml index 68410c2cecbe..a2ece2dd5404 100644 --- a/node/collation-generation/Cargo.toml +++ b/node/collation-generation/Cargo.toml @@ -20,3 +20,5 @@ parity-scale-codec = { version = "3.4.0", default-features = false, features = [ [dev-dependencies] polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../primitives/test-helpers" } +assert_matches = "1.4.0" +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/collation-generation/src/lib.rs b/node/collation-generation/src/lib.rs index 5599a737059f..8ee5b897ccc1 100644 --- a/node/collation-generation/src/lib.rs +++ b/node/collation-generation/src/lib.rs @@ -29,12 +29,15 @@ #![deny(missing_docs)] -use futures::{channel::mpsc, future::FutureExt, join, select, sink::SinkExt, stream::StreamExt}; +use futures::{channel::oneshot, future::FutureExt, join, select}; use parity_scale_codec::Encode; -use polkadot_node_primitives::{AvailableData, CollationGenerationConfig, PoV}; +use polkadot_node_primitives::{ + AvailableData, Collation, CollationGenerationConfig, CollationSecondedSignal, PoV, + SubmitCollationParams, +}; use polkadot_node_subsystem::{ messages::{CollationGenerationMessage, CollatorProtocolMessage}, - overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, + overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem, SubsystemContext, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::{ @@ -43,7 +46,7 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::{ collator_signature_payload, CandidateCommitments, CandidateDescriptor, CandidateReceipt, - CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData, + CollatorPair, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData, ValidationCodeHash, }; use sp_core::crypto::Pair; @@ -84,26 +87,13 @@ impl CollationGenerationSubsystem { /// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur. /// Otherwise, most are logged and then discarded. async fn run(mut self, mut ctx: Context) { - // when we activate new leaves, we spawn a bunch of sub-tasks, each of which is - // expected to generate precisely one message. We don't want to block the main loop - // at any point waiting for them all, so instead, we create a channel on which they can - // send those messages. We can then just monitor the channel and forward messages on it - // to the overseer here, via the context. - let (sender, receiver) = mpsc::channel(0); - - let mut receiver = receiver.fuse(); loop { select! 
{ incoming = ctx.recv().fuse() => { - if self.handle_incoming::(incoming, &mut ctx, &sender).await { + if self.handle_incoming::(incoming, &mut ctx).await { break; } }, - msg = receiver.next() => { - if let Some(msg) = msg { - ctx.send_message(msg).await; - } - }, } } } @@ -116,7 +106,6 @@ impl CollationGenerationSubsystem { &mut self, incoming: SubsystemResult::Message>>, ctx: &mut Context, - sender: &mpsc::Sender, ) -> bool { match incoming { Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -131,7 +120,6 @@ impl CollationGenerationSubsystem { activated.into_iter().map(|v| v.hash), ctx, metrics, - sender, ) .await { @@ -152,6 +140,21 @@ impl CollationGenerationSubsystem { } false }, + Ok(FromOrchestra::Communication { + msg: CollationGenerationMessage::SubmitCollation(params), + }) => { + if let Some(config) = &self.config { + if let Err(err) = + handle_submit_collation(params, config, ctx, &self.metrics).await + { + gum::error!(target: LOG_TARGET, ?err, "Failed to submit collation"); + } + } else { + gum::error!(target: LOG_TARGET, "Collation submitted before initialization"); + } + + false + }, Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => false, Err(err) => { gum::error!( @@ -185,11 +188,14 @@ async fn handle_new_activations( activated: impl IntoIterator, ctx: &mut Context, metrics: Metrics, - sender: &mpsc::Sender, ) -> crate::error::Result<()> { // follow the procedure from the guide: // https://paritytech.github.io/polkadot/book/node/collators/collation-generation.html + if config.collator.is_none() { + return Ok(()) + } + let _overall_timer = metrics.time_new_activations(); for relay_parent in activated { @@ -268,7 +274,7 @@ async fn handle_new_activations( }, }; - let validation_code_hash = match obtain_current_validation_code_hash( + let validation_code_hash = match obtain_validation_code_hash_with_assumption( relay_parent, scheduled_core.para_id, assumption, @@ -291,16 +297,18 @@ async fn handle_new_activations( }; let task_config = config.clone(); - let mut task_sender = sender.clone(); let metrics = metrics.clone(); + let mut task_sender = ctx.sender().clone(); ctx.spawn( "collation-builder", Box::pin(async move { - let persisted_validation_data_hash = validation_data.hash(); - let parent_head_data_hash = validation_data.parent_head.hash(); + let collator_fn = match task_config.collator.as_ref() { + Some(x) => x, + None => return, + }; let (collation, result_sender) = - match (task_config.collator)(relay_parent, &validation_data).await { + match collator_fn(relay_parent, &validation_data).await { Some(collation) => collation.into_inner(), None => { gum::debug!( @@ -312,108 +320,21 @@ async fn handle_new_activations( }, }; - // Apply compression to the block data. - let pov = { - let pov = collation.proof_of_validity.into_compressed(); - let encoded_size = pov.encoded_size(); - - // As long as `POV_BOMB_LIMIT` is at least `max_pov_size`, this ensures - // that honest collators never produce a PoV which is uncompressed. - // - // As such, honest collators never produce an uncompressed PoV which starts with - // a compression magic number, which would lead validators to reject the collation. 
- if encoded_size > validation_data.max_pov_size as usize { - gum::debug!( - target: LOG_TARGET, - para_id = %scheduled_core.para_id, - size = encoded_size, - max_size = validation_data.max_pov_size, - "PoV exceeded maximum size" - ); - - return - } - - pov - }; - - let pov_hash = pov.hash(); - - let signature_payload = collator_signature_payload( - &relay_parent, - &scheduled_core.para_id, - &persisted_validation_data_hash, - &pov_hash, - &validation_code_hash, - ); - - let erasure_root = - match erasure_root(n_validators, validation_data, pov.clone()) { - Ok(erasure_root) => erasure_root, - Err(err) => { - gum::error!( - target: LOG_TARGET, - para_id = %scheduled_core.para_id, - err = ?err, - "failed to calculate erasure root", - ); - return - }, - }; - - let commitments = CandidateCommitments { - upward_messages: collation.upward_messages, - horizontal_messages: collation.horizontal_messages, - new_validation_code: collation.new_validation_code, - head_data: collation.head_data, - processed_downward_messages: collation.processed_downward_messages, - hrmp_watermark: collation.hrmp_watermark, - }; - - let ccr = CandidateReceipt { - commitments_hash: commitments.hash(), - descriptor: CandidateDescriptor { - signature: task_config.key.sign(&signature_payload), - para_id: scheduled_core.para_id, + construct_and_distribute_receipt( + PreparedCollation { + collation, + para_id: task_config.para_id, relay_parent, - collator: task_config.key.public(), - persisted_validation_data_hash, - pov_hash, - erasure_root, - para_head: commitments.head_data.hash(), + validation_data, validation_code_hash, + n_validators, }, - }; - - gum::debug!( - target: LOG_TARGET, - candidate_hash = ?ccr.hash(), - ?pov_hash, - ?relay_parent, - para_id = %scheduled_core.para_id, - "candidate is generated", - ); - metrics.on_collation_generated(); - - if let Err(err) = task_sender - .send( - CollatorProtocolMessage::DistributeCollation( - ccr, - parent_head_data_hash, - pov, - result_sender, - ) - .into(), - ) - .await - { - gum::warn!( - target: LOG_TARGET, - para_id = %scheduled_core.para_id, - err = ?err, - "failed to send collation result", - ); - } + task_config.key.clone(), + &mut task_sender, + result_sender, + &metrics, + ) + .await; }), )?; } @@ -422,14 +343,199 @@ async fn handle_new_activations( Ok(()) } -async fn obtain_current_validation_code_hash( +#[overseer::contextbounds(CollationGeneration, prefix = self::overseer)] +async fn handle_submit_collation( + params: SubmitCollationParams, + config: &CollationGenerationConfig, + ctx: &mut Context, + metrics: &Metrics, +) -> crate::error::Result<()> { + let _timer = metrics.time_submit_collation(); + + let SubmitCollationParams { + relay_parent, + collation, + parent_head, + validation_code_hash, + result_sender, + } = params; + + let validators = request_validators(relay_parent, ctx.sender()).await.await??; + let n_validators = validators.len(); + + // We need to swap the parent-head data, but all other fields here will be correct. + let mut validation_data = match request_persisted_validation_data( + relay_parent, + config.para_id, + OccupiedCoreAssumption::TimedOut, + ctx.sender(), + ) + .await + .await?? 
+ { + Some(v) => v, + None => { + gum::debug!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + our_para = %config.para_id, + "No validation data for para - does it exist at this relay-parent?", + ); + return Ok(()) + }, + }; + + validation_data.parent_head = parent_head; + + let collation = PreparedCollation { + collation, + relay_parent, + para_id: config.para_id, + validation_data, + validation_code_hash, + n_validators, + }; + + construct_and_distribute_receipt( + collation, + config.key.clone(), + ctx.sender(), + result_sender, + metrics, + ) + .await; + + Ok(()) +} + +struct PreparedCollation { + collation: Collation, + para_id: ParaId, + relay_parent: Hash, + validation_data: PersistedValidationData, + validation_code_hash: ValidationCodeHash, + n_validators: usize, +} + +/// Takes a prepared collation, along with its context, and produces a candidate receipt +/// which is distributed to validators. +async fn construct_and_distribute_receipt( + collation: PreparedCollation, + key: CollatorPair, + sender: &mut impl overseer::CollationGenerationSenderTrait, + result_sender: Option>, + metrics: &Metrics, +) { + let PreparedCollation { + collation, + para_id, + relay_parent, + validation_data, + validation_code_hash, + n_validators, + } = collation; + + let persisted_validation_data_hash = validation_data.hash(); + let parent_head_data_hash = validation_data.parent_head.hash(); + + // Apply compression to the block data. + let pov = { + let pov = collation.proof_of_validity.into_compressed(); + let encoded_size = pov.encoded_size(); + + // As long as `POV_BOMB_LIMIT` is at least `max_pov_size`, this ensures + // that honest collators never produce a PoV which is uncompressed. + // + // As such, honest collators never produce an uncompressed PoV which starts with + // a compression magic number, which would lead validators to reject the collation. 
+ if encoded_size > validation_data.max_pov_size as usize { + gum::debug!( + target: LOG_TARGET, + para_id = %para_id, + size = encoded_size, + max_size = validation_data.max_pov_size, + "PoV exceeded maximum size" + ); + + return + } + + pov + }; + + let pov_hash = pov.hash(); + + let signature_payload = collator_signature_payload( + &relay_parent, + ¶_id, + &persisted_validation_data_hash, + &pov_hash, + &validation_code_hash, + ); + + let erasure_root = match erasure_root(n_validators, validation_data, pov.clone()) { + Ok(erasure_root) => erasure_root, + Err(err) => { + gum::error!( + target: LOG_TARGET, + para_id = %para_id, + err = ?err, + "failed to calculate erasure root", + ); + return + }, + }; + + let commitments = CandidateCommitments { + upward_messages: collation.upward_messages, + horizontal_messages: collation.horizontal_messages, + new_validation_code: collation.new_validation_code, + head_data: collation.head_data, + processed_downward_messages: collation.processed_downward_messages, + hrmp_watermark: collation.hrmp_watermark, + }; + + let ccr = CandidateReceipt { + commitments_hash: commitments.hash(), + descriptor: CandidateDescriptor { + signature: key.sign(&signature_payload), + para_id, + relay_parent, + collator: key.public(), + persisted_validation_data_hash, + pov_hash, + erasure_root, + para_head: commitments.head_data.hash(), + validation_code_hash, + }, + }; + + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?ccr.hash(), + ?pov_hash, + ?relay_parent, + para_id = %para_id, + "candidate is generated", + ); + metrics.on_collation_generated(); + + sender + .send_message(CollatorProtocolMessage::DistributeCollation( + ccr, + parent_head_data_hash, + pov, + result_sender, + )) + .await; +} + +async fn obtain_validation_code_hash_with_assumption( relay_parent: Hash, para_id: ParaId, assumption: OccupiedCoreAssumption, sender: &mut impl overseer::CollationGenerationSenderTrait, -) -> Result, crate::error::Error> { - use polkadot_node_subsystem::RuntimeApiError; - +) -> crate::error::Result> { match request_validation_code_hash(relay_parent, para_id, assumption, sender) .await .await? diff --git a/node/collation-generation/src/metrics.rs b/node/collation-generation/src/metrics.rs index cb9e4a0c8e85..c7690ec82c4f 100644 --- a/node/collation-generation/src/metrics.rs +++ b/node/collation-generation/src/metrics.rs @@ -22,6 +22,7 @@ pub(crate) struct MetricsInner { pub(crate) new_activations_overall: prometheus::Histogram, pub(crate) new_activations_per_relay_parent: prometheus::Histogram, pub(crate) new_activations_per_availability_core: prometheus::Histogram, + pub(crate) submit_collation: prometheus::Histogram, } /// `CollationGenerationSubsystem` metrics. @@ -57,6 +58,11 @@ impl Metrics { .as_ref() .map(|metrics| metrics.new_activations_per_availability_core.start_timer()) } + + /// Provide a timer for submitting a collation which updates on drop. 
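+	/// Returns `None` when metrics are not enabled (no registry was provided).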
+ pub fn time_submit_collation(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.submit_collation.start_timer()) + } } impl metrics::Metrics for Metrics { @@ -96,6 +102,15 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + submit_collation: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_collation_generation_submit_collation", + "Time spent preparing and submitting a collation to the network protocol", + ) + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } diff --git a/node/collation-generation/src/tests.rs b/node/collation-generation/src/tests.rs index b2534bcf36c1..b7ff4ec2a576 100644 --- a/node/collation-generation/src/tests.rs +++ b/node/collation-generation/src/tests.rs @@ -14,472 +14,588 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -mod handle_new_activations { - use super::super::*; - use ::test_helpers::{dummy_hash, dummy_head_data, dummy_validator}; - use futures::{ - lock::Mutex, - task::{Context as FuturesContext, Poll}, - Future, +use super::*; +use assert_matches::assert_matches; +use futures::{ + lock::Mutex, + task::{Context as FuturesContext, Poll}, + Future, +}; +use polkadot_node_primitives::{BlockData, Collation, CollationResult, MaybeCompressedPoV, PoV}; +use polkadot_node_subsystem::{ + errors::RuntimeApiError, + messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}, +}; +use polkadot_node_subsystem_test_helpers::{subsystem_test_harness, TestSubsystemContextHandle}; +use polkadot_node_subsystem_util::TimeoutExt; +use polkadot_primitives::{ + CollatorPair, HeadData, Id as ParaId, PersistedValidationData, ScheduledCore, ValidationCode, +}; +use sp_keyring::sr25519::Keyring as Sr25519Keyring; +use std::pin::Pin; +use test_helpers::{dummy_hash, dummy_head_data, dummy_validator}; + +type VirtualOverseer = TestSubsystemContextHandle; + +fn test_harness>(test: impl FnOnce(VirtualOverseer) -> T) { + let pool = sp_core::testing::TaskExecutor::new(); + let (context, virtual_overseer) = + polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + let subsystem = async move { + let subsystem = crate::CollationGenerationSubsystem::new(Metrics::default()); + + subsystem.run(context).await; }; - use polkadot_node_primitives::{ - BlockData, Collation, CollationResult, MaybeCompressedPoV, PoV, - }; - use polkadot_node_subsystem::{ - errors::RuntimeApiError, - messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}, - }; - use polkadot_node_subsystem_test_helpers::{ - subsystem_test_harness, TestSubsystemContextHandle, - }; - use polkadot_primitives::{ - CollatorPair, Id as ParaId, PersistedValidationData, ScheduledCore, ValidationCode, - }; - use std::pin::Pin; - - fn test_collation() -> Collation { - Collation { - upward_messages: Default::default(), - horizontal_messages: Default::default(), - new_validation_code: None, - head_data: dummy_head_data(), - proof_of_validity: MaybeCompressedPoV::Raw(PoV { block_data: BlockData(Vec::new()) }), - processed_downward_messages: 0_u32, - hrmp_watermark: 0_u32.into(), - } - } - fn test_collation_compressed() -> Collation { - let mut collation = test_collation(); - let compressed = collation.proof_of_validity.clone().into_compressed(); - collation.proof_of_validity = MaybeCompressedPoV::Compressed(compressed); - collation - } + let test_fut = test(virtual_overseer); + + futures::pin_mut!(test_fut); + futures::executor::block_on(futures::future::join( + async move { + let 
mut virtual_overseer = test_fut.await; + // Ensure we have handled all responses. + if let Ok(Some(msg)) = virtual_overseer.rx.try_next() { + panic!("Did not handle all responses: {:?}", msg); + } + // Conclude. + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + }, + subsystem, + )); +} - fn test_validation_data() -> PersistedValidationData { - let mut persisted_validation_data = PersistedValidationData::default(); - persisted_validation_data.max_pov_size = 1024; - persisted_validation_data +fn test_collation() -> Collation { + Collation { + upward_messages: Default::default(), + horizontal_messages: Default::default(), + new_validation_code: None, + head_data: dummy_head_data(), + proof_of_validity: MaybeCompressedPoV::Raw(PoV { block_data: BlockData(Vec::new()) }), + processed_downward_messages: 0_u32, + hrmp_watermark: 0_u32.into(), } +} - // Box + Unpin + Send - struct TestCollator; +fn test_collation_compressed() -> Collation { + let mut collation = test_collation(); + let compressed = collation.proof_of_validity.clone().into_compressed(); + collation.proof_of_validity = MaybeCompressedPoV::Compressed(compressed); + collation +} - impl Future for TestCollator { - type Output = Option; +fn test_validation_data() -> PersistedValidationData { + let mut persisted_validation_data = PersistedValidationData::default(); + persisted_validation_data.max_pov_size = 1024; + persisted_validation_data +} - fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll { - Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None })) - } +// Box + Unpin + Send +struct TestCollator; + +impl Future for TestCollator { + type Output = Option; + + fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll { + Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None })) } +} + +impl Unpin for TestCollator {} - impl Unpin for TestCollator {} +async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { + const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000); - fn test_config>(para_id: Id) -> Arc { - Arc::new(CollationGenerationConfig { - key: CollatorPair::generate().0, - collator: Box::new(|_: Hash, _vd: &PersistedValidationData| TestCollator.boxed()), - para_id: para_id.into(), - }) + overseer + .recv() + .timeout(TIMEOUT) + .await + .expect(&format!("{:?} is long enough to receive messages", TIMEOUT)) +} + +fn test_config>(para_id: Id) -> CollationGenerationConfig { + CollationGenerationConfig { + key: CollatorPair::generate().0, + collator: Some(Box::new(|_: Hash, _vd: &PersistedValidationData| TestCollator.boxed())), + para_id: para_id.into(), } +} - fn scheduled_core_for>(para_id: Id) -> ScheduledCore { - ScheduledCore { para_id: para_id.into(), collator: None } +fn test_config_no_collator>(para_id: Id) -> CollationGenerationConfig { + CollationGenerationConfig { + key: CollatorPair::generate().0, + collator: None, + para_id: para_id.into(), } +} - #[test] - fn requests_availability_per_relay_parent() { - let activated_hashes: Vec = - vec![[1; 32].into(), [4; 32].into(), [9; 32].into(), [16; 32].into()]; - - let requested_availability_cores = Arc::new(Mutex::new(Vec::new())); - - let overseer_requested_availability_cores = requested_availability_cores.clone(); - let overseer = |mut handle: TestSubsystemContextHandle| async move { - loop { - match handle.try_recv().await { - None => break, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, 
RuntimeApiRequest::AvailabilityCores(tx)))) => { - overseer_requested_availability_cores.lock().await.push(hash); - tx.send(Ok(vec![])).unwrap(); - } - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => { - tx.send(Ok(vec![dummy_validator(); 3])).unwrap(); - } - Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg), +fn scheduled_core_for>(para_id: Id) -> ScheduledCore { + ScheduledCore { para_id: para_id.into(), collator: None } +} + +#[test] +fn requests_availability_per_relay_parent() { + let activated_hashes: Vec = + vec![[1; 32].into(), [4; 32].into(), [9; 32].into(), [16; 32].into()]; + + let requested_availability_cores = Arc::new(Mutex::new(Vec::new())); + + let overseer_requested_availability_cores = requested_availability_cores.clone(); + let overseer = |mut handle: TestSubsystemContextHandle| async move { + loop { + match handle.try_recv().await { + None => break, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => { + overseer_requested_availability_cores.lock().await.push(hash); + tx.send(Ok(vec![])).unwrap(); + } + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => { + tx.send(Ok(vec![dummy_validator(); 3])).unwrap(); } + Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg), } - }; - - let (tx, _rx) = mpsc::channel(0); - - let subsystem_activated_hashes = activated_hashes.clone(); - subsystem_test_harness(overseer, |mut ctx| async move { - handle_new_activations( - test_config(123u32), - subsystem_activated_hashes, - &mut ctx, - Metrics(None), - &tx, - ) - .await - .unwrap(); - }); - - let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores) - .expect("overseer should have shut down by now") - .into_inner(); - requested_availability_cores.sort(); + } + }; - assert_eq!(requested_availability_cores, activated_hashes); - } + let subsystem_activated_hashes = activated_hashes.clone(); + subsystem_test_harness(overseer, |mut ctx| async move { + handle_new_activations( + Arc::new(test_config(123u32)), + subsystem_activated_hashes, + &mut ctx, + Metrics(None), + ) + .await + .unwrap(); + }); + + let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores) + .expect("overseer should have shut down by now") + .into_inner(); + requested_availability_cores.sort(); + + assert_eq!(requested_availability_cores, activated_hashes); +} - #[test] - fn requests_validation_data_for_scheduled_matches() { - let activated_hashes: Vec = vec![ - Hash::repeat_byte(1), - Hash::repeat_byte(4), - Hash::repeat_byte(9), - Hash::repeat_byte(16), - ]; - - let requested_validation_data = Arc::new(Mutex::new(Vec::new())); - - let overseer_requested_validation_data = requested_validation_data.clone(); - let overseer = |mut handle: TestSubsystemContextHandle| async move { - loop { - match handle.try_recv().await { - None => break, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - hash, - RuntimeApiRequest::AvailabilityCores(tx), - ))) => { - tx.send(Ok(vec![ - CoreState::Free, - // this is weird, see explanation below - CoreState::Scheduled(scheduled_core_for( - (hash.as_fixed_bytes()[0] * 4) as u32, - )), - CoreState::Scheduled(scheduled_core_for( - (hash.as_fixed_bytes()[0] * 5) as u32, - )), - ])) - .unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - hash, 
- RuntimeApiRequest::PersistedValidationData( - _para_id, - _occupied_core_assumption, - tx, - ), - ))) => { - overseer_requested_validation_data.lock().await.push(hash); - tx.send(Ok(None)).unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _hash, - RuntimeApiRequest::Validators(tx), - ))) => { - tx.send(Ok(vec![dummy_validator(); 3])).unwrap(); - }, - Some(msg) => { - panic!("didn't expect any other overseer requests; got {:?}", msg) - }, - } +#[test] +fn requests_validation_data_for_scheduled_matches() { + let activated_hashes: Vec = vec![ + Hash::repeat_byte(1), + Hash::repeat_byte(4), + Hash::repeat_byte(9), + Hash::repeat_byte(16), + ]; + + let requested_validation_data = Arc::new(Mutex::new(Vec::new())); + + let overseer_requested_validation_data = requested_validation_data.clone(); + let overseer = |mut handle: TestSubsystemContextHandle| async move { + loop { + match handle.try_recv().await { + None => break, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::AvailabilityCores(tx), + ))) => { + tx.send(Ok(vec![ + CoreState::Free, + // this is weird, see explanation below + CoreState::Scheduled(scheduled_core_for( + (hash.as_fixed_bytes()[0] * 4) as u32, + )), + CoreState::Scheduled(scheduled_core_for( + (hash.as_fixed_bytes()[0] * 5) as u32, + )), + ])) + .unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::PersistedValidationData( + _para_id, + _occupied_core_assumption, + tx, + ), + ))) => { + overseer_requested_validation_data.lock().await.push(hash); + tx.send(Ok(None)).unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::Validators(tx), + ))) => { + tx.send(Ok(vec![dummy_validator(); 3])).unwrap(); + }, + Some(msg) => { + panic!("didn't expect any other overseer requests; got {:?}", msg) + }, } - }; + } + }; - let (tx, _rx) = mpsc::channel(0); + subsystem_test_harness(overseer, |mut ctx| async move { + handle_new_activations( + Arc::new(test_config(16)), + activated_hashes, + &mut ctx, + Metrics(None), + ) + .await + .unwrap(); + }); + + let requested_validation_data = Arc::try_unwrap(requested_validation_data) + .expect("overseer should have shut down by now") + .into_inner(); + + // the only activated hash should be from the 4 hash: + // each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5 + // given that the test configuration has a `para_id` of 16, there's only one way to get that value: with the 4 + // hash. 
+ assert_eq!(requested_validation_data, vec![[4; 32].into()]); +} - subsystem_test_harness(overseer, |mut ctx| async move { - handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx) - .await - .unwrap(); - }); +#[test] +fn sends_distribute_collation_message() { + let activated_hashes: Vec = vec![ + Hash::repeat_byte(1), + Hash::repeat_byte(4), + Hash::repeat_byte(9), + Hash::repeat_byte(16), + ]; + + // empty vec doesn't allocate on the heap, so it's ok we throw it away + let to_collator_protocol = Arc::new(Mutex::new(Vec::new())); + let inner_to_collator_protocol = to_collator_protocol.clone(); + + let overseer = |mut handle: TestSubsystemContextHandle| async move { + loop { + match handle.try_recv().await { + None => break, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::AvailabilityCores(tx), + ))) => { + tx.send(Ok(vec![ + CoreState::Free, + // this is weird, see explanation below + CoreState::Scheduled(scheduled_core_for( + (hash.as_fixed_bytes()[0] * 4) as u32, + )), + CoreState::Scheduled(scheduled_core_for( + (hash.as_fixed_bytes()[0] * 5) as u32, + )), + ])) + .unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::PersistedValidationData( + _para_id, + _occupied_core_assumption, + tx, + ), + ))) => { + tx.send(Ok(Some(test_validation_data()))).unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::Validators(tx), + ))) => { + tx.send(Ok(vec![dummy_validator(); 3])).unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ValidationCodeHash( + _para_id, + OccupiedCoreAssumption::Free, + tx, + ), + ))) => { + tx.send(Ok(Some(ValidationCode(vec![1, 2, 3]).hash()))).unwrap(); + }, + Some(msg @ AllMessages::CollatorProtocol(_)) => { + inner_to_collator_protocol.lock().await.push(msg); + }, + Some(msg) => { + panic!("didn't expect any other overseer requests; got {:?}", msg) + }, + } + } + }; - let requested_validation_data = Arc::try_unwrap(requested_validation_data) - .expect("overseer should have shut down by now") - .into_inner(); + let config = Arc::new(test_config(16)); + let subsystem_config = config.clone(); - // the only activated hash should be from the 4 hash: - // each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5 - // given that the test configuration has a `para_id` of 16, there's only one way to get that value: with the 4 - // hash. - assert_eq!(requested_validation_data, vec![[4; 32].into()]); + subsystem_test_harness(overseer, |mut ctx| async move { + handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None)) + .await + .unwrap(); + }); + + let mut to_collator_protocol = Arc::try_unwrap(to_collator_protocol) + .expect("subsystem should have shut down by now") + .into_inner(); + + // we expect a single message to be sent, containing a candidate receipt. 
+ // we don't care too much about the `commitments_hash` right now, but let's ensure that we've calculated the + // correct descriptor + let expect_pov_hash = test_collation_compressed().proof_of_validity.into_compressed().hash(); + let expect_validation_data_hash = test_validation_data().hash(); + let expect_relay_parent = Hash::repeat_byte(4); + let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash(); + let expect_payload = collator_signature_payload( + &expect_relay_parent, + &config.para_id, + &expect_validation_data_hash, + &expect_pov_hash, + &expect_validation_code_hash, + ); + let expect_descriptor = CandidateDescriptor { + signature: config.key.sign(&expect_payload), + para_id: config.para_id, + relay_parent: expect_relay_parent, + collator: config.key.public(), + persisted_validation_data_hash: expect_validation_data_hash, + pov_hash: expect_pov_hash, + erasure_root: dummy_hash(), // this isn't something we're checking right now + para_head: test_collation().head_data.hash(), + validation_code_hash: expect_validation_code_hash, + }; + + assert_eq!(to_collator_protocol.len(), 1); + match AllMessages::from(to_collator_protocol.pop().unwrap()) { + AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation( + CandidateReceipt { descriptor, .. }, + _pov, + .., + )) => { + // signature generation is non-deterministic, so we can't just assert that the + // expected descriptor is correct. What we can do is validate that the produced + // descriptor has a valid signature, then just copy in the generated signature + // and check the rest of the fields for equality. + assert!(CollatorPair::verify( + &descriptor.signature, + &collator_signature_payload( + &descriptor.relay_parent, + &descriptor.para_id, + &descriptor.persisted_validation_data_hash, + &descriptor.pov_hash, + &descriptor.validation_code_hash, + ) + .as_ref(), + &descriptor.collator, + )); + let expect_descriptor = { + let mut expect_descriptor = expect_descriptor; + expect_descriptor.signature = descriptor.signature.clone(); + expect_descriptor.erasure_root = descriptor.erasure_root.clone(); + expect_descriptor + }; + assert_eq!(descriptor, expect_descriptor); + }, + _ => panic!("received wrong message type"), } +} - #[test] - fn sends_distribute_collation_message() { - let activated_hashes: Vec = vec![ - Hash::repeat_byte(1), - Hash::repeat_byte(4), - Hash::repeat_byte(9), - Hash::repeat_byte(16), - ]; - - let overseer = |mut handle: TestSubsystemContextHandle| async move { - loop { - match handle.try_recv().await { - None => break, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - hash, - RuntimeApiRequest::AvailabilityCores(tx), - ))) => { - tx.send(Ok(vec![ - CoreState::Free, - // this is weird, see explanation below - CoreState::Scheduled(scheduled_core_for( - (hash.as_fixed_bytes()[0] * 4) as u32, - )), - CoreState::Scheduled(scheduled_core_for( - (hash.as_fixed_bytes()[0] * 5) as u32, - )), - ])) - .unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _hash, - RuntimeApiRequest::PersistedValidationData( - _para_id, - _occupied_core_assumption, - tx, - ), - ))) => { - tx.send(Ok(Some(test_validation_data()))).unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _hash, - RuntimeApiRequest::Validators(tx), - ))) => { - tx.send(Ok(vec![dummy_validator(); 3])).unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _hash, - RuntimeApiRequest::ValidationCodeHash( - _para_id, - OccupiedCoreAssumption::Free, - tx, - ), - 
))) => { - tx.send(Ok(Some(ValidationCode(vec![1, 2, 3]).hash()))).unwrap(); - }, - Some(msg) => { - panic!("didn't expect any other overseer requests; got {:?}", msg) - }, - } +#[test] +fn fallback_when_no_validation_code_hash_api() { + // This is a variant of the above test, but with the validation code hash API disabled. + + let activated_hashes: Vec = vec![ + Hash::repeat_byte(1), + Hash::repeat_byte(4), + Hash::repeat_byte(9), + Hash::repeat_byte(16), + ]; + + // empty vec doesn't allocate on the heap, so it's ok we throw it away + let to_collator_protocol = Arc::new(Mutex::new(Vec::new())); + let inner_to_collator_protocol = to_collator_protocol.clone(); + + let overseer = |mut handle: TestSubsystemContextHandle| async move { + loop { + match handle.try_recv().await { + None => break, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::AvailabilityCores(tx), + ))) => { + tx.send(Ok(vec![ + CoreState::Free, + CoreState::Scheduled(scheduled_core_for( + (hash.as_fixed_bytes()[0] * 4) as u32, + )), + CoreState::Scheduled(scheduled_core_for( + (hash.as_fixed_bytes()[0] * 5) as u32, + )), + ])) + .unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::PersistedValidationData( + _para_id, + _occupied_core_assumption, + tx, + ), + ))) => { + tx.send(Ok(Some(test_validation_data()))).unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::Validators(tx), + ))) => { + tx.send(Ok(vec![dummy_validator(); 3])).unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ValidationCodeHash( + _para_id, + OccupiedCoreAssumption::Free, + tx, + ), + ))) => { + tx.send(Err(RuntimeApiError::NotSupported { + runtime_api_name: "validation_code_hash", + })) + .unwrap(); + }, + Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _hash, + RuntimeApiRequest::ValidationCode(_para_id, OccupiedCoreAssumption::Free, tx), + ))) => { + tx.send(Ok(Some(ValidationCode(vec![1, 2, 3])))).unwrap(); + }, + Some(msg @ AllMessages::CollatorProtocol(_)) => { + inner_to_collator_protocol.lock().await.push(msg); + }, + Some(msg) => { + panic!("didn't expect any other overseer requests; got {:?}", msg) + }, } - }; - - let config = test_config(16); - let subsystem_config = config.clone(); - - let (tx, rx) = mpsc::channel(0); - - // empty vec doesn't allocate on the heap, so it's ok we throw it away - let sent_messages = Arc::new(Mutex::new(Vec::new())); - let subsystem_sent_messages = sent_messages.clone(); - subsystem_test_harness(overseer, |mut ctx| async move { - handle_new_activations( - subsystem_config, - activated_hashes, - &mut ctx, - Metrics(None), - &tx, - ) + } + }; + + let config = Arc::new(test_config(16u32)); + let subsystem_config = config.clone(); + + // empty vec doesn't allocate on the heap, so it's ok we throw it away + subsystem_test_harness(overseer, |mut ctx| async move { + handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None)) .await .unwrap(); + }); + + let to_collator_protocol = Arc::try_unwrap(to_collator_protocol) + .expect("subsystem should have shut down by now") + .into_inner(); + + let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash(); + + assert_eq!(to_collator_protocol.len(), 1); + match &to_collator_protocol[0] { + AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation( + CandidateReceipt { descriptor, .. 
}, + _pov, + .., + )) => { + assert_eq!(expect_validation_code_hash, descriptor.validation_code_hash); + }, + _ => panic!("received wrong message type"), + } +} + +#[test] +fn submit_collation_is_no_op_before_initialization() { + test_harness(|mut virtual_overseer| async move { + virtual_overseer + .send(FromOrchestra::Communication { + msg: CollationGenerationMessage::SubmitCollation(SubmitCollationParams { + relay_parent: Hash::repeat_byte(0), + collation: test_collation(), + parent_head: vec![1, 2, 3].into(), + validation_code_hash: Hash::repeat_byte(1).into(), + result_sender: None, + }), + }) + .await; + + virtual_overseer + }); +} + +#[test] +fn submit_collation_leads_to_distribution() { + let relay_parent = Hash::repeat_byte(0); + let validation_code_hash = ValidationCodeHash::from(Hash::repeat_byte(42)); + let parent_head = HeadData::from(vec![1, 2, 3]); + let para_id = ParaId::from(5); + let expected_pvd = PersistedValidationData { + parent_head: parent_head.clone(), + relay_parent_number: 10, + relay_parent_storage_root: Hash::repeat_byte(1), + max_pov_size: 1024, + }; - std::mem::drop(tx); - - // collect all sent messages - *subsystem_sent_messages.lock().await = rx.collect().await; - }); - - let mut sent_messages = Arc::try_unwrap(sent_messages) - .expect("subsystem should have shut down by now") - .into_inner(); - - // we expect a single message to be sent, containing a candidate receipt. - // we don't care too much about the `commitments_hash` right now, but let's ensure that we've calculated the - // correct descriptor - let expect_pov_hash = - test_collation_compressed().proof_of_validity.into_compressed().hash(); - let expect_validation_data_hash = test_validation_data().hash(); - let expect_relay_parent = Hash::repeat_byte(4); - let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash(); - let expect_payload = collator_signature_payload( - &expect_relay_parent, - &config.para_id, - &expect_validation_data_hash, - &expect_pov_hash, - &expect_validation_code_hash, + test_harness(|mut virtual_overseer| async move { + virtual_overseer + .send(FromOrchestra::Communication { + msg: CollationGenerationMessage::Initialize(test_config_no_collator(para_id)), + }) + .await; + + virtual_overseer + .send(FromOrchestra::Communication { + msg: CollationGenerationMessage::SubmitCollation(SubmitCollationParams { + relay_parent, + collation: test_collation(), + parent_head: vec![1, 2, 3].into(), + validation_code_hash, + result_sender: None, + }), + }) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::Validators(tx))) => { + assert_eq!(rp, relay_parent); + let _ = tx.send(Ok(vec![ + Sr25519Keyring::Alice.public().into(), + Sr25519Keyring::Bob.public().into(), + Sr25519Keyring::Charlie.public().into(), + ])); + } ); - let expect_descriptor = CandidateDescriptor { - signature: config.key.sign(&expect_payload), - para_id: config.para_id, - relay_parent: expect_relay_parent, - collator: config.key.public(), - persisted_validation_data_hash: expect_validation_data_hash, - pov_hash: expect_pov_hash, - erasure_root: dummy_hash(), // this isn't something we're checking right now - para_head: test_collation().head_data.hash(), - validation_code_hash: expect_validation_code_hash, - }; - - assert_eq!(sent_messages.len(), 1); - match AllMessages::from(sent_messages.pop().unwrap()) { + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + 
AllMessages::RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::PersistedValidationData(id, a, tx))) => { + assert_eq!(rp, relay_parent); + assert_eq!(id, para_id); + assert_eq!(a, OccupiedCoreAssumption::TimedOut); + + // Candidate receipt should be constructed with the real parent head. + let mut pvd = expected_pvd.clone(); + pvd.parent_head = vec![4, 5, 6].into(); + let _ = tx.send(Ok(Some(pvd))); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation( - CandidateReceipt { descriptor, .. }, - _pov, - .., + ccr, + parent_head_data_hash, + .. )) => { - // signature generation is non-deterministic, so we can't just assert that the - // expected descriptor is correct. What we can do is validate that the produced - // descriptor has a valid signature, then just copy in the generated signature - // and check the rest of the fields for equality. - assert!(CollatorPair::verify( - &descriptor.signature, - &collator_signature_payload( - &descriptor.relay_parent, - &descriptor.para_id, - &descriptor.persisted_validation_data_hash, - &descriptor.pov_hash, - &descriptor.validation_code_hash, - ) - .as_ref(), - &descriptor.collator, - )); - let expect_descriptor = { - let mut expect_descriptor = expect_descriptor; - expect_descriptor.signature = descriptor.signature.clone(); - expect_descriptor.erasure_root = descriptor.erasure_root.clone(); - expect_descriptor - }; - assert_eq!(descriptor, expect_descriptor); - }, - _ => panic!("received wrong message type"), - } - } - - #[test] - fn fallback_when_no_validation_code_hash_api() { - // This is a variant of the above test, but with the validation code hash API disabled. - - let activated_hashes: Vec = vec![ - Hash::repeat_byte(1), - Hash::repeat_byte(4), - Hash::repeat_byte(9), - Hash::repeat_byte(16), - ]; - - let overseer = |mut handle: TestSubsystemContextHandle| async move { - loop { - match handle.try_recv().await { - None => break, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - hash, - RuntimeApiRequest::AvailabilityCores(tx), - ))) => { - tx.send(Ok(vec![ - CoreState::Free, - CoreState::Scheduled(scheduled_core_for( - (hash.as_fixed_bytes()[0] * 4) as u32, - )), - CoreState::Scheduled(scheduled_core_for( - (hash.as_fixed_bytes()[0] * 5) as u32, - )), - ])) - .unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _hash, - RuntimeApiRequest::PersistedValidationData( - _para_id, - _occupied_core_assumption, - tx, - ), - ))) => { - tx.send(Ok(Some(test_validation_data()))).unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _hash, - RuntimeApiRequest::Validators(tx), - ))) => { - tx.send(Ok(vec![dummy_validator(); 3])).unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _hash, - RuntimeApiRequest::ValidationCodeHash( - _para_id, - OccupiedCoreAssumption::Free, - tx, - ), - ))) => { - tx.send(Err(RuntimeApiError::NotSupported { - runtime_api_name: "validation_code_hash", - })) - .unwrap(); - }, - Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _hash, - RuntimeApiRequest::ValidationCode( - _para_id, - OccupiedCoreAssumption::Free, - tx, - ), - ))) => { - tx.send(Ok(Some(ValidationCode(vec![1, 2, 3])))).unwrap(); - }, - Some(msg) => { - panic!("didn't expect any other overseer requests; got {:?}", msg) - }, - } + assert_eq!(parent_head_data_hash, parent_head.hash()); + assert_eq!(ccr.descriptor().persisted_validation_data_hash, expected_pvd.hash()); + 
assert_eq!(ccr.descriptor().para_head, dummy_head_data().hash()); + assert_eq!(ccr.descriptor().validation_code_hash, validation_code_hash); } - }; - - let config = test_config(16u32); - let subsystem_config = config.clone(); - - let (tx, rx) = mpsc::channel(0); - - // empty vec doesn't allocate on the heap, so it's ok we throw it away - let sent_messages = Arc::new(Mutex::new(Vec::new())); - let subsystem_sent_messages = sent_messages.clone(); - subsystem_test_harness(overseer, |mut ctx| async move { - handle_new_activations( - subsystem_config, - activated_hashes, - &mut ctx, - Metrics(None), - &tx, - ) - .await - .unwrap(); + ); - std::mem::drop(tx); - - *subsystem_sent_messages.lock().await = rx.collect().await; - }); - - let sent_messages = Arc::try_unwrap(sent_messages) - .expect("subsystem should have shut down by now") - .into_inner(); - - let expect_validation_code_hash = ValidationCode(vec![1, 2, 3]).hash(); - - assert_eq!(sent_messages.len(), 1); - match &sent_messages[0] { - overseer::CollationGenerationOutgoingMessages::CollatorProtocolMessage( - CollatorProtocolMessage::DistributeCollation( - CandidateReceipt { descriptor, .. }, - _pov, - .., - ), - ) => { - assert_eq!(expect_validation_code_hash, descriptor.validation_code_hash); - }, - _ => panic!("received wrong message type"), - } - } + virtual_overseer + }); } diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index a5f473007c82..3d155acc1fac 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -797,7 +797,7 @@ fn test_chain_api_msg() -> ChainApiMessage { fn test_collator_generation_msg() -> CollationGenerationMessage { CollationGenerationMessage::Initialize(CollationGenerationConfig { key: CollatorPair::generate().0, - collator: Box::new(|_, _| TestCollator.boxed()), + collator: Some(Box::new(|_, _| TestCollator.boxed())), para_id: Default::default(), }) } diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index b9aa449188e9..9f031d772b06 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -32,8 +32,8 @@ use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use polkadot_primitives::{ BlakeTwo256, BlockNumber, CandidateCommitments, CandidateHash, CollatorPair, CommittedCandidateReceipt, CompactStatement, EncodeAs, Hash, HashT, HeadData, Id as ParaId, - PersistedValidationData, SessionIndex, Signed, UncheckedSigned, ValidationCode, ValidatorIndex, - MAX_CODE_SIZE, MAX_POV_SIZE, + PersistedValidationData, SessionIndex, Signed, UncheckedSigned, ValidationCode, + ValidationCodeHash, ValidatorIndex, MAX_CODE_SIZE, MAX_POV_SIZE, }; pub use sp_consensus_babe::{ AllowedSlots as BabeAllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch, @@ -380,6 +380,18 @@ pub enum MaybeCompressedPoV { Compressed(PoV), } +#[cfg(not(target_os = "unknown"))] +impl std::fmt::Debug for MaybeCompressedPoV { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let (variant, size) = match self { + MaybeCompressedPoV::Raw(pov) => ("Raw", pov.block_data.0.len()), + MaybeCompressedPoV::Compressed(pov) => ("Compressed", pov.block_data.0.len()), + }; + + write!(f, "{} PoV ({} bytes)", variant, size) + } +} + #[cfg(not(target_os = "unknown"))] impl MaybeCompressedPoV { /// Convert into a compressed [`PoV`]. @@ -399,7 +411,7 @@ impl MaybeCompressedPoV { /// /// - does not contain the erasure root; that's computed at the Polkadot level, not at Cumulus /// - contains a proof of validity. 
-#[derive(Clone, Encode, Decode)] +#[derive(Debug, Clone, Encode, Decode)] #[cfg(not(target_os = "unknown"))] pub struct Collation { /// Messages destined to be interpreted by the Relay chain itself. @@ -475,7 +487,10 @@ pub struct CollationGenerationConfig { /// Collator's authentication key, so it can sign things. pub key: CollatorPair, /// Collation function. See [`CollatorFn`] for more details. - pub collator: CollatorFn, + /// + /// If this is `None`, it implies that collations are intended to be submitted + /// out-of-band and not pulled out of the function. + pub collator: Option, /// The parachain that this collator collates for pub para_id: ParaId, } @@ -487,6 +502,25 @@ impl std::fmt::Debug for CollationGenerationConfig { } } +/// Parameters for [`CollationGenerationMessage::SubmitCollation`]. +#[derive(Debug)] +pub struct SubmitCollationParams { + /// The relay-parent the collation is built against. + pub relay_parent: Hash, + /// The collation itself (PoV and commitments) + pub collation: Collation, + /// The parent block's head-data. + pub parent_head: HeadData, + /// The hash of the validation code the collation was created against. + pub validation_code_hash: ValidationCodeHash, + /// An optional result sender that should be informed about a successfully seconded collation. + /// + /// There is no guarantee that this sender is informed ever about any result, it is completely okay to just drop it. + /// However, if it is called, it should be called with the signed statement of a parachain validator seconding the + /// collation. + pub result_sender: Option>, +} + /// This is the data we keep available for each candidate included in the relay chain. #[derive(Clone, Encode, Decode, PartialEq, Eq, Debug)] pub struct AvailableData { diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 68e953b9dade..83c9613aa178 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -36,7 +36,8 @@ use polkadot_node_primitives::{ approval::{BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote}, AvailableData, BabeEpoch, BlockWeight, CandidateVotes, CollationGenerationConfig, CollationSecondedSignal, DisputeMessage, DisputeStatus, ErasureChunk, PoV, - SignedDisputeStatement, SignedFullStatement, SignedFullStatementWithPVD, ValidationResult, + SignedDisputeStatement, SignedFullStatement, SignedFullStatementWithPVD, SubmitCollationParams, + ValidationResult, }; use polkadot_primitives::{ vstaging as vstaging_primitives, AuthorityDiscoveryId, BackedCandidate, BlockNumber, @@ -736,6 +737,11 @@ pub enum ProvisionerMessage { pub enum CollationGenerationMessage { /// Initialize the collation generation subsystem Initialize(CollationGenerationConfig), + /// Submit a collation to the subsystem. This will package it into a signed + /// [`CommittedCandidateReceipt`] and distribute along the network to validators. + /// + /// If sent before `Initialize`, this will be ignored. + SubmitCollation(SubmitCollationParams), } /// The result type of [`ApprovalVotingMessage::CheckAndImportAssignment`] request. 
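For orientation, the new message is meant to be sent from collator-side code outside the overseer, after the embedding node has authored a parachain block. Below is a minimal sketch of that flow, reusing the `overseer_handle.send_msg(.., "Collator")` pattern already used by the test service further down in this diff; the handle, key, and collation inputs are placeholders supplied by the embedding node and are not part of this change:

```rust
use polkadot_node_primitives::{Collation, CollationGenerationConfig, SubmitCollationParams};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle;
use polkadot_primitives::{CollatorPair, Hash, HeadData, Id as ParaId, ValidationCodeHash};

/// Initialize the subsystem once and push a single collation to it.
async fn submit_one_collation(
    mut overseer_handle: Handle,
    key: CollatorPair,
    para_id: ParaId,
    relay_parent: Hash,
    collation: Collation,
    parent_head: HeadData,
    validation_code_hash: ValidationCodeHash,
) {
    // One-time initialization. `collator: None` because collations are pushed
    // via `SubmitCollation` instead of being pulled through a collator function.
    let config = CollationGenerationConfig { key, collator: None, para_id };
    overseer_handle
        .send_msg(CollationGenerationMessage::Initialize(config), "Collator")
        .await;

    // The subsystem packages this into a signed candidate receipt and asks the
    // collator-protocol to distribute it to validators.
    overseer_handle
        .send_msg(
            CollationGenerationMessage::SubmitCollation(SubmitCollationParams {
                relay_parent,
                collation,
                parent_head,
                validation_code_hash,
                result_sender: None,
            }),
            "Collator",
        )
        .await;
}
```
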
diff --git a/node/test/service/src/lib.rs b/node/test/service/src/lib.rs index 3e7f66128886..899af010fd4c 100644 --- a/node/test/service/src/lib.rs +++ b/node/test/service/src/lib.rs @@ -357,7 +357,8 @@ impl PolkadotTestNode { para_id: ParaId, collator: CollatorFn, ) { - let config = CollationGenerationConfig { key: collator_key, collator, para_id }; + let config = + CollationGenerationConfig { key: collator_key, collator: Some(collator), para_id }; self.overseer_handle .send_msg(CollationGenerationMessage::Initialize(config), "Collator") diff --git a/parachain/src/primitives.rs b/parachain/src/primitives.rs index 224591d6048d..ce9daffb1175 100644 --- a/parachain/src/primitives.rs +++ b/parachain/src/primitives.rs @@ -63,7 +63,8 @@ impl ValidationCode { } } -/// Unit type wrapper around [`type@Hash`] that represents a validation code hash. +/// Unit type wrapper around [`type@Hash`] that represents the blake2-256 hash +/// of validation code in particular. /// /// This type is produced by [`ValidationCode::hash`]. /// diff --git a/parachain/test-parachains/adder/collator/src/main.rs b/parachain/test-parachains/adder/collator/src/main.rs index 699cee202cb8..4b959ced4cf9 100644 --- a/parachain/test-parachains/adder/collator/src/main.rs +++ b/parachain/test-parachains/adder/collator/src/main.rs @@ -87,8 +87,9 @@ fn main() -> Result<()> { let config = CollationGenerationConfig { key: collator.collator_key(), - collator: collator - .create_collation_function(full_node.task_manager.spawn_handle()), + collator: Some( + collator.create_collation_function(full_node.task_manager.spawn_handle()), + ), para_id, }; overseer_handle diff --git a/parachain/test-parachains/undying/collator/src/main.rs b/parachain/test-parachains/undying/collator/src/main.rs index 189674b82a97..df1fec00ee86 100644 --- a/parachain/test-parachains/undying/collator/src/main.rs +++ b/parachain/test-parachains/undying/collator/src/main.rs @@ -87,8 +87,9 @@ fn main() -> Result<()> { let config = CollationGenerationConfig { key: collator.collator_key(), - collator: collator - .create_collation_function(full_node.task_manager.spawn_handle()), + collator: Some( + collator.create_collation_function(full_node.task_manager.spawn_handle()), + ), para_id, }; overseer_handle diff --git a/roadmap/implementers-guide/src/node/collators/collation-generation.md b/roadmap/implementers-guide/src/node/collators/collation-generation.md index 2f0d4742496d..9053ea40f89e 100644 --- a/roadmap/implementers-guide/src/node/collators/collation-generation.md +++ b/roadmap/implementers-guide/src/node/collators/collation-generation.md @@ -9,7 +9,7 @@ Collation generation for Parachains currently works in the following way: 1. A new relay chain block is imported. 2. The collation generation subsystem checks if the core associated to the parachain is free and if yes, continues. -3. Collation generation calls our collator callback to generate a PoV. +3. Collation generation calls our collator callback, if present, to generate a PoV. If none exists, do nothing. 4. Authoring logic determines if the current node should build a PoV. 5. Build new PoV and give it back to collation generation. @@ -25,6 +25,10 @@ Collation generation for Parachains currently works in the following way: - No more than one initialization message should ever be sent to the collation generation subsystem. - Sent by a collator to initialize this subsystem. 
+- `CollationGenerationMessage::SubmitCollation` + - If the subsystem isn't initialized or the relay-parent is too old to be relevant, ignore the message. + - Otherwise, use the provided parameters to generate a [`CommittedCandidateReceipt`] + - Submit the collation to the collator-protocol with `CollatorProtocolMessage::DistributeCollation`. ### Outgoing @@ -101,7 +105,7 @@ pub struct CollationGenerationConfig { /// Collator's authentication key, so it can sign things. pub key: CollatorPair, /// Collation function. See [`CollatorFn`] for more details. - pub collator: CollatorFn, + pub collator: Option, /// The parachain that this collator collates for pub para_id: ParaId, } @@ -136,7 +140,7 @@ The configuration should be optional, to allow for the case where the node is no - **Collation generation config** - - Contains collator's authentication key, collator function, and + - Contains collator's authentication key, optional collator function, and parachain ID. [CP]: collator-protocol.md diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index 30e6dc848802..1fc0c505a1cc 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -403,6 +403,57 @@ enum CollatorProtocolMessage { } ``` +## Collation Generation Message + +Messages received by the [Collation Generation subsystem](../node/collators/collation-generation.md) + +This is the core interface by which collators built on top of a Polkadot node submit collations to validators. As such, these messages are not sent by any subsystem but are instead sent from outside of the overseer. + +```rust +/// A function provided to the subsystem which it uses to pull new collations. +/// +/// This mode of querying collations is obsoleted by `CollationGenerationMessages::SubmitCollation` +/// +/// The response channel, if present, is meant to receive a `Seconded` statement as a +/// form of authentication, for collation mechanisms which rely on this for anti-spam. +type CollatorFn = Fn(Hash, PersistedValidationData) -> Future>)>; + +/// Configuration for the collation generator +struct CollationGenerationConfig { + /// Collator's authentication key, so it can sign things. + key: CollatorPair, + /// Collation function. See [`CollatorFn`] for more details. + collator: CollatorFn, + /// The parachain that this collator collates for + para_id: ParaId, +} + +/// Parameters for submitting a collation +struct SubmitCollationParams { + /// The relay-parent the collation is built against. + relay_parent: Hash, + /// The collation itself (PoV and commitments) + collation: Collation, + /// The parent block's head-data. + parent_head: HeadData, + /// The hash of the validation code the collation was created against. + validation_code_hash: ValidationCodeHash, + /// A response channel for receiving a `Seconded` message about the candidate + /// once produced by a validator. This is not guaranteed to provide anything. + result_sender: Option>, +} + +enum CollationGenerationMessage { + /// Initialize the collation generation subsystem + Initialize(CollationGenerationConfig), + /// Submit a collation to the subsystem. This will package it into a signed + /// [`CommittedCandidateReceipt`] and distribute along the network to validators. + /// + /// If sent before `Initialize`, this will be ignored. 
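+    /// The subsystem fetches the persisted validation data at `relay_parent` (under the
+    /// `TimedOut` core assumption) and substitutes `parent_head` into it before building
+    /// the receipt.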
+ SubmitCollation(SubmitCollationParams), +} +``` + ## Dispute Coordinator Message Messages received by the [Dispute Coordinator subsystem](../node/disputes/dispute-coordinator.md)