From 2e934c3662d7abfd956880f94fd7b0da0be72ce7 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 7 Jul 2020 18:06:44 +0200 Subject: [PATCH 1/9] overseer: introduce a utility typemap --- node/overseer/src/lib.rs | 42 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 70de3c1d86f8..85fc12b3b369 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -678,6 +678,48 @@ fn spawn( }) } +mod utils { + use std::any::{Any, TypeId}; + use std::collections::HashMap; + + /// A map allowing you to store and retrieve a single instance of a type. + /// + /// It is the same as https://github.com/malobre/static_type_map, + /// but implemented here as it is trivial enough. + #[derive(Default, Debug)] + pub struct TypeMap(pub HashMap>); + + impl TypeMap { + pub fn get(&self) -> Option<&T> + where + T: Any, + { + self.0 + .get(&TypeId::of::()) + .and_then(|any| any.downcast_ref()) + } + + pub fn get_mut(&self) -> Option<&mut T> + where + T: Any, + { + self.0 + .get(&TypeId::of::()) + .and_then(|any| any.downcast_mut()) + } + + pub fn insert(&mut self, t: T) + where + T: Any, + { + let old_value = self.0 + .insert(TypeId::of::(), Box::new(t)); + const OOPS: &str = "overseer was initialized with two subsystems handling the same type of message"; + assert!(old_value.is_none(), OOPS); + } + } +} + #[cfg(test)] mod tests { use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; From b604c90163003b17d1fe2b8ea09a73a7ad75f4e9 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 7 Jul 2020 22:32:56 +0200 Subject: [PATCH 2/9] it's ugly but it compiles --- node/overseer/src/lib.rs | 323 ++++++++++++++++++++++++++++++++------- 1 file changed, 265 insertions(+), 58 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 85fc12b3b369..7b6093f456b2 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -76,7 +76,11 @@ use polkadot_primitives::{Block, BlockNumber, Hash}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use polkadot_subsystem::messages::{ - CandidateValidationMessage, CandidateBackingMessage, AllMessages + CandidateValidationMessage, CandidateBackingMessage, + CandidateSelectionMessage, StatementDistributionMessage, + AvailabilityDistributionMessage, BitfieldDistributionMessage, + ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, + NetworkBridgeMessage, AllMessages, }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, @@ -319,12 +323,37 @@ struct OverseenSubsystem { /// The `Overseer` itself. pub struct Overseer { - /// A validation subsystem - validation_subsystem: OverseenSubsystem, + /// A candidate validation subsystem. + candidate_validation_subsystem: OverseenSubsystem, - /// A candidate backing subsystem + /// A candidate backing subsystem. candidate_backing_subsystem: OverseenSubsystem, + /// A candidate selection subsystem. + candidate_selection_subsystem: OverseenSubsystem, + + /// A statement distribution subsystem. + statement_distribution_subsystem: OverseenSubsystem, + + /// An availability distribution subsystem. + availability_distribution_subsystem: OverseenSubsystem, + + /// A bitfield distribution subsystem. + bitfield_distribution_subsystem: OverseenSubsystem, + + /// A provisioner subsystem. + provisioner_subsystem: OverseenSubsystem, + + /// A runtime API subsystem. + runtime_api_subsystem: OverseenSubsystem, + + /// An availability store subsystem. + availability_store_subsystem: OverseenSubsystem, + + /// A network bridge subsystem. + network_bridge_subsystem: OverseenSubsystem, + + /// Spawner to spawn tasks to. s: S, @@ -457,8 +486,16 @@ where /// ``` pub fn new( leaves: impl IntoIterator, - validation: impl Subsystem> + Send, + candidate_validation: impl Subsystem> + Send, candidate_backing: impl Subsystem> + Send, + candidate_selection: impl Subsystem> + Send, + statement_distribution: impl Subsystem> + Send, + availability_distribution: impl Subsystem> + Send, + bitfield_distribution: impl Subsystem> + Send, + provisioner: impl Subsystem> + Send, + runtime_api: impl Subsystem> + Send, + availability_store: impl Subsystem> + Send, + network_bridge: impl Subsystem> + Send, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -470,11 +507,11 @@ where let mut running_subsystems_rx = StreamUnordered::new(); let mut running_subsystems = FuturesUnordered::new(); - let validation_subsystem = spawn( + let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - validation, + candidate_validation, )?; let candidate_backing_subsystem = spawn( @@ -484,6 +521,62 @@ where candidate_backing, )?; + let candidate_selection_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + candidate_selection, + )?; + + let statement_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + statement_distribution, + )?; + + let availability_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + availability_distribution, + )?; + + let bitfield_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + bitfield_distribution, + )?; + + let provisioner_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + provisioner, + )?; + + let runtime_api_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + runtime_api, + )?; + + let availability_store_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + availability_store, + )?; + + let network_bridge_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + network_bridge, + )?; + let active_leaves = HashSet::new(); let leaves = leaves @@ -492,8 +585,16 @@ where .collect(); let this = Self { - validation_subsystem, + candidate_validation_subsystem, candidate_backing_subsystem, + candidate_selection_subsystem, + statement_distribution_subsystem, + availability_distribution_subsystem, + bitfield_distribution_subsystem, + provisioner_subsystem, + runtime_api_subsystem, + availability_store_subsystem, + network_bridge_subsystem, s, running_subsystems, running_subsystems_rx, @@ -507,7 +608,7 @@ where // Stop the overseer. async fn stop(mut self) { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } @@ -515,6 +616,38 @@ where let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.provisioner_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.network_bridge_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); loop { @@ -616,11 +749,43 @@ where } async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { s.tx.send(FromOverseer::Signal(signal.clone())).await?; } if let Some(ref mut s) = self.candidate_backing_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.provisioner_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.availability_store_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.network_bridge_subsystem.instance { s.tx.send(FromOverseer::Signal(signal)).await?; } @@ -630,7 +795,7 @@ where async fn route_message(&mut self, msg: AllMessages) { match msg { AllMessages::CandidateValidation(msg) => { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { let _= s.tx.send(FromOverseer::Communication { msg }).await; } } @@ -639,11 +804,45 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } - _ => { - // TODO: temporary catch-all until all subsystems are integrated with overseer. - // The overseer is not complete until this is an exhaustive match with all - // messages targeting an included subsystem. - // https://github.com/paritytech/polkadot/issues/1317 + AllMessages::CandidateSelection(msg) => { + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::StatementDistribution(msg) => { + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::AvailabilityDistribution(msg) => { + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::BitfieldDistribution(msg) => { + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::Provisioner(msg) => { + if let Some(ref mut s) = self.provisioner_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::RuntimeApi(msg) => { + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::AvailabilityStore(msg) => { + if let Some(ref mut s) = self.availability_store_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::NetworkBridge(msg) => { + if let Some(ref mut s) = self.network_bridge_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } } } } @@ -678,48 +877,6 @@ fn spawn( }) } -mod utils { - use std::any::{Any, TypeId}; - use std::collections::HashMap; - - /// A map allowing you to store and retrieve a single instance of a type. - /// - /// It is the same as https://github.com/malobre/static_type_map, - /// but implemented here as it is trivial enough. - #[derive(Default, Debug)] - pub struct TypeMap(pub HashMap>); - - impl TypeMap { - pub fn get(&self) -> Option<&T> - where - T: Any, - { - self.0 - .get(&TypeId::of::()) - .and_then(|any| any.downcast_ref()) - } - - pub fn get_mut(&self) -> Option<&mut T> - where - T: Any, - { - self.0 - .get(&TypeId::of::()) - .and_then(|any| any.downcast_mut()) - } - - pub fn insert(&mut self, t: T) - where - T: Any, - { - let old_value = self.0 - .insert(TypeId::of::(), Box::new(t)); - const OOPS: &str = "overseer was initialized with two subsystems handling the same type of message"; - assert!(old_value.is_none(), OOPS); - } - } -} - #[cfg(test)] mod tests { use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; @@ -727,6 +884,24 @@ mod tests { use polkadot_primitives::parachain::{BlockData, PoVBlock}; use super::*; + struct DummySubsystem(std::marker::PhantomData); + + impl Default for DummySubsystem { + fn default() -> Self { + Self(std::marker::PhantomData) + } + } + + impl Subsystem for DummySubsystem + where C: SubsystemContext + { + fn start(self, mut _ctx: C) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(async move { + // Do nothing and exit. + })) + } + } + struct TestSubsystem1(mpsc::Sender); impl Subsystem for TestSubsystem1 @@ -821,6 +996,14 @@ mod tests { vec![], TestSubsystem1(s1_tx), TestSubsystem2(s2_tx), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -871,6 +1054,14 @@ mod tests { vec![], TestSubsystem1(s1_tx), TestSubsystem4, + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -969,6 +1160,14 @@ mod tests { vec![first_block], TestSubsystem5(tx_5), TestSubsystem6(tx_6), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), spawner, ).unwrap(); @@ -1054,6 +1253,14 @@ mod tests { vec![first_block, second_block], TestSubsystem5(tx_5), TestSubsystem6(tx_6), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), spawner, ).unwrap(); From f5af64fd46a254217d4d5410683f0670b91f007e Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 7 Jul 2020 22:49:14 +0200 Subject: [PATCH 3/9] move DummySubsystem to subsystem crate --- node/overseer/src/lib.rs | 47 +++++++++------------------------------ node/subsystem/src/lib.rs | 20 +++++++++++++++++ 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 7b6093f456b2..9b58096d236e 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -425,8 +425,8 @@ where /// # use futures_timer::Delay; /// # use polkadot_overseer::Overseer; /// # use polkadot_subsystem::{ - /// # Subsystem, SpawnedSubsystem, SubsystemContext, - /// # messages::{CandidateValidationMessage, CandidateBackingMessage}, + /// # Subsystem, DummySubsystem, SpawnedSubsystem, SubsystemContext, + /// # messages::*, /// # }; /// /// struct ValidationSubsystem; @@ -446,28 +446,19 @@ where /// } /// } /// - /// struct CandidateBackingSubsystem; - /// impl Subsystem for CandidateBackingSubsystem - /// where C: SubsystemContext - /// { - /// fn start( - /// self, - /// mut ctx: C, - /// ) -> SpawnedSubsystem { - /// SpawnedSubsystem(Box::pin(async move { - /// loop { - /// Delay::new(Duration::from_secs(1)).await; - /// } - /// })) - /// } - /// } - /// /// # fn main() { executor::block_on(async move { /// let spawner = executor::ThreadPool::new().unwrap(); /// let (overseer, _handler) = Overseer::new( /// vec![], /// ValidationSubsystem, - /// CandidateBackingSubsystem, + /// DummySubsystem::::default(), + /// DummySubsystem::::default(), + /// DummySubsystem::::default(), + /// DummySubsystem::::default(), + /// DummySubsystem::::default(), + /// DummySubsystem::::default(), + /// DummySubsystem::::default(), + /// DummySubsystem::::default(), /// spawner, /// ).unwrap(); /// @@ -877,6 +868,7 @@ fn spawn( }) } + #[cfg(test)] mod tests { use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; @@ -884,23 +876,6 @@ mod tests { use polkadot_primitives::parachain::{BlockData, PoVBlock}; use super::*; - struct DummySubsystem(std::marker::PhantomData); - - impl Default for DummySubsystem { - fn default() -> Self { - Self(std::marker::PhantomData) - } - } - - impl Subsystem for DummySubsystem - where C: SubsystemContext - { - fn start(self, mut _ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - // Do nothing and exit. - })) - } - } struct TestSubsystem1(mpsc::Sender); diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index fd32d7cfdbc9..76603fba8949 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -148,3 +148,23 @@ pub trait Subsystem { /// Start this `Subsystem` and return `SpawnedSubsystem`. fn start(self, ctx: C) -> SpawnedSubsystem; } + +/// A dummy subsystem that implements [`Subsystem`] for all +/// types of messages. Used for tests or as a placeholder. +pub struct DummySubsystem(core::marker::PhantomData); + +impl Default for DummySubsystem { + fn default() -> Self { + Self(core::marker::PhantomData) + } +} + +impl Subsystem for DummySubsystem + where C: SubsystemContext +{ + fn start(self, mut _ctx: C) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(async move { + // Do nothing and return immediately + })) + } +} From 0ced05712870d6fb6c3053c6c9dc1536db50fcc0 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Tue, 7 Jul 2020 23:29:19 +0200 Subject: [PATCH 4/9] fix tests fallout --- node/overseer/examples/minimal-example.rs | 19 ++++++++- node/overseer/src/lib.rs | 2 + node/service/src/lib.rs | 52 +++++++++-------------- node/subsystem/src/lib.rs | 10 ++++- 4 files changed, 48 insertions(+), 35 deletions(-) diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index cdef0340d04f..8dfafb63b262 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -30,9 +30,16 @@ use kv_log_macro as log; use polkadot_primitives::parachain::{BlockData, PoVBlock}; use polkadot_overseer::Overseer; -use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer}; +use polkadot_subsystem::{ + Subsystem, SubsystemContext, DummySubsystem, + SpawnedSubsystem, FromOverseer, +}; use polkadot_subsystem::messages::{ - AllMessages, CandidateBackingMessage, CandidateValidationMessage + CandidateValidationMessage, CandidateBackingMessage, + CandidateSelectionMessage, StatementDistributionMessage, + AvailabilityDistributionMessage, BitfieldDistributionMessage, + ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, + NetworkBridgeMessage, AllMessages, }; struct Subsystem1; @@ -131,6 +138,14 @@ fn main() { vec![], Subsystem2, Subsystem1, + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 9b58096d236e..71e721b5d77e 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -451,6 +451,7 @@ where /// let (overseer, _handler) = Overseer::new( /// vec![], /// ValidationSubsystem, + /// DummySubsystem::::default(), /// DummySubsystem::::default(), /// DummySubsystem::::default(), /// DummySubsystem::::default(), @@ -874,6 +875,7 @@ mod tests { use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use polkadot_primitives::parachain::{BlockData, PoVBlock}; + use polkadot_subsystem::DummySubsystem; use super::*; diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 4ea8ddd50b5c..44b20ec510a8 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -30,9 +30,13 @@ use sc_executor::native_executor_instance; use log::info; use sp_blockchain::HeaderBackend; use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler}; -use polkadot_subsystem::{ - Subsystem, SubsystemContext, SpawnedSubsystem, - messages::{CandidateValidationMessage, CandidateBackingMessage}, +use polkadot_subsystem::DummySubsystem; +use polkadot_subsystem::messages::{ + CandidateValidationMessage, CandidateBackingMessage, + CandidateSelectionMessage, StatementDistributionMessage, + AvailabilityDistributionMessage, BitfieldDistributionMessage, + ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, + NetworkBridgeMessage, }; use polkadot_node_core_proposer::ProposerFactory; use sp_trie::PrefixedMemoryDB; @@ -270,38 +274,24 @@ macro_rules! new_full_start { }} } -struct CandidateValidationSubsystem; - -impl Subsystem for CandidateValidationSubsystem - where C: SubsystemContext -{ - fn start(self, mut ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - while let Ok(_) = ctx.recv().await {} - })) - } -} - -struct CandidateBackingSubsystem; - -impl Subsystem for CandidateBackingSubsystem - where C: SubsystemContext -{ - fn start(self, mut ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - while let Ok(_) = ctx.recv().await {} - })) - } -} - fn real_overseer( leaves: impl IntoIterator, s: S, ) -> Result<(Overseer, OverseerHandler), ServiceError> { - let validation = CandidateValidationSubsystem; - let candidate_backing = CandidateBackingSubsystem; - Overseer::new(leaves, validation, candidate_backing, s) - .map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) + Overseer::new( + leaves, + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + DummySubsystem::::default(), + s, + ).map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) } /// Builds a new service for a full client. diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index 76603fba8949..e29750fb43a0 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -162,9 +162,15 @@ impl Default for DummySubsystem { impl Subsystem for DummySubsystem where C: SubsystemContext { - fn start(self, mut _ctx: C) -> SpawnedSubsystem { + fn start(self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { - // Do nothing and return immediately + loop { + match ctx.recv().await { + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, + Err(_) => return, + _ => continue, + } + } })) } } From 5e9d6699ad1113a72ef302be8afe96da33721c87 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 8 Jul 2020 18:19:42 +0200 Subject: [PATCH 5/9] use a struct for all subsystems --- node/overseer/examples/minimal-example.rs | 23 ++- node/overseer/src/lib.rs | 225 +++++++++++++--------- node/service/src/lib.rs | 25 +-- node/subsystem/src/lib.rs | 12 +- 4 files changed, 168 insertions(+), 117 deletions(-) diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 8dfafb63b262..e459c488588c 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -134,18 +134,21 @@ fn main() { Delay::new(Duration::from_secs(1)).await; }); + let all_subsystems = AllSubsystems { + candidate_validation: Subsystem2, + candidate_backing: Subsystem1, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, _handler) = Overseer::new( vec![], - Subsystem2, - Subsystem1, - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 71e721b5d77e..1124eb90cacc 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -79,7 +79,7 @@ use polkadot_subsystem::messages::{ CandidateValidationMessage, CandidateBackingMessage, CandidateSelectionMessage, StatementDistributionMessage, AvailabilityDistributionMessage, BitfieldDistributionMessage, - ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, + ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, }; pub use polkadot_subsystem::{ @@ -375,22 +375,58 @@ pub struct Overseer { active_leaves: HashSet<(Hash, BlockNumber)>, } +/// This struct is passed as an argument to create a new instance of an [`Overseer`]. +/// +/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows +/// mocking in the test code: +/// +/// Each [`Subsystem`] is supposed to implement some interface that is generic over +/// message type that is specific to this [`Subsystem`]. At the moment not all +/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. +/// +/// [`Subsystem`]: trait.Subsystem.html +/// [`DummySubsystem`]: struct.DummySubsystem.html +pub struct AllSubsystems +where + CV: Subsystem> + Send, + CB: Subsystem> + Send, + CS: Subsystem> + Send, + SD: Subsystem> + Send, + AD: Subsystem> + Send, + BD: Subsystem> + Send, + P: Subsystem> + Send, + RA: Subsystem> + Send, + AS: Subsystem> + Send, + NB: Subsystem> + Send, +{ + /// A candidate validation subsystem. + pub candidate_validation: CV, + /// A candidate backing subsystem. + pub candidate_backing: CB, + /// A candidate selection subsystem. + pub candidate_selection: CS, + /// A statement distribution subsystem. + pub statement_distribution: SD, + /// An availability distribution subsystem. + pub availability_distribution: AD, + /// A bitfield distribution subsystem. + pub bitfield_distribution: BD, + /// A provisioner subsystem. + pub provisioner: P, + /// A runtime API subsystem. + pub runtime_api: RA, + /// An availability store subsystem. + pub availability_store: AS, + /// A network bridge subsystem. + pub network_bridge: NB, +} + impl Overseer where S: Spawn, { /// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s. /// - /// Each [`Subsystem`] is passed to this function as an explicit parameter - /// and is supposed to implement some interface that is generic over message type - /// that is specific to this [`Subsystem`]. At the moment there are only two - /// subsystems: - /// * Validation - /// * CandidateBacking - /// - /// As any entity that satisfies the interface may act as a [`Subsystem`] this allows - /// mocking in the test code: - /// /// ```text /// +------------------------------------+ /// | Overseer | @@ -417,8 +453,8 @@ where /// # Example /// /// The [`Subsystems`] may be any type as long as they implement an expected interface. - /// Here, we create two mock subsystems and start the `Overseer` with them. For the sake - /// of simplicity the termination of the example is done with a timeout. + /// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them. + /// For the sake of simplicity the termination of the example is done with a timeout. /// ``` /// # use std::time::Duration; /// # use futures::{executor, pin_mut, select, FutureExt}; @@ -448,18 +484,21 @@ where /// /// # fn main() { executor::block_on(async move { /// let spawner = executor::ThreadPool::new().unwrap(); + /// let all_subsystems = AllSubsystems { + /// candidate_validation: ValidationSubsystem, + /// candidate_backing: DummySubsystem, + /// candidate_selection: DummySubsystem, + /// statement_distribution: DummySubsystem, + /// availability_distribution: DummySubsystem, + /// bitfield_distribution: DummySubsystem, + /// provisioner: DummySubsystem, + /// runtime_api: DummySubsystem, + /// availability_store: DummySubsystem, + /// network_bridge: DummySubsystem, + /// }; /// let (overseer, _handler) = Overseer::new( /// vec![], - /// ValidationSubsystem, - /// DummySubsystem::::default(), - /// DummySubsystem::::default(), - /// DummySubsystem::::default(), - /// DummySubsystem::::default(), - /// DummySubsystem::::default(), - /// DummySubsystem::::default(), - /// DummySubsystem::::default(), - /// DummySubsystem::::default(), - /// DummySubsystem::::default(), + /// all_subsystems, /// spawner, /// ).unwrap(); /// @@ -476,20 +515,23 @@ where /// # /// # }); } /// ``` - pub fn new( + pub fn new( leaves: impl IntoIterator, - candidate_validation: impl Subsystem> + Send, - candidate_backing: impl Subsystem> + Send, - candidate_selection: impl Subsystem> + Send, - statement_distribution: impl Subsystem> + Send, - availability_distribution: impl Subsystem> + Send, - bitfield_distribution: impl Subsystem> + Send, - provisioner: impl Subsystem> + Send, - runtime_api: impl Subsystem> + Send, - availability_store: impl Subsystem> + Send, - network_bridge: impl Subsystem> + Send, + all_subsystems: AllSubsystems, mut s: S, - ) -> SubsystemResult<(Self, OverseerHandler)> { + ) -> SubsystemResult<(Self, OverseerHandler)> + where + CV: Subsystem> + Send, + CB: Subsystem> + Send, + CS: Subsystem> + Send, + SD: Subsystem> + Send, + AD: Subsystem> + Send, + BD: Subsystem> + Send, + P: Subsystem> + Send, + RA: Subsystem> + Send, + AS: Subsystem> + Send, + NB: Subsystem> + Send, + { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); let handler = OverseerHandler { @@ -503,70 +545,70 @@ where &mut s, &mut running_subsystems, &mut running_subsystems_rx, - candidate_validation, + all_subsystems.candidate_validation, )?; let candidate_backing_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - candidate_backing, + all_subsystems.candidate_backing, )?; let candidate_selection_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - candidate_selection, + all_subsystems.candidate_selection, )?; let statement_distribution_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - statement_distribution, + all_subsystems.statement_distribution, )?; let availability_distribution_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - availability_distribution, + all_subsystems.availability_distribution, )?; let bitfield_distribution_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - bitfield_distribution, + all_subsystems.bitfield_distribution, )?; let provisioner_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - provisioner, + all_subsystems.provisioner, )?; let runtime_api_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - runtime_api, + all_subsystems.runtime_api, )?; let availability_store_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - availability_store, + all_subsystems.availability_store, )?; let network_bridge_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - network_bridge, + all_subsystems.network_bridge, )?; let active_leaves = HashSet::new(); @@ -969,18 +1011,21 @@ mod tests { let (s1_tx, mut s1_rx) = mpsc::channel(64); let (s2_tx, mut s2_rx) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem1(s1_tx), + candidate_backing: TestSubsystem2(s2_tx), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, mut handler) = Overseer::new( vec![], - TestSubsystem1(s1_tx), - TestSubsystem2(s2_tx), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -1027,18 +1072,21 @@ mod tests { executor::block_on(async move { let (s1_tx, _) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem1(s1_tx), + candidate_backing: TestSubsystem4, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, _handle) = Overseer::new( vec![], - TestSubsystem1(s1_tx), - TestSubsystem4, - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -1132,19 +1180,21 @@ mod tests { let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64); - + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem5(tx_5), + candidate_backing: TestSubsystem6(tx_6), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, mut handler) = Overseer::new( vec![first_block], - TestSubsystem5(tx_5), - TestSubsystem6(tx_6), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), + all_subsystems, spawner, ).unwrap(); @@ -1225,19 +1275,22 @@ mod tests { let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem5(tx_5), + candidate_backing: TestSubsystem6(tx_6), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; // start with two forks of different height. let (overseer, mut handler) = Overseer::new( vec![first_block, second_block], - TestSubsystem5(tx_5), - TestSubsystem6(tx_6), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), + all_subsystems, spawner, ).unwrap(); diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 44b20ec510a8..854a063a550f 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -29,7 +29,7 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use sc_executor::native_executor_instance; use log::info; use sp_blockchain::HeaderBackend; -use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler}; +use polkadot_overseer::{self as overseer, AllSubsystems, BlockInfo, Overseer, OverseerHandler}; use polkadot_subsystem::DummySubsystem; use polkadot_subsystem::messages::{ CandidateValidationMessage, CandidateBackingMessage, @@ -278,18 +278,21 @@ fn real_overseer( leaves: impl IntoIterator, s: S, ) -> Result<(Overseer, OverseerHandler), ServiceError> { + let all_subsystems = AllSubsystems { + candidate_validation: DummySubsystem, + candidate_backing: DummySubsystem, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; Overseer::new( leaves, - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), - DummySubsystem::::default(), + all_subsystems, s, ).map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) } diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index e29750fb43a0..db9a0629cfd4 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -151,17 +151,9 @@ pub trait Subsystem { /// A dummy subsystem that implements [`Subsystem`] for all /// types of messages. Used for tests or as a placeholder. -pub struct DummySubsystem(core::marker::PhantomData); +pub struct DummySubsystem; -impl Default for DummySubsystem { - fn default() -> Self { - Self(core::marker::PhantomData) - } -} - -impl Subsystem for DummySubsystem - where C: SubsystemContext -{ +impl Subsystem for DummySubsystem { fn start(self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { loop { From 91cdecb1282f5249052cfeeb467f9c92e1e34c99 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 8 Jul 2020 18:45:50 +0200 Subject: [PATCH 6/9] more tests fallout --- node/overseer/examples/minimal-example.rs | 8 ++------ node/overseer/src/lib.rs | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index e459c488588c..9b61de04a8a0 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -28,18 +28,14 @@ use futures_timer::Delay; use kv_log_macro as log; use polkadot_primitives::parachain::{BlockData, PoVBlock}; -use polkadot_overseer::Overseer; +use polkadot_overseer::{Overseer, AllSubsystems}; use polkadot_subsystem::{ Subsystem, SubsystemContext, DummySubsystem, SpawnedSubsystem, FromOverseer, }; use polkadot_subsystem::messages::{ - CandidateValidationMessage, CandidateBackingMessage, - CandidateSelectionMessage, StatementDistributionMessage, - AvailabilityDistributionMessage, BitfieldDistributionMessage, - ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, - NetworkBridgeMessage, AllMessages, + CandidateValidationMessage, CandidateBackingMessage, AllMessages, }; struct Subsystem1; diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 1124eb90cacc..5bec2d230976 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -459,10 +459,10 @@ where /// # use std::time::Duration; /// # use futures::{executor, pin_mut, select, FutureExt}; /// # use futures_timer::Delay; - /// # use polkadot_overseer::Overseer; + /// # use polkadot_overseer::{Overseer, AllSubsystems}; /// # use polkadot_subsystem::{ /// # Subsystem, DummySubsystem, SpawnedSubsystem, SubsystemContext, - /// # messages::*, + /// # messages::CandidateValidationMessage, /// # }; /// /// struct ValidationSubsystem; From 9cb4dfc9d755c906d29c99b7f944b7cd007cda8e Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 9 Jul 2020 11:12:26 +0200 Subject: [PATCH 7/9] add missing pov_distribution subsystem --- node/overseer/src/lib.rs | 47 +++++++++++++++++++++++++++++----- node/service/src/lib.rs | 1 + node/subsystem/src/messages.rs | 12 ++++----- 3 files changed, 47 insertions(+), 13 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 5bec2d230976..f4f1691895f1 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -79,8 +79,8 @@ use polkadot_subsystem::messages::{ CandidateValidationMessage, CandidateBackingMessage, CandidateSelectionMessage, StatementDistributionMessage, AvailabilityDistributionMessage, BitfieldDistributionMessage, - ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, - NetworkBridgeMessage, AllMessages, + ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, + AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, @@ -344,6 +344,9 @@ pub struct Overseer { /// A provisioner subsystem. provisioner_subsystem: OverseenSubsystem, + /// A PoV distribution subsystem. + pov_distribution_subsystem: OverseenSubsystem, + /// A runtime API subsystem. runtime_api_subsystem: OverseenSubsystem, @@ -386,7 +389,7 @@ pub struct Overseer { /// /// [`Subsystem`]: trait.Subsystem.html /// [`DummySubsystem`]: struct.DummySubsystem.html -pub struct AllSubsystems +pub struct AllSubsystems where CV: Subsystem> + Send, CB: Subsystem> + Send, @@ -394,7 +397,8 @@ where SD: Subsystem> + Send, AD: Subsystem> + Send, BD: Subsystem> + Send, - P: Subsystem> + Send, + P: Subsystem> + Send, + PoVD: Subsystem> + Send, RA: Subsystem> + Send, AS: Subsystem> + Send, NB: Subsystem> + Send, @@ -413,6 +417,8 @@ where pub bitfield_distribution: BD, /// A provisioner subsystem. pub provisioner: P, + /// A PoV distribution subsystem. + pub pov_distribution: PoVD, /// A runtime API subsystem. pub runtime_api: RA, /// An availability store subsystem. @@ -492,6 +498,7 @@ where /// availability_distribution: DummySubsystem, /// bitfield_distribution: DummySubsystem, /// provisioner: DummySubsystem, + /// pov_distribution: DummySubsystem, /// runtime_api: DummySubsystem, /// availability_store: DummySubsystem, /// network_bridge: DummySubsystem, @@ -515,9 +522,9 @@ where /// # /// # }); } /// ``` - pub fn new( + pub fn new( leaves: impl IntoIterator, - all_subsystems: AllSubsystems, + all_subsystems: AllSubsystems, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> where @@ -527,7 +534,8 @@ where SD: Subsystem> + Send, AD: Subsystem> + Send, BD: Subsystem> + Send, - P: Subsystem> + Send, + P: Subsystem> + Send, + PoVD: Subsystem> + Send, RA: Subsystem> + Send, AS: Subsystem> + Send, NB: Subsystem> + Send, @@ -590,6 +598,13 @@ where all_subsystems.provisioner, )?; + let pov_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.pov_distribution, + )?; + let runtime_api_subsystem = spawn( &mut s, &mut running_subsystems, @@ -626,6 +641,7 @@ where availability_distribution_subsystem, bitfield_distribution_subsystem, provisioner_subsystem, + pov_distribution_subsystem, runtime_api_subsystem, availability_store_subsystem, network_bridge_subsystem, @@ -670,6 +686,10 @@ where let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + if let Some(ref mut s) = self.runtime_api_subsystem.instance { let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } @@ -811,6 +831,10 @@ where s.tx.send(FromOverseer::Signal(signal.clone())).await?; } + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + if let Some(ref mut s) = self.runtime_api_subsystem.instance { s.tx.send(FromOverseer::Signal(signal.clone())).await?; } @@ -863,6 +887,11 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } + AllMessages::PoVDistribution(msg) => { + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } AllMessages::RuntimeApi(msg) => { if let Some(ref mut s) = self.runtime_api_subsystem.instance { let _ = s.tx.send(FromOverseer::Communication { msg }).await; @@ -1019,6 +1048,7 @@ mod tests { availability_distribution: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, + pov_distribution: DummySubsystem, runtime_api: DummySubsystem, availability_store: DummySubsystem, network_bridge: DummySubsystem, @@ -1080,6 +1110,7 @@ mod tests { availability_distribution: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, + pov_distribution: DummySubsystem, runtime_api: DummySubsystem, availability_store: DummySubsystem, network_bridge: DummySubsystem, @@ -1188,6 +1219,7 @@ mod tests { availability_distribution: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, + pov_distribution: DummySubsystem, runtime_api: DummySubsystem, availability_store: DummySubsystem, network_bridge: DummySubsystem, @@ -1283,6 +1315,7 @@ mod tests { availability_distribution: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, + pov_distribution: DummySubsystem, runtime_api: DummySubsystem, availability_store: DummySubsystem, network_bridge: DummySubsystem, diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index ad97d2e0eef1..318b3516c5ae 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -289,6 +289,7 @@ fn real_overseer( availability_distribution: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, + pov_distribution: DummySubsystem, runtime_api: DummySubsystem, availability_store: DummySubsystem, network_bridge: DummySubsystem, diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 73371e28fd9b..3a7f248069be 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -224,12 +224,12 @@ pub enum PoVDistributionMessage { /// /// This `CandidateDescriptor` should correspond to a candidate seconded under the provided /// relay-parent hash. - FetchPoV(Hash, CandidateDescriptor, oneshot::Sender>), - /// Distribute a PoV for the given relay-parent and CandidateDescriptor. - /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor - DistributePoV(Hash, CandidateDescriptor, Arc), - /// An update from the network bridge. - NetworkBridgeUpdate(NetworkBridgeEvent), + FetchPoV(Hash, CandidateDescriptor, oneshot::Sender>), + /// Distribute a PoV for the given relay-parent and CandidateDescriptor. + /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor + DistributePoV(Hash, CandidateDescriptor, Arc), + /// An update from the network bridge. + NetworkBridgeUpdate(NetworkBridgeEvent), } /// A message type tying together all message types that are used across Subsystems. From 1d2546e7f2dbc35adc4a9d44a8b4d1743cf23b18 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 9 Jul 2020 11:13:35 +0200 Subject: [PATCH 8/9] remove unused imports and bounds --- node/overseer/src/lib.rs | 15 +-------------- node/service/src/lib.rs | 7 ------- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index f4f1691895f1..14d860d26784 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -389,20 +389,7 @@ pub struct Overseer { /// /// [`Subsystem`]: trait.Subsystem.html /// [`DummySubsystem`]: struct.DummySubsystem.html -pub struct AllSubsystems -where - CV: Subsystem> + Send, - CB: Subsystem> + Send, - CS: Subsystem> + Send, - SD: Subsystem> + Send, - AD: Subsystem> + Send, - BD: Subsystem> + Send, - P: Subsystem> + Send, - PoVD: Subsystem> + Send, - RA: Subsystem> + Send, - AS: Subsystem> + Send, - NB: Subsystem> + Send, -{ +pub struct AllSubsystems { /// A candidate validation subsystem. pub candidate_validation: CV, /// A candidate backing subsystem. diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 318b3516c5ae..0f4c2a408008 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -31,13 +31,6 @@ use log::info; use sp_blockchain::HeaderBackend; use polkadot_overseer::{self as overseer, AllSubsystems, BlockInfo, Overseer, OverseerHandler}; use polkadot_subsystem::DummySubsystem; -use polkadot_subsystem::messages::{ - CandidateValidationMessage, CandidateBackingMessage, - CandidateSelectionMessage, StatementDistributionMessage, - AvailabilityDistributionMessage, BitfieldDistributionMessage, - ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, - NetworkBridgeMessage, -}; use polkadot_node_core_proposer::ProposerFactory; use sp_trie::PrefixedMemoryDB; pub use service::{ From c2432183652d140c9e31362e6280fa154a4a4406 Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Thu, 9 Jul 2020 11:25:09 +0200 Subject: [PATCH 9/9] fix minimal-example --- node/overseer/examples/minimal-example.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 9b61de04a8a0..9e904c93dcfd 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -138,6 +138,7 @@ fn main() { availability_distribution: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, + pov_distribution: DummySubsystem, runtime_api: DummySubsystem, availability_store: DummySubsystem, network_bridge: DummySubsystem,