diff --git a/bin/node-template/node/src/command.rs b/bin/node-template/node/src/command.rs index 18e1b22a53f8e..4f2fd3aad6fd3 100644 --- a/bin/node-template/node/src/command.rs +++ b/bin/node-template/node/src/command.rs @@ -18,7 +18,7 @@ use crate::chain_spec; use crate::cli::Cli; use crate::service; -use sc_cli::SubstrateCli; +use sc_cli::{SubstrateCli, RuntimeVersion, Role, ChainSpec}; impl SubstrateCli for Cli { fn impl_name() -> &'static str { @@ -58,6 +58,10 @@ impl SubstrateCli for Cli { )?), }) } + + fn native_runtime_version(_: &Box) -> &'static RuntimeVersion { + &node_template_runtime::VERSION + } } /// Parse and run command line arguments @@ -71,11 +75,10 @@ pub fn run() -> sc_cli::Result<()> { } None => { let runner = cli.create_runner(&cli.run)?; - runner.run_node( - service::new_light, - service::new_full, - node_template_runtime::VERSION - ) + runner.run_node_until_exit(|config| match config.role { + Role::Light => service::new_light(config), + _ => service::new_full(config), + }) } } } diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index e330c17b244b0..89bf159927fc6 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -5,7 +5,10 @@ use std::time::Duration; use sc_client_api::ExecutorProvider; use sc_consensus::LongestChain; use node_template_runtime::{self, opaque::Block, RuntimeApi}; -use sc_service::{error::{Error as ServiceError}, AbstractService, Configuration, ServiceBuilder}; +use sc_service::{ + error::{Error as ServiceError}, Configuration, ServiceBuilder, ServiceComponents, + TaskManager, +}; use sp_inherents::InherentDataProviders; use sc_executor::native_executor_instance; pub use sc_executor::NativeExecutor; @@ -93,7 +96,7 @@ macro_rules! new_full_start { } /// Builds a new service for a full client. -pub fn new_full(config: Configuration) -> Result { +pub fn new_full(config: Configuration) -> Result { let role = config.role.clone(); let force_authoring = config.force_authoring; let name = config.network.node_name.clone(); @@ -105,7 +108,10 @@ pub fn new_full(config: Configuration) -> Result>; @@ -115,13 +121,12 @@ pub fn new_full(config: Configuration) -> Result Result( sc_consensus_aura::slot_duration(&*client)?, - client, + client.clone(), select_chain, block_import, proposer, - service.network(), + network.clone(), inherent_data_providers.clone(), force_authoring, - service.keystore(), + keystore.clone(), can_author_with, )?; // the AURA authoring task is considered essential, i.e. if it // fails we take down the service with it. - service.spawn_essential_task_handle().spawn_blocking("aura", aura); + task_manager.spawn_essential_handle().spawn_blocking("aura", aura); } // if the node isn't actively participating in consensus then it doesn't // need a keystore, regardless of which protocol we use below. let keystore = if role.is_authority() { - Some(service.keystore() as sp_core::traits::BareCryptoStorePtr) + Some(keystore.clone() as sp_core::traits::BareCryptoStorePtr) } else { None }; @@ -174,33 +179,33 @@ pub fn new_full(config: Configuration) -> Result Result { +pub fn new_light(config: Configuration) -> Result { let inherent_data_providers = InherentDataProviders::new(); ServiceBuilder::new_light::(config)? 
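The hunks above reshape the node template around the new service API. Because angle-bracketed type parameters were lost when this diff was extracted, the resulting shape of `new_full` is easier to see in a reconstructed sketch (signatures approximate; only names that appear in the hunks are used, and the tuple returned by `new_full_start!` is approximated):

```rust
// Reconstructed sketch, not the literal patched file.
pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
    // `new_full_start!` still assembles the `ServiceBuilder`; the exact tuple
    // shape is approximated here.
    let (builder, inherent_data_providers) = new_full_start!(config);

    // The builder now yields a bundle of parts instead of a `Service` object:
    let ServiceComponents {
        client, transaction_pool, task_manager, keystore, network, select_chain, ..
    } = builder.build_full()?;

    // Long-running consensus work is spawned through the manager's handles,
    // e.g. the AURA authoring task from the hunk above:
    //   task_manager.spawn_essential_handle().spawn_blocking("aura", aura);

    // Only the task manager escapes; the CLI runner drives it until exit.
    Ok(task_manager)
}
```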
@@ -265,4 +270,5 @@ pub fn new_light(config: Configuration) -> Result, log_level: String) -> Result ChainSpec { #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::service::{new_full, new_light}; + use crate::service::{new_full_base, new_light_base}; use sc_service_test; use sp_runtime::BuildStorage; @@ -430,8 +430,14 @@ pub(crate) mod tests { fn test_connectivity() { sc_service_test::connectivity( integration_test_config_with_two_authorities(), - |config| new_full(config), - |config| new_light(config), + |config| { + let (keep_alive, _, client, network, transaction_pool) = new_full_base(config,|_, _| ())?; + Ok(sc_service_test::TestNetComponents::new(keep_alive, client, network, transaction_pool)) + }, + |config| { + let (keep_alive, _, client, network, transaction_pool) = new_light_base(config)?; + Ok(sc_service_test::TestNetComponents::new(keep_alive, client, network, transaction_pool)) + } ); } diff --git a/bin/node/cli/src/command.rs b/bin/node/cli/src/command.rs index bd5483f2cd31e..b07e0cdc907e0 100644 --- a/bin/node/cli/src/command.rs +++ b/bin/node/cli/src/command.rs @@ -19,7 +19,7 @@ use crate::{chain_spec, service, Cli, Subcommand}; use node_executor::Executor; use node_runtime::{Block, RuntimeApi}; -use sc_cli::{Result, SubstrateCli}; +use sc_cli::{Result, SubstrateCli, RuntimeVersion, Role, ChainSpec}; impl SubstrateCli for Cli { fn impl_name() -> &'static str { @@ -61,6 +61,10 @@ impl SubstrateCli for Cli { )?), }) } + + fn native_runtime_version(_: &Box) -> &'static RuntimeVersion { + &node_runtime::VERSION + } } /// Parse command line arguments into service configuration. @@ -70,11 +74,10 @@ pub fn run() -> Result<()> { match &cli.subcommand { None => { let runner = cli.create_runner(&cli.run)?; - runner.run_node( - service::new_light, - service::new_full, - node_runtime::VERSION - ) + runner.run_node_until_exit(|config| match config.role { + Role::Light => service::new_light(config), + _ => service::new_full(config), + }) } Some(Subcommand::Inspect(cmd)) => { let runner = cli.create_runner(cmd)?; diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 327949036340d..9707e3d8caf08 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -29,10 +29,16 @@ use node_executor; use node_primitives::Block; use node_runtime::RuntimeApi; use sc_service::{ - AbstractService, ServiceBuilder, config::Configuration, error::{Error as ServiceError}, + ServiceBuilder, config::{Role, Configuration}, error::{Error as ServiceError}, + RpcHandlers, ServiceComponents, TaskManager, }; use sp_inherents::InherentDataProviders; use sc_consensus::LongestChain; +use sc_network::{Event, NetworkService}; +use sp_runtime::traits::Block as BlockT; +use futures::prelude::*; +use sc_client_api::ExecutorProvider; +use sp_core::traits::BareCryptoStorePtr; /// Starts a `ServiceBuilder` for a full service. /// @@ -147,183 +153,197 @@ macro_rules! new_full_start { }} } -/// Creates a full service from the configuration. -/// -/// We need to use a macro because the test suit doesn't work with an opaque service. It expects -/// concrete types instead. -macro_rules! 
new_full { - ($config:expr, $with_startup_data: expr) => {{ - use futures::prelude::*; - use sc_network::Event; - use sc_client_api::ExecutorProvider; - use sp_core::traits::BareCryptoStorePtr; - - let ( - role, - force_authoring, - name, - disable_grandpa, - ) = ( - $config.role.clone(), - $config.force_authoring, - $config.network.node_name.clone(), - $config.disable_grandpa, - ); +type FullClient = sc_service::TFullClient; +type FullBackend = sc_service::TFullBackend; +type GrandpaBlockImport = grandpa::GrandpaBlockImport< + FullBackend, Block, FullClient, sc_consensus::LongestChain +>; +type BabeBlockImport = sc_consensus_babe::BabeBlockImport; - let (builder, mut import_setup, inherent_data_providers, mut rpc_setup) = - new_full_start!($config); +/// Creates a full service from the configuration. +pub fn new_full_base( + config: Configuration, + with_startup_data: impl FnOnce(&BabeBlockImport, &sc_consensus_babe::BabeLink) +) -> Result<( + TaskManager, + InherentDataProviders, + Arc, Arc::Hash>>, + Arc, Block>> +), ServiceError> { + let ( + role, + force_authoring, + name, + disable_grandpa, + ) = ( + config.role.clone(), + config.force_authoring, + config.network.node_name.clone(), + config.disable_grandpa, + ); + + let (builder, mut import_setup, inherent_data_providers, mut rpc_setup) = + new_full_start!(config); + + let ServiceComponents { + client, transaction_pool, task_manager, keystore, network, select_chain, + prometheus_registry, telemetry_on_connect_sinks, .. + } = builder + .with_finality_proof_provider(|client, backend| { + // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider + let provider = client as Arc>; + Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, provider)) as _) + })? + .build_full()?; - let service = builder - .with_finality_proof_provider(|client, backend| { - // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider - let provider = client as Arc>; - Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, provider)) as _) - })? - .build_full()?; + let (block_import, grandpa_link, babe_link) = import_setup.take() + .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); - let (block_import, grandpa_link, babe_link) = import_setup.take() - .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); + let shared_voter_state = rpc_setup.take() + .expect("The SharedVoterState is present for Full Services or setup failed before. qed"); - let shared_voter_state = rpc_setup.take() - .expect("The SharedVoterState is present for Full Services or setup failed before. qed"); + (with_startup_data)(&block_import, &babe_link); - ($with_startup_data)(&block_import, &babe_link); + if let sc_service::config::Role::Authority { .. } = &role { + let proposer = sc_basic_authorship::ProposerFactory::new( + client.clone(), + transaction_pool.clone(), + prometheus_registry.as_ref(), + ); - if let sc_service::config::Role::Authority { .. 
} = &role { - let proposer = sc_basic_authorship::ProposerFactory::new( - service.client(), - service.transaction_pool(), - service.prometheus_registry().as_ref(), - ); + let select_chain = select_chain + .ok_or(sc_service::Error::SelectChainRequired)?; - let client = service.client(); - let select_chain = service.select_chain() - .ok_or(sc_service::Error::SelectChainRequired)?; + let can_author_with = + sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()); - let can_author_with = - sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()); + let babe_config = sc_consensus_babe::BabeParams { + keystore: keystore.clone(), + client: client.clone(), + select_chain, + env: proposer, + block_import, + sync_oracle: network.clone(), + inherent_data_providers: inherent_data_providers.clone(), + force_authoring, + babe_link, + can_author_with, + }; - let babe_config = sc_consensus_babe::BabeParams { - keystore: service.keystore(), - client, - select_chain, - env: proposer, - block_import, - sync_oracle: service.network(), - inherent_data_providers: inherent_data_providers.clone(), - force_authoring, - babe_link, - can_author_with, - }; + let babe = sc_consensus_babe::start_babe(babe_config)?; + task_manager.spawn_essential_handle().spawn_blocking("babe-proposer", babe); + } - let babe = sc_consensus_babe::start_babe(babe_config)?; - service.spawn_essential_task_handle().spawn_blocking("babe-proposer", babe); - } - - // Spawn authority discovery module. - if matches!(role, sc_service::config::Role::Authority{..} | sc_service::config::Role::Sentry {..}) { - let (sentries, authority_discovery_role) = match role { - sc_service::config::Role::Authority { ref sentry_nodes } => ( - sentry_nodes.clone(), - sc_authority_discovery::Role::Authority ( - service.keystore(), - ), - ), - sc_service::config::Role::Sentry {..} => ( - vec![], - sc_authority_discovery::Role::Sentry, + // Spawn authority discovery module. + if matches!(role, Role::Authority{..} | Role::Sentry {..}) { + let (sentries, authority_discovery_role) = match role { + sc_service::config::Role::Authority { ref sentry_nodes } => ( + sentry_nodes.clone(), + sc_authority_discovery::Role::Authority ( + keystore.clone(), ), - _ => unreachable!("Due to outer matches! constraint; qed.") - }; + ), + sc_service::config::Role::Sentry {..} => ( + vec![], + sc_authority_discovery::Role::Sentry, + ), + _ => unreachable!("Due to outer matches! constraint; qed.") + }; - let network = service.network(); - let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move { match e { + let dht_event_stream = network.event_stream("authority-discovery") + .filter_map(|e| async move { match e { Event::Dht(e) => Some(e), _ => None, }}).boxed(); - let authority_discovery = sc_authority_discovery::AuthorityDiscovery::new( - service.client(), - network, - sentries, - dht_event_stream, - authority_discovery_role, - service.prometheus_registry(), - ); + let authority_discovery = sc_authority_discovery::AuthorityDiscovery::new( + client.clone(), + network.clone(), + sentries, + dht_event_stream, + authority_discovery_role, + prometheus_registry.clone(), + ); - service.spawn_task_handle().spawn("authority-discovery", authority_discovery); - } + task_manager.spawn_handle().spawn("authority-discovery", authority_discovery); + } - // if the node isn't actively participating in consensus then it doesn't - // need a keystore, regardless of which protocol we use below. 
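// The pattern above is the heart of this refactor: accessor calls on the old
// service object become plain variables destructured from `ServiceComponents`,
// and task spawning goes through the `TaskManager` handles. Roughly:
//
//     service.client()                      ->  client
//     service.network()                     ->  network.clone()
//     service.keystore()                    ->  keystore.clone()
//     service.prometheus_registry()         ->  prometheus_registry.clone()
//     service.spawn_task_handle()           ->  task_manager.spawn_handle()
//     service.spawn_essential_task_handle() ->  task_manager.spawn_essential_handle()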
- let keystore = if role.is_authority() { - Some(service.keystore() as BareCryptoStorePtr) - } else { - None - }; + // if the node isn't actively participating in consensus then it doesn't + // need a keystore, regardless of which protocol we use below. + let keystore = if role.is_authority() { + Some(keystore.clone() as BareCryptoStorePtr) + } else { + None + }; - let config = grandpa::Config { - // FIXME #1578 make this available through chainspec - gossip_duration: std::time::Duration::from_millis(333), - justification_period: 512, - name: Some(name), - observer_enabled: false, - keystore, - is_authority: role.is_network_authority(), - }; + let config = grandpa::Config { + // FIXME #1578 make this available through chainspec + gossip_duration: std::time::Duration::from_millis(333), + justification_period: 512, + name: Some(name), + observer_enabled: false, + keystore, + is_authority: role.is_network_authority(), + }; - let enable_grandpa = !disable_grandpa; - if enable_grandpa { - // start the full GRANDPA voter - // NOTE: non-authorities could run the GRANDPA observer protocol, but at - // this point the full voter should provide better guarantees of block - // and vote data availability than the observer. The observer has not - // been tested extensively yet and having most nodes in a network run it - // could lead to finality stalls. - let grandpa_config = grandpa::GrandpaParams { - config, - link: grandpa_link, - network: service.network(), - inherent_data_providers: inherent_data_providers.clone(), - telemetry_on_connect: Some(service.telemetry_on_connect_stream()), - voting_rule: grandpa::VotingRulesBuilder::default().build(), - prometheus_registry: service.prometheus_registry(), - shared_voter_state, - }; + let enable_grandpa = !disable_grandpa; + if enable_grandpa { + // start the full GRANDPA voter + // NOTE: non-authorities could run the GRANDPA observer protocol, but at + // this point the full voter should provide better guarantees of block + // and vote data availability than the observer. The observer has not + // been tested extensively yet and having most nodes in a network run it + // could lead to finality stalls. + let grandpa_config = grandpa::GrandpaParams { + config, + link: grandpa_link, + network: network.clone(), + inherent_data_providers: inherent_data_providers.clone(), + telemetry_on_connect: Some(telemetry_on_connect_sinks.on_connect_stream()), + voting_rule: grandpa::VotingRulesBuilder::default().build(), + prometheus_registry: prometheus_registry.clone(), + shared_voter_state, + }; - // the GRANDPA voter task is considered infallible, i.e. - // if it fails we take down the service with it. - service.spawn_essential_task_handle().spawn_blocking( - "grandpa-voter", - grandpa::run_grandpa_voter(grandpa_config)? - ); - } else { - grandpa::setup_disabled_grandpa( - service.client(), - &inherent_data_providers, - service.network(), - )?; - } + // the GRANDPA voter task is considered infallible, i.e. + // if it fails we take down the service with it. + task_manager.spawn_essential_handle().spawn_blocking( + "grandpa-voter", + grandpa::run_grandpa_voter(grandpa_config)? + ); + } else { + grandpa::setup_disabled_grandpa( + client.clone(), + &inherent_data_providers, + network.clone(), + )?; + } - Ok((service, inherent_data_providers)) - }}; - ($config:expr) => {{ - new_full!($config, |_, _| {}) - }} + Ok((task_manager, inherent_data_providers, client, network, transaction_pool)) } /// Builds a new service for a full client. 
pub fn new_full(config: Configuration) --> Result -{ - new_full!(config).map(|(service, _)| service) +-> Result { + new_full_base(config, |_, _| ()).map(|(task_manager, _, _, _, _)| { + task_manager + }) } -/// Builds a new service for a light client. -pub fn new_light(config: Configuration) --> Result { +type LightClient = sc_service::TLightClient; +type LightFetcher = sc_network::config::OnDemand; + +pub fn new_light_base(config: Configuration) -> Result<( + TaskManager, Arc, Arc, + Arc::Hash>>, + Arc, Block + >> +), ServiceError> { let inherent_data_providers = InherentDataProviders::new(); - let service = ServiceBuilder::new_light::(config)? + let ServiceComponents { + task_manager, rpc_handlers, client, network, transaction_pool, .. + } = ServiceBuilder::new_light::(config)? .with_select_chain(|_config, backend| { Ok(LongestChain::new(backend.clone())) })? @@ -406,16 +426,21 @@ pub fn new_light(config: Configuration) Ok(node_rpc::create_light(light_deps)) })? .build_light()?; + + Ok((task_manager, rpc_handlers, client, network, transaction_pool)) +} - Ok(service) +/// Builds a new service for a light client. +pub fn new_light(config: Configuration) -> Result { + new_light_base(config).map(|(task_manager, _, _, _, _)| { + task_manager + }) } #[cfg(test)] mod tests { use std::{sync::Arc, borrow::Cow, any::Any}; - use sc_consensus_babe::{ - CompatibleDigestItem, BabeIntermediate, INTERMEDIATE_KEY - }; + use sc_consensus_babe::{CompatibleDigestItem, BabeIntermediate, INTERMEDIATE_KEY}; use sc_consensus_epochs::descendent_query; use sp_consensus::{ Environment, Proposer, BlockImportParams, BlockOrigin, ForkChoiceStrategy, BlockImport, @@ -434,10 +459,11 @@ mod tests { use sp_timestamp; use sp_finality_tracker; use sp_keyring::AccountKeyring; - use sc_service::AbstractService; - use crate::service::{new_full, new_light}; + use sc_service_test::TestNetNode; + use crate::service::{new_full_base, new_light_base}; use sp_runtime::traits::IdentifyAccount; use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent}; + use sc_client_api::BlockBackend; type AccountPublic = ::Signer; @@ -466,14 +492,25 @@ mod tests { chain_spec, |config| { let mut setup_handles = None; - new_full!(config, | - block_import: &sc_consensus_babe::BabeBlockImport, - babe_link: &sc_consensus_babe::BabeLink, - | { - setup_handles = Some((block_import.clone(), babe_link.clone())); - }).map(move |(node, x)| (node, (x, setup_handles.unwrap()))) + let (keep_alive, inherent_data_providers, client, network, transaction_pool) = + new_full_base(config, + | + block_import: &sc_consensus_babe::BabeBlockImport, + babe_link: &sc_consensus_babe::BabeLink, + | { + setup_handles = Some((block_import.clone(), babe_link.clone())); + } + )?; + + let node = sc_service_test::TestNetComponents::new( + keep_alive, client, network, transaction_pool + ); + Ok((node, (inherent_data_providers, setup_handles.unwrap()))) + }, + |config| { + let (keep_alive, _, client, network, transaction_pool) = new_light_base(config)?; + Ok(sc_service_test::TestNetComponents::new(keep_alive, client, network, transaction_pool)) }, - |config| new_light(config), |service, &mut (ref inherent_data_providers, (ref mut block_import, ref babe_link))| { let mut inherent_data = inherent_data_providers .create_inherent_data() @@ -620,8 +657,14 @@ mod tests { fn test_consensus() { sc_service_test::consensus( crate::chain_spec::tests::integration_test_config_with_two_authorities(), - |config| new_full(config), - |config| new_light(config), + |config| { + let (keep_alive, 
_, client, network, transaction_pool) = new_full_base(config, |_, _| ())?; + Ok(sc_service_test::TestNetComponents::new(keep_alive, client, network, transaction_pool)) + }, + |config| { + let (keep_alive, _, client, network, transaction_pool) = new_light_base(config)?; + Ok(sc_service_test::TestNetComponents::new(keep_alive, client, network, transaction_pool)) + }, vec![ "//Alice".into(), "//Bob".into(), diff --git a/bin/node/testing/src/bench.rs b/bin/node/testing/src/bench.rs index fc5daa80ad63e..5df2709f87053 100644 --- a/bin/node/testing/src/bench.rs +++ b/bin/node/testing/src/bench.rs @@ -55,7 +55,7 @@ use sp_api::ProvideRuntimeApi; use sp_block_builder::BlockBuilder; use sp_inherents::InherentData; use sc_client_api::{ - ExecutionStrategy, + ExecutionStrategy, BlockBackend, execution_extensions::{ExecutionExtensions, ExecutionStrategies}, }; use sp_core::{Pair, Public, sr25519, ed25519}; diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 42dd5d53b19da..35d40965e6425 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -90,6 +90,9 @@ pub trait BlockBackend { /// Get block justification set by id. fn justification(&self, id: &BlockId) -> sp_blockchain::Result>; + + /// Get block hash by number. + fn block_hash(&self, number: NumberFor) -> sp_blockchain::Result>; } /// Provide a list of potential uncle headers for a given block. diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index 9623b08bfbb7f..a702edba65784 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -37,7 +37,9 @@ use log::info; pub use params::*; use regex::Regex; pub use runner::*; -use sc_service::{ChainSpec, Configuration, TaskExecutor}; +use sc_service::{Configuration, TaskExecutor}; +pub use sc_service::{ChainSpec, Role}; +pub use sp_version::RuntimeVersion; use std::io::Write; pub use structopt; use structopt::{ @@ -207,6 +209,9 @@ pub trait SubstrateCli: Sized { command.init::()?; Runner::new(self, command) } + + /// Native runtime version. + fn native_runtime_version(chain_spec: &Box) -> &'static RuntimeVersion; } /// Initialize the logger diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 51ea2d21862ef..fcc869dc87069 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -25,10 +25,9 @@ use futures::pin_mut; use futures::select; use futures::{future, future::FutureExt, Future}; use log::info; -use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand, TaskType}; +use sc_service::{Configuration, ServiceBuilderCommand, TaskType, TaskManager}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL}; -use sp_version::RuntimeVersion; use std::{fmt::Debug, marker::PhantomData, str::FromStr}; #[cfg(target_family = "unix")] @@ -153,7 +152,7 @@ impl Runner { /// 2020-06-03 16:14:21 💾 Database: RocksDb at /tmp/c/chains/flamingfir7/db /// 2020-06-03 16:14:21 ⛓ Native runtime: node-251 (substrate-node-1.tx1.au10) /// ``` - pub fn print_node_infos(&self, runtime_version: RuntimeVersion) { + fn print_node_infos(&self) { info!("{}", C::impl_name()); info!("✌️ version {}", C::impl_version()); info!( @@ -169,64 +168,7 @@ impl Runner { self.config.database, self.config.database.path().map_or_else(|| "".to_owned(), |p| p.display().to_string()) ); - info!("⛓ Native runtime: {}", runtime_version); - } - - /// A helper function that runs an `AbstractService` with tokio and stops if the process - /// receives the signal `SIGTERM` or `SIGINT`. 
It can run a full or a light node depending on - /// the node's configuration. - pub fn run_node( - self, - new_light: impl FnOnce(Configuration) -> sc_service::error::Result, - new_full: impl FnOnce(Configuration) -> sc_service::error::Result, - runtime_version: RuntimeVersion, - ) -> Result<()> - where - SL: AbstractService + Unpin, - SF: AbstractService + Unpin, - { - match self.config.role { - Role::Light => self.run_light_node(new_light, runtime_version), - _ => self.run_full_node(new_full, runtime_version), - } - } - - /// A helper function that runs an `AbstractService` with tokio and stops if the process - /// receives the signal `SIGTERM` or `SIGINT`. It can only run a "full" node and will fail if - /// the node's configuration uses a "light" role. - pub fn run_full_node( - self, - new_full: impl FnOnce(Configuration) -> sc_service::error::Result, - runtime_version: RuntimeVersion, - ) -> Result<()> - where - S: AbstractService + Unpin, - { - if matches!(self.config.role, Role::Light) { - return Err("Light node has been requested but this is not implemented".into()); - } - - self.print_node_infos(runtime_version); - self.run_service_until_exit(new_full) - } - - /// A helper function that runs an `AbstractService` with tokio and stops if the process - /// receives the signal `SIGTERM` or `SIGINT`. It can only run a "light" node and will fail if - /// the node's configuration uses a "full" role. - pub fn run_light_node( - self, - new_light: impl FnOnce(Configuration) -> sc_service::error::Result, - runtime_version: RuntimeVersion, - ) -> Result<()> - where - S: AbstractService + Unpin, - { - if !matches!(self.config.role, Role::Light) { - return Err("Full node has been requested but this is not implemented".into()); - } - - self.print_node_infos(runtime_version); - self.run_service_until_exit(new_light) + info!("⛓ Native runtime: {}", C::native_runtime_version(&self.config.chain_spec)); } /// A helper function that runs a future with tokio and stops if the process receives the signal @@ -257,34 +199,18 @@ impl Runner { } } - fn run_service_until_exit(mut self, service_builder: F) -> Result<()> - where - F: FnOnce(Configuration) -> std::result::Result, - T: AbstractService + Unpin, - { - let service = service_builder(self.config)?; - - // we eagerly drop the service so that the internal exit future is fired, - // but we need to keep holding a reference to the global telemetry guard - // and drop the runtime first. - let _telemetry = service.telemetry(); - - // we hold a reference to the base path so if the base path is a temporary directory it will - // not be deleted before the tokio runtime finish to clean up - let _base_path = service.base_path(); - - { - let f = service.fuse(); - self.tokio_runtime - .block_on(main(f)) - .map_err(|e| e.to_string())?; - } - - // The `service` **must** have been destroyed here for the shutdown signal to propagate - // to all the tasks. Dropping `tokio_runtime` will block the thread until all tasks have - // shut down. - drop(self.tokio_runtime); - + /// A helper function that runs a node with tokio and stops if the process receives the signal + /// `SIGTERM` or `SIGINT`. 
+ pub fn run_node_until_exit( + mut self, + initialise: impl FnOnce(Configuration) -> sc_service::error::Result, + ) -> Result<()> { + self.print_node_infos(); + let mut task_manager = initialise(self.config)?; + self.tokio_runtime.block_on(main(task_manager.future().fuse())) + .map_err(|e| e.to_string())?; + task_manager.terminate(); + drop(task_manager); Ok(()) } diff --git a/client/finality-grandpa/src/light_import.rs b/client/finality-grandpa/src/light_import.rs index b63c6f0bd7c1d..a7c9a655467c7 100644 --- a/client/finality-grandpa/src/light_import.rs +++ b/client/finality-grandpa/src/light_import.rs @@ -573,7 +573,7 @@ pub mod tests { use sp_consensus::{import_queue::CacheKeyId, ForkChoiceStrategy, BlockImport}; use sp_finality_grandpa::AuthorityId; use sp_core::{H256, crypto::Public}; - use sc_client_api::{in_mem::Blockchain as InMemoryAuxStore, StorageProof}; + use sc_client_api::{in_mem::Blockchain as InMemoryAuxStore, StorageProof, BlockBackend}; use substrate_test_runtime_client::runtime::{Block, Header}; use crate::tests::TestApi; use crate::finality_proof::{ diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 1fbf301f5b45b..8c96f514ddaee 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -17,21 +17,20 @@ // along with this program. If not, see . use crate::{ - Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm, + NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm, start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle, status_sinks, metrics::MetricsService, client::{light, Client, ClientConfig}, config::{Configuration, KeystoreConfig, PrometheusConfig, OffchainWorkerConfig}, }; use sc_client_api::{ - self, light::RemoteBlockchain, execution_extensions::ExtensionsFactory, - ExecutorProvider, CallExecutor, ForkBlocks, BadBlocks, CloneableSpawn, UsageProvider, - backend::RemoteBackend, + self, light::RemoteBlockchain, execution_extensions::ExtensionsFactory, ExecutorProvider, + ForkBlocks, BadBlocks, CloneableSpawn, UsageProvider, backend::RemoteBackend, }; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; use sc_chain_spec::get_extension; use sp_consensus::{ - block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator}, + block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator, Chain}, import_queue::ImportQueue, }; use futures::{ @@ -46,9 +45,9 @@ use sc_network::NetworkService; use parking_lot::{Mutex, RwLock}; use sp_runtime::generic::BlockId; use sp_runtime::traits::{ - Block as BlockT, NumberFor, SaturatedConversion, HashFor, Zero, + Block as BlockT, NumberFor, SaturatedConversion, HashFor, Zero, BlockIdTo, }; -use sp_api::ProvideRuntimeApi; +use sp_api::{ProvideRuntimeApi, CallApiAt}; use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo}; use std::{ collections::HashMap, @@ -62,8 +61,15 @@ use prometheus_endpoint::Registry; use sc_client_db::{Backend, DatabaseSettings}; use sp_core::traits::CodeExecutor; use sp_runtime::BuildStorage; -use sc_client_api::execution_extensions::ExecutionExtensions; +use sc_client_api::{ + BlockBackend, BlockchainEvents, + backend::StorageProvider, + proof_provider::ProofProvider, + execution_extensions::ExecutionExtensions +}; use sp_core::storage::Storage; +use sp_blockchain::{HeaderMetadata, HeaderBackend}; +use crate::{ServiceComponents, TelemetryOnConnectSinks, RpcHandlers, 
NetworkStatusSinks}; pub type BackgroundTask = Pin + Send>>; @@ -878,11 +884,11 @@ pub trait ServiceBuilderCommand { ) -> Result; } -impl +impl ServiceBuilder< TBl, TRtApi, - Client, + TCl, Arc>, TSc, TImpQu, @@ -892,8 +898,12 @@ ServiceBuilder< TRpc, TBackend, > where - Client: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: + TCl: ProvideRuntimeApi + HeaderMetadata + Chain + + BlockBackend + BlockIdTo + ProofProvider + + HeaderBackend + BlockchainEvents + ExecutorProvider + UsageProvider + + StorageProvider + CallApiAt + + Send + 'static, + >::Api: sp_api::Metadata + sc_offchain::OffchainWorkerApi + sp_transaction_pool::runtime_api::TaggedTransactionQueue + @@ -903,7 +913,6 @@ ServiceBuilder< TBl: BlockT, TRtApi: 'static + Send + Sync, TBackend: 'static + sc_client_api::backend::Backend + Send, - TExec: 'static + CallExecutor + Send + Sync + Clone, TSc: Clone, TImpQu: 'static + ImportQueue, TExPool: MaintainedTransactionPool::Hash> + MallocSizeOfWasm + 'static, @@ -916,26 +925,12 @@ ServiceBuilder< Ok(self) } - fn build_common(self) -> Result, - TSc, - NetworkStatus, - NetworkService::Hash>, - TExPool, - sc_offchain::OffchainWorkers< - Client, - TBackend::OffchainStorage, - TBl - >, - >, Error> - where TExec: CallExecutor, - { + fn build_common(self) -> Result, Error> { let ServiceBuilder { marker: _, mut config, client, - task_manager, + mut task_manager, fetcher: on_demand, backend, keystore, @@ -949,17 +944,14 @@ ServiceBuilder< block_announce_validator_builder, } = self; + let chain_info = client.usage_info().chain; + sp_session::generate_initial_session_keys( client.clone(), - &BlockId::Hash(client.chain_info().best_hash), + &BlockId::Hash(chain_info.best_hash), config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), )?; - // A side-channel for essential tasks to communicate shutdown. - let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); - - let chain_info = client.chain_info(); - info!("📦 Highest known block at #{}", chain_info.best_number); telemetry!( SUBSTRATE_INFO; @@ -968,15 +960,16 @@ ServiceBuilder< "best" => ?chain_info.best_hash ); - let spawn_handle = task_manager.spawn_handle(); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); let (network, network_status_sinks, network_future) = build_network( - &config, client.clone(), transaction_pool.clone(), Clone::clone(&spawn_handle), on_demand.clone(), - block_announce_validator_builder, finality_proof_request_builder, finality_proof_provider, - system_rpc_rx, import_queue + &config, client.clone(), transaction_pool.clone(), task_manager.spawn_handle(), + on_demand.clone(), block_announce_validator_builder, finality_proof_request_builder, + finality_proof_provider, system_rpc_rx, import_queue )?; + let spawn_handle = task_manager.spawn_handle(); + // The network worker is responsible for gathering all network messages and processing // them. 
This is quite a heavy task, and at the time of the writing of this comment it // frequently happens that this future takes several seconds or in some situations @@ -1064,7 +1057,7 @@ ServiceBuilder< ); let rpc = start_rpc_servers(&config, gen_handler)?; // This is used internally, so don't restrict access to unsafe RPC - let rpc_handlers = gen_handler(sc_rpc::DenyUnsafe::No); + let rpc_handlers = Arc::new(RpcHandlers(gen_handler(sc_rpc::DenyUnsafe::No))); let telemetry_connection_sinks: Arc>>> = Default::default(); @@ -1110,52 +1103,34 @@ ServiceBuilder< config.informant_output_format, )); - Ok(Service { + task_manager.keep_alive((telemetry, config.base_path, rpc, rpc_handlers.clone())); + + Ok(ServiceComponents { client, task_manager, network, - network_status_sinks, select_chain, transaction_pool, - essential_failed_tx, - essential_failed_rx, rpc_handlers, - _rpc: rpc, - _telemetry: telemetry, - _offchain_workers: offchain_workers, - _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), keystore, - marker: PhantomData::, + offchain_workers, + telemetry_on_connect_sinks: TelemetryOnConnectSinks(telemetry_connection_sinks), + network_status_sinks: NetworkStatusSinks::new(network_status_sinks), prometheus_registry: config.prometheus_config.map(|config| config.registry), - _base_path: config.base_path.map(Arc::new), }) } /// Builds the light service. - pub fn build_light(self) -> Result, - TSc, - NetworkStatus, - NetworkService::Hash>, - TExPool, - sc_offchain::OffchainWorkers< - Client, - TBackend::OffchainStorage, - TBl - >, - >, Error> - where TExec: CallExecutor, - { + pub fn build_light(self) -> Result, Error> { self.build_common() } } -impl +impl ServiceBuilder< TBl, TRtApi, - Client, + TCl, Arc>, TSc, TImpQu, @@ -1165,8 +1140,12 @@ ServiceBuilder< TRpc, TBackend, > where - Client: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: + TCl: ProvideRuntimeApi + HeaderMetadata + Chain + + BlockBackend + BlockIdTo + ProofProvider + + HeaderBackend + BlockchainEvents + ExecutorProvider + UsageProvider + + StorageProvider + CallApiAt + + Send + 'static, + >::Api: sp_api::Metadata + sc_offchain::OffchainWorkerApi + sp_transaction_pool::runtime_api::TaggedTransactionQueue + @@ -1176,7 +1155,6 @@ ServiceBuilder< TBl: BlockT, TRtApi: 'static + Send + Sync, TBackend: 'static + sc_client_api::backend::Backend + Send, - TExec: 'static + CallExecutor + Send + Sync + Clone, TSc: Clone, TImpQu: 'static + ImportQueue, TExPool: MaintainedTransactionPool::Hash> + @@ -1187,21 +1165,7 @@ ServiceBuilder< { /// Builds the full service. - pub fn build_full(self) -> Result, - TSc, - NetworkStatus, - NetworkService::Hash>, - TExPool, - sc_offchain::OffchainWorkers< - Client, - TBackend::OffchainStorage, - TBl - >, - >, Error> - where TExec: CallExecutor, - { + pub fn build_full(self) -> Result, Error> { // make transaction pool available for off-chain runtime calls. self.client.execution_extensions() .register_transaction_pool(Arc::downgrade(&self.transaction_pool) as _); @@ -1233,18 +1197,16 @@ async fn transaction_notifications( } // Periodically notify the telemetry. 
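// Note how `build_common` now ends: everything that only needs to stay alive
// (rather than be used by the caller) is stashed on the task manager instead
// of being carried in private `_telemetry`/`_rpc`/`_base_path` fields:
//
//     task_manager.keep_alive((telemetry, config.base_path, rpc, rpc_handlers.clone()));
//
// Dropping the returned `TaskManager` therefore releases the telemetry
// connection, the RPC servers, and any temporary base path in one place.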
-async fn telemetry_periodic_send( - client: Arc>, +async fn telemetry_periodic_send( + client: Arc, transaction_pool: Arc, mut metrics_service: MetricsService, network_status_sinks: Arc, NetworkState)>>> ) where TBl: BlockT, - TExec: CallExecutor, - Client: ProvideRuntimeApi, + TCl: ProvideRuntimeApi + UsageProvider, TExPool: MaintainedTransactionPool::Hash>, - TBackend: sc_client_api::backend::Backend, { let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1"); network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); @@ -1322,11 +1284,11 @@ fn build_telemetry( (telemetry, future) } -fn gen_handler( +fn gen_handler( deny_unsafe: sc_rpc::DenyUnsafe, config: &Configuration, task_manager: &TaskManager, - client: Arc>, + client: Arc, transaction_pool: Arc, keystore: Arc>, on_demand: Option>>, @@ -1337,13 +1299,14 @@ fn gen_handler( ) -> jsonrpc_pubsub::PubSubHandler where TBl: BlockT, - TExec: CallExecutor + Send + Sync + 'static, - TRtApi: Send + Sync + 'static, - Client: ProvideRuntimeApi, + TCl: ProvideRuntimeApi + BlockchainEvents + HeaderBackend + + HeaderMetadata + ExecutorProvider + + CallApiAt + ProofProvider + + StorageProvider + BlockBackend + Send + Sync + 'static, TExPool: MaintainedTransactionPool::Hash> + 'static, TBackend: sc_client_api::backend::Backend + 'static, TRpc: sc_rpc::RpcExtension, - as ProvideRuntimeApi>::Api: + >::Api: sp_session::SessionKeys + sp_api::Metadata, { @@ -1412,15 +1375,14 @@ fn gen_handler( )) } -fn build_network( +fn build_network( config: &Configuration, - client: Arc>, + client: Arc, transaction_pool: Arc, spawn_handle: SpawnTaskHandle, on_demand: Option>>, block_announce_validator_builder: Option>) -> - Box + Send> + Send + dyn FnOnce(Arc) -> Box + Send> + Send >>, finality_proof_request_builder: Option>, finality_proof_provider: Option>>, @@ -1436,11 +1398,10 @@ fn build_network( > where TBl: BlockT, - TExec: CallExecutor + Send + Sync + 'static, - TRtApi: Send + Sync + 'static, - Client: ProvideRuntimeApi, + TCl: ProvideRuntimeApi + HeaderMetadata + Chain + + BlockBackend + BlockIdTo + ProofProvider + + HeaderBackend + BlockchainEvents + 'static, TExPool: MaintainedTransactionPool::Hash> + 'static, - TBackend: sc_client_api::backend::Backend + 'static, TImpQu: ImportQueue + 'static, { let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 922f34b656813..2f101465d516f 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -353,13 +353,6 @@ impl Client where self.executor.runtime_version(id) } - /// Get block hash by number. - pub fn block_hash(&self, - block_number: <::Header as HeaderT>::Number - ) -> sp_blockchain::Result> { - self.backend.blockchain().hash(block_number) - } - /// Reads given header and generates CHT-based header proof for CHT of given size. 
pub fn header_proof_with_cht_size( &self, @@ -1925,6 +1918,10 @@ impl BlockBackend for Client fn justification(&self, id: &BlockId) -> sp_blockchain::Result> { self.backend.blockchain().justification(*id) } + + fn block_hash(&self, number: NumberFor) -> sp_blockchain::Result> { + self.backend.blockchain().hash(number) + } } impl backend::AuxStore for Client diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 036c95777323e..c3c8f60e689ad 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -36,22 +36,15 @@ mod client; mod task_manager; use std::{io, pin::Pin}; -use std::marker::PhantomData; use std::net::SocketAddr; use std::collections::HashMap; use std::time::Duration; use wasm_timer::Instant; -use std::task::{Poll, Context}; +use std::task::Poll; use parking_lot::Mutex; -use client::Client; -use futures::{ - Future, FutureExt, Stream, StreamExt, - compat::*, - sink::SinkExt, - task::{Spawn, FutureObj, SpawnError}, -}; -use sc_network::{NetworkService, NetworkStatus, network_state::NetworkState, PeerId}; +use futures::{Future, FutureExt, Stream, StreamExt, compat::*}; +use sc_network::{NetworkStatus, network_state::NetworkState, PeerId}; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; use sp_runtime::generic::BlockId; @@ -84,14 +77,9 @@ pub use sc_network::config::{ TransactionImportFuture, }; pub use sc_tracing::TracingReceiver; -pub use task_manager::{SpawnEssentialTaskHandle, SpawnTaskHandle}; -use task_manager::TaskManager; -use sp_blockchain::{HeaderBackend, HeaderMetadata}; -use sp_api::{ApiExt, ConstructRuntimeApi, ApiErrorExt}; -use sc_client_api::{ - Backend as BackendT, BlockchainEvents, CallExecutor, UsageProvider, -}; -use sp_block_builder::BlockBuilder; +pub use task_manager::SpawnTaskHandle; +pub use task_manager::TaskManager; +use sc_client_api::{Backend, BlockchainEvents}; const DEFAULT_PROTOCOL_ID: &str = "sup"; @@ -105,88 +93,10 @@ impl MallocSizeOfWasm for T {} #[cfg(target_os = "unknown")] impl MallocSizeOfWasm for T {} -/// Substrate service. -pub struct Service { - client: Arc, - task_manager: TaskManager, - select_chain: Option, - network: Arc, - // Sinks to propagate network status updates. - // For each element, every time the `Interval` fires we push an element on the sender. - network_status_sinks: Arc>>, - transaction_pool: Arc, - // Send a signal when a spawned essential task has concluded. The next time - // the service future is polled it should complete with an error. - essential_failed_tx: TracingUnboundedSender<()>, - // A receiver for spawned essential-tasks concluding. - essential_failed_rx: TracingUnboundedReceiver<()>, - rpc_handlers: sc_rpc_server::RpcHandler, - _rpc: Box, - _telemetry: Option, - _telemetry_on_connect_sinks: Arc>>>, - _offchain_workers: Option>, - keystore: sc_keystore::KeyStorePtr, - marker: PhantomData, - prometheus_registry: Option, - // The base path is kept here because it can be a temporary directory which will be deleted - // when dropped - _base_path: Option>, -} - -impl Unpin for Service {} - -/// Abstraction over a Substrate service. -pub trait AbstractService: Future> + Send + Unpin + Spawn + 'static { - /// Type of block of this chain. - type Block: BlockT; - /// Backend storage for the client. - type Backend: 'static + BackendT; - /// How to execute calls towards the runtime. - type CallExecutor: 'static + CallExecutor + Send + Sync + Clone; - /// API that the runtime provides. - type RuntimeApi: Send + Sync; - /// Chain selection algorithm. 
- type SelectChain: sp_consensus::SelectChain; - /// Transaction pool. - type TransactionPool: TransactionPool + MallocSizeOfWasm; - /// The generic Client type, the bounds here are the ones specifically required by - /// internal crates like sc_informant. - type Client: - HeaderMetadata + UsageProvider - + BlockchainEvents + HeaderBackend + Send + Sync; - - /// Get event stream for telemetry connection established events. - fn telemetry_on_connect_stream(&self) -> TracingUnboundedReceiver<()>; - - /// return a shared instance of Telemetry (if enabled) - fn telemetry(&self) -> Option; - - /// Spawns a task in the background that runs the future passed as parameter. - /// - /// Information about this task will be reported to Prometheus. - /// - /// The task name is a `&'static str` as opposed to a `String`. The reason for that is that - /// in order to avoid memory consumption issues with the Prometheus metrics, the set of - /// possible task names has to be bounded. - #[deprecated(note = "Use `spawn_task_handle().spawn() instead.")] - fn spawn_task(&self, name: &'static str, task: impl Future + Send + 'static); - - /// Spawns a task in the background that runs the future passed as - /// parameter. The given task is considered essential, i.e. if it errors we - /// trigger a service exit. - #[deprecated(note = "Use `spawn_essential_task_handle().spawn() instead.")] - fn spawn_essential_task(&self, name: &'static str, task: impl Future + Send + 'static); - - /// Returns a handle for spawning essential tasks. Any task spawned through this handle is - /// considered essential, i.e. if it errors we trigger a service exit. - fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle; - - /// Returns a handle for spawning tasks. - fn spawn_task_handle(&self) -> SpawnTaskHandle; - - /// Returns the keystore that stores keys. - fn keystore(&self) -> sc_keystore::KeyStorePtr; +/// RPC handlers that can perform RPC queries. +pub struct RpcHandlers(sc_rpc_server::RpcHandler); +impl RpcHandlers { /// Starts an RPC query. /// /// The query is passed as a string and must be a JSON text similar to what an HTTP client @@ -196,178 +106,76 @@ pub trait AbstractService: Future> + Send + Unpin + S /// /// If the request subscribes you to events, the `Sender` in the `RpcSession` object is used to /// send back spontaneous events. - fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin> + Send>>; - - /// Get shared client instance. - fn client(&self) -> Arc; - - /// Get clone of select chain. - fn select_chain(&self) -> Option; - - /// Get shared network instance. - fn network(&self) - -> Arc::Hash>>; - - /// Returns a receiver that periodically receives a status of the network. - fn network_status(&self, interval: Duration) -> TracingUnboundedReceiver<(NetworkStatus, NetworkState)>; - - /// Get shared transaction pool instance. - fn transaction_pool(&self) -> Arc; - - /// Get a handle to a future that will resolve on exit. - #[deprecated(note = "Use `spawn_task`/`spawn_essential_task` instead, those functions will attach on_exit signal.")] - fn on_exit(&self) -> ::exit_future::Exit; - - /// Get the prometheus metrics registry, if available. 
- fn prometheus_registry(&self) -> Option; - - /// Get a clone of the base_path - fn base_path(&self) -> Option>; -} - -impl AbstractService for - Service, TSc, NetworkStatus, - NetworkService, TExPool, TOc> -where - TBl: BlockT, - TBackend: 'static + BackendT, - TExec: 'static + CallExecutor + Send + Sync + Clone, - TRtApi: 'static + Send + Sync + ConstructRuntimeApi>, - >>::RuntimeApi: - sp_api::Core - + ApiExt - + ApiErrorExt - + BlockBuilder, - TSc: sp_consensus::SelectChain + 'static + Clone + Send + Unpin, - TExPool: 'static + TransactionPool + MallocSizeOfWasm, - TOc: 'static + Send + Sync, -{ - type Block = TBl; - type Backend = TBackend; - type CallExecutor = TExec; - type RuntimeApi = TRtApi; - type SelectChain = TSc; - type TransactionPool = TExPool; - type Client = Client; - - fn telemetry_on_connect_stream(&self) -> TracingUnboundedReceiver<()> { - let (sink, stream) = tracing_unbounded("mpsc_telemetry_on_connect"); - self._telemetry_on_connect_sinks.lock().push(sink); - stream - } - - fn telemetry(&self) -> Option { - self._telemetry.clone() - } - - fn keystore(&self) -> sc_keystore::KeyStorePtr { - self.keystore.clone() - } - - fn spawn_task(&self, name: &'static str, task: impl Future + Send + 'static) { - self.task_manager.spawn(name, task) - } - - fn spawn_essential_task(&self, name: &'static str, task: impl Future + Send + 'static) { - let mut essential_failed = self.essential_failed_tx.clone(); - let essential_task = std::panic::AssertUnwindSafe(task) - .catch_unwind() - .map(move |_| { - error!("Essential task `{}` failed. Shutting down service.", name); - let _ = essential_failed.send(()); - }); - - let _ = self.spawn_task_handle().spawn(name, essential_task); - } - - fn spawn_task_handle(&self) -> SpawnTaskHandle { - self.task_manager.spawn_handle() - } - - fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle { - SpawnEssentialTaskHandle::new( - self.essential_failed_tx.clone(), - self.task_manager.spawn_handle(), - ) - } - - fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin> + Send>> { - Box::pin( - self.rpc_handlers.handle_request(request, mem.metadata.clone()) - .compat() - .map(|res| res.expect("this should never fail")) - ) - } - - fn client(&self) -> Arc { - self.client.clone() - } - - fn select_chain(&self) -> Option { - self.select_chain.clone() + pub fn rpc_query(&self, mem: &RpcSession, request: &str) + -> Pin> + Send>> { + self.0.handle_request(request, mem.metadata.clone()) + .compat() + .map(|res| res.expect("this should never fail")) + .boxed() } +} - fn network(&self) - -> Arc::Hash>> - { - self.network.clone() +/// Sinks to propagate network status updates. +/// For each element, every time the `Interval` fires we push an element on the sender. +pub struct NetworkStatusSinks( + Arc, NetworkState)>>>, +); + +impl NetworkStatusSinks { + fn new( + sinks: Arc, NetworkState)>>> + ) -> Self { + Self(sinks) } - fn network_status(&self, interval: Duration) -> TracingUnboundedReceiver<(NetworkStatus, NetworkState)> { + /// Returns a receiver that periodically receives a status of the network. 
+	pub fn network_status(&self, interval: Duration)
+		-> TracingUnboundedReceiver<(NetworkStatus<Block>, NetworkState)> {
 		let (sink, stream) = tracing_unbounded("mpsc_network_status");
-		self.network_status_sinks.lock().push(interval, sink);
+		self.0.lock().push(interval, sink);
 		stream
 	}
-
-	fn transaction_pool(&self) -> Arc {
-		self.transaction_pool.clone()
-	}
-
-	fn on_exit(&self) -> exit_future::Exit {
-		self.task_manager.on_exit()
-	}
-
-	fn prometheus_registry(&self) -> Option {
-		self.prometheus_registry.clone()
-	}
-
-	fn base_path(&self) -> Option> {
-		self._base_path.clone()
-	}
 }

-impl Future for
-	Service
-{
-	type Output = Result<(), Error>;
-
-	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll {
-		let this = Pin::into_inner(self);
+/// Sinks to propagate telemetry connection established events.
+pub struct TelemetryOnConnectSinks(pub Arc<Mutex<Vec<TracingUnboundedSender<()>>>>);

-		match Pin::new(&mut this.essential_failed_rx).poll_next(cx) {
-			Poll::Pending => {},
-			Poll::Ready(_) => {
-				// Ready(None) should not be possible since we hold a live
-				// sender.
-				return Poll::Ready(Err(Error::Other("Essential task failed.".into())));
-			}
-		}
-
-		// The service future never ends.
-		Poll::Pending
+impl TelemetryOnConnectSinks {
+	/// Get event stream for telemetry connection established events.
+	pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
+		let (sink, stream) = tracing_unbounded("mpsc_telemetry_on_connect");
+		self.0.lock().push(sink);
+		stream
 	}
 }

-impl Spawn for
-	Service
-{
-	fn spawn_obj(
-		&self,
-		future: FutureObj<'static, ()>
-	) -> Result<(), SpawnError> {
-		self.task_manager.spawn_handle().spawn("unnamed", future);
-		Ok(())
-	}
+/// The individual components of the chain, built by the service builder. You are encouraged to
+/// deconstruct this into its fields.
+pub struct ServiceComponents<TBl: BlockT, TBackend: Backend<TBl>, TSc, TExPool, TCl> {
+	/// A blockchain client.
+	pub client: Arc<TCl>,
+	/// A shared transaction pool instance.
+	pub transaction_pool: Arc<TExPool>,
+	/// The chain task manager.
+	pub task_manager: TaskManager,
+	/// A keystore that stores keys.
+	pub keystore: sc_keystore::KeyStorePtr,
+	/// A shared network instance.
+	pub network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
+	/// RPC handlers that can perform RPC queries.
+	pub rpc_handlers: Arc<RpcHandlers>,
+	/// A shared instance of the chain selection algorithm.
+	pub select_chain: Option<TSc>,
+	/// Sinks to propagate network status updates.
+	pub network_status_sinks: NetworkStatusSinks<TBl>,
+	/// A prometheus metrics registry (if enabled).
+	pub prometheus_registry: Option<Registry>,
+	/// Shared telemetry connection sinks.
+	pub telemetry_on_connect_sinks: TelemetryOnConnectSinks,
+	/// A shared offchain workers instance.
+	pub offchain_workers: Option<Arc<sc_offchain::OffchainWorkers<TCl, TBackend::OffchainStorage, TBl>>>,
 }

 /// Builds a never-ending future that continuously polls the network.
diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs
index 544d76fc472cc..b6cc26005570a 100644
--- a/client/service/src/task_manager.rs
+++ b/client/service/src/task_manager.rs
@@ -13,14 +13,15 @@
 //! Substrate service tasks management module.
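Before the task-manager internals that follow, here is how the new surface defined above is meant to be consumed. A minimal sketch; the `RpcSession` channel wiring is an assumption based on `rpc_query`'s doc comment rather than something this diff shows:

```rust
// Destructure the components; each field replaces a method on the old service.
let ServiceComponents {
    task_manager, rpc_handlers, network_status_sinks, ..
} = builder.build_full()?;

// Formerly `service.network_status(..)`:
let status_stream = network_status_sinks.network_status(Duration::from_secs(5));

// Formerly `service.rpc_query(..)`; the session carries a futures 0.1 sender
// for spontaneous subscription messages (assumed wiring):
let (tx, _rx) = futures01::sync::mpsc::channel(8);
let session = RpcSession::new(tx);
let response = rpc_handlers
    .rpc_query(&session, r#"{"jsonrpc":"2.0","id":1,"method":"system_health","params":[]}"#)
    .await; // resolves to Option<String>
```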
-use std::{panic, result::Result}; +use std::{panic, result::Result, pin::Pin}; use exit_future::Signal; use log::debug; use futures::{ - Future, FutureExt, + Future, FutureExt, StreamExt, future::{select, Either, BoxFuture}, compat::*, task::{Spawn, FutureObj, SpawnError}, + sink::SinkExt, }; use prometheus_endpoint::{ exponential_buckets, register, @@ -28,8 +29,8 @@ use prometheus_endpoint::{ CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64 }; use sc_client_api::CloneableSpawn; -use sp_utils::mpsc::TracingUnboundedSender; -use crate::config::{TaskExecutor, TaskType}; +use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded}; +use crate::{config::{TaskExecutor, TaskType}, Error}; mod prometheus_future; @@ -192,7 +193,6 @@ impl SpawnEssentialTaskHandle { task: impl Future + Send + 'static, task_type: TaskType, ) { - use futures::sink::SinkExt; let mut essential_failed = self.essential_failed_tx.clone(); let essential_task = std::panic::AssertUnwindSafe(task) .catch_unwind() @@ -216,6 +216,13 @@ pub struct TaskManager { executor: TaskExecutor, /// Prometheus metric where to report the polling times. metrics: Option, + /// Send a signal when a spawned essential task has concluded. The next time + /// the service future is polled it should complete with an error. + essential_failed_tx: TracingUnboundedSender<()>, + /// A receiver for spawned essential-tasks concluding. + essential_failed_rx: TracingUnboundedReceiver<()>, + /// Things to keep alive until the task manager is dropped. + keep_alive: Box, } impl TaskManager { @@ -226,6 +233,8 @@ impl TaskManager { prometheus_registry: Option<&Registry> ) -> Result { let (signal, on_exit) = exit_future::signal(); + // A side-channel for essential tasks to communicate shutdown. + let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); let metrics = prometheus_registry.map(Metrics::register).transpose()?; @@ -234,17 +243,15 @@ impl TaskManager { signal: Some(signal), executor, metrics, + essential_failed_tx, + essential_failed_rx, + keep_alive: Box::new(()), }) } - /// Spawn background/async task, which will be aware on exit signal. - /// - /// See also the documentation of [`SpawnTaskHandler::spawn`]. - pub(super) fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { - self.spawn_handle().spawn(name, task) - } - pub(super) fn spawn_handle(&self) -> SpawnTaskHandle { + /// Get a handle for spawning tasks. + pub fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { on_exit: self.on_exit.clone(), executor: self.executor.clone(), @@ -252,18 +259,37 @@ impl TaskManager { } } - /// Clone on exit signal. - pub(super) fn on_exit(&self) -> exit_future::Exit { - self.on_exit.clone() + /// Get a handle for spawning essential tasks. + pub fn spawn_essential_handle(&self) -> SpawnEssentialTaskHandle { + SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle()) + } + + /// Return a future that will end if an essential task fails. + pub fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { + Box::pin(async move { + self.essential_failed_rx.next().await; + + Err(Error::Other("Essential task failed.".into())) + }) + } + + /// Signal to terminate all the running tasks. 
+	pub fn terminate(&mut self) {
+		if let Some(signal) = self.signal.take() {
+			let _ = signal.fire();
+		}
+	}
+
+	/// Set what the task manager should keep alive.
+	pub(super) fn keep_alive<T: 'static + Send + Sync>(&mut self, to_keep_alive: T) {
+		self.keep_alive = Box::new(to_keep_alive);
	}
 }

 impl Drop for TaskManager {
 	fn drop(&mut self) {
 		debug!(target: "service", "Tasks manager shutdown");
-		if let Some(signal) = self.signal.take() {
-			let _ = signal.fire();
-		}
+		self.terminate();
 	}
 }
diff --git a/client/service/test/src/client/light.rs b/client/service/test/src/client/light.rs
index 994d846c6a088..e72c290d43bbe 100644
--- a/client/service/test/src/client/light.rs
+++ b/client/service/test/src/client/light.rs
@@ -40,7 +40,13 @@ use sp_api::{InitializeBlock, StorageTransactionCache, ProofRecorder, OffchainOv
 use sp_consensus::{BlockOrigin};
 use sc_executor::{NativeExecutor, WasmExecutionMethod, RuntimeVersion, NativeVersion};
 use sp_core::{H256, tasks::executor as tasks_executor, NativeOrEncoded};
-use sc_client_api::{blockchain::Info, backend::NewBlockState, Backend as ClientBackend, ProofProvider, in_mem::{Backend as InMemBackend, Blockchain as InMemoryBlockchain}, AuxStore, Storage, CallExecutor, cht, ExecutionStrategy, StorageProof, BlockImportOperation, RemoteCallRequest, StorageProvider, ChangesProof, RemoteBodyRequest, RemoteReadRequest, RemoteChangesRequest, FetchChecker, RemoteReadChildRequest, RemoteHeaderRequest};
+use sc_client_api::{
+	blockchain::Info, backend::NewBlockState, Backend as ClientBackend, ProofProvider,
+	in_mem::{Backend as InMemBackend, Blockchain as InMemoryBlockchain},
+	AuxStore, Storage, CallExecutor, cht, ExecutionStrategy, StorageProof, BlockImportOperation,
+	RemoteCallRequest, StorageProvider, ChangesProof, RemoteBodyRequest, RemoteReadRequest,
+	RemoteChangesRequest, FetchChecker, RemoteReadChildRequest, RemoteHeaderRequest, BlockBackend,
+};
 use sp_externalities::Extensions;
 use sc_block_builder::BlockBuilderProvider;
 use sp_blockchain::{
diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs
index 4ff89f5319ff4..5a676e5263c8a 100644
--- a/client/service/test/src/lib.rs
+++ b/client/service/test/src/lib.rs
@@ -19,18 +19,18 @@
 //! Service integration test utils.
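At this point the new `TaskManager` lifecycle is fully assembled; before the test-harness rewrite that follows, a minimal end-to-end sketch is useful (simplified from `run_node_until_exit`; `main` is the runner's internal signal-racing helper shown earlier):

```rust
// 1. A node constructor returns a TaskManager with everything spawned on it.
let mut task_manager = new_full(config)?;

// 2. `future()` resolves with an error as soon as any task spawned through
//    `spawn_essential_handle()` concludes; the runner races it against
//    SIGTERM/SIGINT.
let res = tokio_runtime.block_on(main(task_manager.future().fuse()));

// 3. Shutdown: fire the exit signal to every spawned task, then drop the
//    manager, which also drops whatever was registered via `keep_alive(..)`
//    (telemetry, RPC servers, base path).
task_manager.terminate();
drop(task_manager);
```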
diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs
index 4ff89f5319ff4..5a676e5263c8a 100644
--- a/client/service/test/src/lib.rs
+++ b/client/service/test/src/lib.rs
@@ -19,18 +19,18 @@
 //! Service integration test utils.

 use std::iter;
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::Arc;
 use std::net::Ipv4Addr;
 use std::pin::Pin;
 use std::time::Duration;
-use log::info;
+use log::{info, debug};
 use futures01::{Future, Stream, Poll};
 use futures::{FutureExt as _, TryFutureExt as _};
 use tempfile::TempDir;
 use tokio::{runtime::Runtime, prelude::FutureExt};
 use tokio::timer::Interval;
 use sc_service::{
-	AbstractService,
+	TaskManager,
 	GenericChainSpec,
 	ChainSpecExtension,
 	Configuration,
@@ -39,12 +39,15 @@ use sc_service::{
 	Role,
 	Error,
 	TaskExecutor,
+	client::Client,
 };
 use sp_blockchain::HeaderBackend;
 use sc_network::{multiaddr, Multiaddr};
 use sc_network::config::{NetworkConfiguration, TransportConfig};
 use sp_runtime::{generic::BlockId, traits::Block as BlockT};
 use sp_transaction_pool::TransactionPool;
+use sc_client_api::{Backend, CallExecutor};
+use parking_lot::Mutex;

 #[cfg(test)]
 mod client;
@@ -54,47 +57,100 @@ const MAX_WAIT_TIME: Duration = Duration::from_secs(60 * 3);

 struct TestNet<G, E, F, L, U> {
 	runtime: Runtime,
-	authority_nodes: Vec<(usize, SyncService<F>, U, Multiaddr)>,
-	full_nodes: Vec<(usize, SyncService<F>, U, Multiaddr)>,
-	light_nodes: Vec<(usize, SyncService<L>, Multiaddr)>,
+	authority_nodes: Vec<(usize, F, U, Multiaddr)>,
+	full_nodes: Vec<(usize, F, U, Multiaddr)>,
+	light_nodes: Vec<(usize, L, Multiaddr)>,
 	chain_spec: GenericChainSpec<G, E>,
 	base_port: u16,
 	nodes: usize,
 }

-/// Wraps around an `Arc<Mutex<T>>` and implements `Future`.
-pub struct SyncService<T>(Arc<Mutex<T>>);
+pub trait TestNetNode: Clone + Future<Item = (), Error = sc_service::Error> + Send + 'static {
+	type Block: BlockT;
+	type Backend: Backend<Self::Block>;
+	type Executor: CallExecutor<Self::Block> + Send + Sync;
+	type RuntimeApi: Send + Sync;
+	type TransactionPool: TransactionPool<Block = Self::Block>;

-impl<T> SyncService<T> {
-	pub fn get(&self) -> MutexGuard<T> {
-		self.0.lock().unwrap()
-	}
+	fn client(&self) -> Arc<Client<Self::Backend, Self::Executor, Self::Block, Self::RuntimeApi>>;
+	fn transaction_pool(&self) -> Arc<Self::TransactionPool>;
+	fn network(&self) -> Arc<sc_network::NetworkService<Self::Block, <Self::Block as BlockT>::Hash>>;
 }

-impl<T> Clone for SyncService<T> {
-	fn clone(&self) -> Self {
-		Self(self.0.clone())
+pub struct TestNetComponents<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> {
+	task_manager: Arc<Mutex<TaskManager>>,
+	client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
+	transaction_pool: Arc<TExPool>,
+	network: Arc<sc_network::NetworkService<TBl, <TBl as BlockT>::Hash>>,
+}
+
+impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool>
+TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool> {
+	pub fn new(
+		task_manager: TaskManager,
+		client: Arc<Client<TBackend, TExec, TBl, TRtApi>>,
+		network: Arc<sc_network::NetworkService<TBl, <TBl as BlockT>::Hash>>,
+		transaction_pool: Arc<TExPool>,
+	) -> Self {
+		Self {
+			client, transaction_pool, network,
+			task_manager: Arc::new(Mutex::new(task_manager)),
+		}
 	}
 }

-impl<T> From<T> for SyncService<T> {
-	fn from(service: T) -> Self {
-		SyncService(Arc::new(Mutex::new(service)))
+
+impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> Clone for
+TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool> {
+	fn clone(&self) -> Self {
+		Self {
+			task_manager: self.task_manager.clone(),
+			client: self.client.clone(),
+			transaction_pool: self.transaction_pool.clone(),
+			network: self.network.clone(),
+		}
 	}
 }

-impl<T: futures::Future<Output = Result<(), sc_service::Error>> + Unpin> Future for SyncService<T> {
+impl<TBl: BlockT, TBackend, TExec, TRtApi, TExPool> Future for
+	TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
+{
 	type Item = ();
 	type Error = sc_service::Error;

 	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-		let mut f = self.0.lock().unwrap();
-		futures::compat::Compat::new(&mut *f).poll()
+		futures::compat::Compat::new(&mut self.task_manager.lock().future()).poll()
+	}
+}
+
+impl<TBl, TBackend, TExec, TRtApi, TExPool> TestNetNode for
+TestNetComponents<TBl, TBackend, TExec, TRtApi, TExPool>
+	where
+		TBl: BlockT,
+		TBackend: sc_client_api::Backend<TBl> + Send + Sync + 'static,
+		TExec: CallExecutor<TBl> + Send + Sync + 'static,
+		TRtApi: Send + Sync + 'static,
+		TExPool: TransactionPool<Block = TBl> + Send + Sync + 'static,
+{
+	type Block = TBl;
+	type Backend = TBackend;
+	type Executor = TExec;
+	type RuntimeApi = TRtApi;
+	type TransactionPool = TExPool;
+
+	fn client(&self) -> Arc<Client<Self::Backend, Self::Executor, Self::Block, Self::RuntimeApi>> {
+		self.client.clone()
+	}
+	fn transaction_pool(&self) -> Arc<Self::TransactionPool> {
+		self.transaction_pool.clone()
+	}
+	fn network(&self) -> Arc<sc_network::NetworkService<Self::Block, <Self::Block as BlockT>::Hash>> {
+		self.network.clone()
+	}
+}
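Because `TestNetNode` hands out the client, transaction pool, and network service directly, helpers can be written once and reused for full and light nodes, with no `Mutex` round-trip. A hypothetical predicate in that style; `has_reached` is not part of the diff:

```rust
use sp_blockchain::HeaderBackend;

// Hypothetical generic predicate over any test-net node handle: the returned
// `Client` implements `HeaderBackend`, so chain info needs no locking.
fn has_reached<N: TestNetNode>(node: &N, height: u32) -> bool {
	node.client().info().best_number >= height.into()
}
```

The closures later passed to `run_until_all_full` are inlined variants of this shape.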
 impl<G, E, F, L, U> TestNet<G, E, F, L, U>
-where F: Send + 'static, L: Send +'static, U: Clone + Send + 'static
+where F: Clone + Send + 'static, L: Clone + Send +'static, U: Clone + Send + 'static
 {
 	pub fn run_until_all_full<FP, LP>(
 		&mut self,
@@ -102,8 +158,8 @@ where F: Send + 'static, L: Send +'static, U: Clone + Send + 'static
 		light_predicate: LP,
 	)
 		where
-			FP: Send + Fn(usize, &SyncService<F>) -> bool + 'static,
-			LP: Send + Fn(usize, &SyncService<L>) -> bool + 'static,
+			FP: Send + Fn(usize, &F) -> bool + 'static,
+			LP: Send + Fn(usize, &L) -> bool + 'static,
 	{
 		let full_nodes = self.full_nodes.clone();
 		let light_nodes = self.light_nodes.clone();
@@ -217,8 +273,8 @@ fn node_config
 impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
-	F: AbstractService,
-	L: AbstractService,
+	F: TestNetNode,
+	L: TestNetNode,
 	E: ChainSpecExtension + Clone + 'static + Send,
 	G: RuntimeGenesis + 'static,
 {
@@ -276,10 +332,9 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
 		);
 		let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
 		let (service, user_data) = authority(node_config).expect("Error creating test node service");
-		let service = SyncService::from(service);

 		executor.spawn(service.clone().map_err(|_| ()));
-		let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into()));
+		let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().clone().into()));
 		self.authority_nodes.push((self.nodes, service, user_data, addr));
 		self.nodes += 1;
 	}
@@ -296,10 +351,9 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
 		);
 		let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
 		let (service, user_data) = full(node_config).expect("Error creating test node service");
-		let service = SyncService::from(service);

 		executor.spawn(service.clone().map_err(|_| ()));
-		let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into()));
+		let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().clone().into()));
 		self.full_nodes.push((self.nodes, service, user_data, addr));
 		self.nodes += 1;
 	}
@@ -315,10 +369,10 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
 			&temp,
 		);
 		let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
-		let service = SyncService::from(light(node_config).expect("Error creating test node service"));
+		let service = light(node_config).expect("Error creating test node service");

 		executor.spawn(service.clone().map_err(|_| ()));
-		let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into()));
+		let addr = addr.with(multiaddr::Protocol::P2p(service.network().local_peer_id().clone().into()));
 		self.light_nodes.push((self.nodes, service, addr));
 		self.nodes += 1;
 	}
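Each `add_*_node` method derives the node's dialable address by appending its peer id to the configured listen address; the network handle is now reached through the trait rather than through `service.get()`. That derivation in isolation, with a hypothetical helper name:

```rust
use sc_network::{multiaddr, Multiaddr, PeerId};

// Illustrative helper mirroring the book-keeping above: a listen address plus
// the node's peer id yields an address other nodes can use as a reserved peer.
fn dialable(listen: &Multiaddr, peer_id: &PeerId) -> Multiaddr {
	listen.clone().with(multiaddr::Protocol::P2p(peer_id.clone().into()))
}
```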
@@ -337,9 +391,9 @@ pub fn connectivity(
 	E: ChainSpecExtension + Clone + 'static + Send,
 	G: RuntimeGenesis + 'static,
 	Fb: Fn(Configuration) -> Result<F, Error>,
-	F: AbstractService,
+	F: TestNetNode,
 	Lb: Fn(Configuration) -> Result<L, Error>,
-	L: AbstractService,
+	L: TestNetNode,
 {
 	const NUM_FULL_NODES: usize = 5;
 	const NUM_LIGHT_NODES: usize = 5;
@@ -363,19 +417,25 @@ pub fn connectivity(
 		info!("Checking star topology");
 		let first_address = network.full_nodes[0].3.clone();
 		for (_, service, _, _) in network.full_nodes.iter().skip(1) {
-			service.get().network().add_reserved_peer(first_address.to_string())
+			service.network().add_reserved_peer(first_address.to_string())
 				.expect("Error adding reserved peer");
 		}
 		for (_, service, _) in network.light_nodes.iter() {
-			service.get().network().add_reserved_peer(first_address.to_string())
+			service.network().add_reserved_peer(first_address.to_string())
 				.expect("Error adding reserved peer");
 		}

 		network.run_until_all_full(
-			move |_index, service| service.get().network().num_connected()
-				== expected_full_connections,
-			move |_index, service| service.get().network().num_connected()
-				== expected_light_connections,
+			move |_index, service| {
+				let connected = service.network().num_connected();
+				debug!("Got {}/{} full connections...", connected, expected_full_connections);
+				connected == expected_full_connections
+			},
+			move |_index, service| {
+				let connected = service.network().num_connected();
+				debug!("Got {}/{} light connections...", connected, expected_light_connections);
+				connected == expected_light_connections
+			},
 		);

 		network.runtime
@@ -404,24 +464,30 @@ pub fn connectivity(
 	for i in 0..max_nodes {
 		if i != 0 {
 			if let Some((_, service, _, node_id)) = network.full_nodes.get(i) {
-				service.get().network().add_reserved_peer(address.to_string())
+				service.network().add_reserved_peer(address.to_string())
 					.expect("Error adding reserved peer");
 				address = node_id.clone();
 			}
 		}

 		if let Some((_, service, node_id)) = network.light_nodes.get(i) {
-			service.get().network().add_reserved_peer(address.to_string())
+			service.network().add_reserved_peer(address.to_string())
 				.expect("Error adding reserved peer");
 			address = node_id.clone();
 		}
 	}

 	network.run_until_all_full(
-		move |_index, service| service.get().network().num_connected()
-			== expected_full_connections,
-		move |_index, service| service.get().network().num_connected()
-			== expected_light_connections,
+		move |_index, service| {
+			let connected = service.network().num_connected();
+			debug!("Got {}/{} full connections...", connected, expected_full_connections);
+			connected == expected_full_connections
+		},
+		move |_index, service| {
+			let connected = service.network().num_connected();
+			debug!("Got {}/{} light connections...", connected, expected_light_connections);
+			connected == expected_light_connections
+		},
 	);
 	}
 	temp.close().expect("Error removing temp dir");
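The ring check threads a single `address` variable through the loop: each node adds the current head of the chain as a reserved peer and then becomes the head itself, alternating full and light nodes. A simplified model of that chaining, collapsing the two node kinds into one list; the function and its types are illustrative only:

```rust
// Simplified model of the ring wiring above: entry `(i, addr)` means node `i`
// adds `addr` as a reserved peer; each node then becomes the next target.
fn ring_wiring(addresses: &[String]) -> Vec<(usize, String)> {
	let mut head = match addresses.first() {
		Some(first) => first.clone(),
		None => return Vec::new(),
	};
	let mut wiring = Vec::new();
	for (i, own_address) in addresses.iter().enumerate().skip(1) {
		wiring.push((i, head));
		head = own_address.clone();
	}
	wiring
}
```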
@@ -436,9 +502,9 @@ pub fn sync(
 	mut extrinsic_factory: ExF
 ) where
 	Fb: Fn(Configuration) -> Result<(F, U), Error>,
-	F: AbstractService,
+	F: TestNetNode,
 	Lb: Fn(Configuration) -> Result<L, Error>,
-	L: AbstractService,
+	L: TestNetNode,
 	B: FnMut(&F, &mut U),
 	ExF: FnMut(&F, &U) -> <F::Block as BlockT>::Extrinsic,
 	U: Clone + Send + 'static,
@@ -468,39 +534,41 @@ pub fn sync(
 			info!("Generating #{}", i + 1);
 		}

-		make_block_and_import(&first_service.get(), first_user_data);
+		make_block_and_import(&first_service, first_user_data);
 	}
-	(network.full_nodes[0].1).0.lock().unwrap().network().update_chain();
+	network.full_nodes[0].1.network().update_chain();
 	network.full_nodes[0].3.clone()
 	};

 	info!("Running sync");
 	for (_, service, _, _) in network.full_nodes.iter().skip(1) {
-		service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer");
+		service.network().add_reserved_peer(first_address.to_string())
+			.expect("Error adding reserved peer");
 	}
 	for (_, service, _) in network.light_nodes.iter() {
-		service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer");
+		service.network().add_reserved_peer(first_address.to_string())
+			.expect("Error adding reserved peer");
 	}
 	network.run_until_all_full(
 		|_index, service|
-			service.get().client().info().best_number == (NUM_BLOCKS as u32).into(),
+			service.client().info().best_number == (NUM_BLOCKS as u32).into(),
 		|_index, service|
-			service.get().client().info().best_number == (NUM_BLOCKS as u32).into(),
+			service.client().info().best_number == (NUM_BLOCKS as u32).into(),
 	);

 	info!("Checking extrinsic propagation");
 	let first_service = network.full_nodes[0].1.clone();
 	let first_user_data = &network.full_nodes[0].2;
-	let best_block = BlockId::number(first_service.get().client().info().best_number);
-	let extrinsic = extrinsic_factory(&first_service.get(), first_user_data);
+	let best_block = BlockId::number(first_service.client().info().best_number);
+	let extrinsic = extrinsic_factory(&first_service, first_user_data);
 	let source = sp_transaction_pool::TransactionSource::External;

 	futures::executor::block_on(
-		first_service.get().transaction_pool().submit_one(&best_block, source, extrinsic)
+		first_service.transaction_pool().submit_one(&best_block, source, extrinsic)
 	).expect("failed to submit extrinsic");

 	network.run_until_all_full(
-		|_index, service| service.get().transaction_pool().ready().count() == 1,
+		|_index, service| service.transaction_pool().ready().count() == 1,
 		|_index, _service| true,
 	);
 }
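The propagation probe at the end of `sync` is plain transaction-pool API: pin the submission to the sender's best block, push one extrinsic, and wait until every full node's pool reports it ready. The same steps as a hypothetical standalone helper, not part of the diff:

```rust
use sp_blockchain::HeaderBackend;
use sp_runtime::{generic::BlockId, traits::Block as BlockT};
use sp_transaction_pool::{TransactionPool, TransactionSource};

// Illustrative helper, assuming `node` implements the `TestNetNode` trait
// defined above and `extrinsic` is valid for its runtime.
fn submit_probe<N: TestNetNode>(node: &N, extrinsic: <N::Block as BlockT>::Extrinsic) {
	let best = BlockId::number(node.client().info().best_number);
	futures::executor::block_on(
		node.transaction_pool().submit_one(&best, TransactionSource::External, extrinsic)
	).expect("failed to submit extrinsic");
}
```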
@@ -512,9 +580,9 @@ pub fn consensus(
 	authorities: impl IntoIterator<Item = String>
 ) where
 	Fb: Fn(Configuration) -> Result<F, Error>,
-	F: AbstractService,
+	F: TestNetNode,
 	Lb: Fn(Configuration) -> Result<L, Error>,
-	L: AbstractService,
+	L: TestNetNode,
 	E: ChainSpecExtension + Clone + 'static + Send,
 	G: RuntimeGenesis + 'static,
 {
@@ -534,19 +602,22 @@ pub fn consensus(
 	info!("Checking consensus");
 	let first_address = network.authority_nodes[0].3.clone();
 	for (_, service, _, _) in network.full_nodes.iter() {
-		service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer");
+		service.network().add_reserved_peer(first_address.to_string())
+			.expect("Error adding reserved peer");
 	}
 	for (_, service, _) in network.light_nodes.iter() {
-		service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer");
+		service.network().add_reserved_peer(first_address.to_string())
+			.expect("Error adding reserved peer");
 	}
 	for (_, service, _, _) in network.authority_nodes.iter().skip(1) {
-		service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer");
+		service.network().add_reserved_peer(first_address.to_string())
+			.expect("Error adding reserved peer");
 	}
 	network.run_until_all_full(
 		|_index, service|
-			service.get().client().info().finalized_number >= (NUM_BLOCKS as u32 / 2).into(),
+			service.client().info().finalized_number >= (NUM_BLOCKS as u32 / 2).into(),
 		|_index, service|
-			service.get().client().info().best_number >= (NUM_BLOCKS as u32 / 2).into(),
+			service.client().info().best_number >= (NUM_BLOCKS as u32 / 2).into(),
 	);

 	info!("Adding more peers");
@@ -559,15 +630,17 @@ pub fn consensus(
 		(0..0).map(|_| (String::new(), { |cfg| full_builder(cfg).map(|s| (s, ())) })),
 	);
 	for (_, service, _, _) in network.full_nodes.iter() {
-		service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer");
+		service.network().add_reserved_peer(first_address.to_string())
+			.expect("Error adding reserved peer");
 	}
 	for (_, service, _) in network.light_nodes.iter() {
-		service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer");
+		service.network().add_reserved_peer(first_address.to_string())
+			.expect("Error adding reserved peer");
 	}
 	network.run_until_all_full(
 		|_index, service|
-			service.get().client().info().finalized_number >= (NUM_BLOCKS as u32).into(),
+			service.client().info().finalized_number >= (NUM_BLOCKS as u32).into(),
 		|_index, service|
-			service.get().client().info().best_number >= (NUM_BLOCKS as u32).into(),
+			service.client().info().best_number >= (NUM_BLOCKS as u32).into(),
 	);
 }
diff --git a/test-utils/client/src/client_ext.rs b/test-utils/client/src/client_ext.rs
index 706a7b6e95a85..a74bd3258ef0f 100644
--- a/test-utils/client/src/client_ext.rs
+++ b/test-utils/client/src/client_ext.rs
@@ -19,6 +19,7 @@
 use sc_service::client::Client;
 use sc_client_api::backend::Finalizer;
+use sc_client_api::client::BlockBackend;
 use sp_consensus::{
 	BlockImportParams, BlockImport, BlockOrigin, Error as ConsensusError,
 	ForkChoiceStrategy,
diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs
index 799fe9788ca57..c8034d9466fe7 100644
--- a/utils/browser/src/lib.rs
+++ b/utils/browser/src/lib.rs
@@ -19,14 +19,15 @@
 use futures01::sync::mpsc as mpsc01;
 use log::{debug, info};
 use sc_network::config::TransportConfig;
 use sc_service::{
-	AbstractService, RpcSession, Role, Configuration,
+	RpcSession, Role, Configuration, TaskManager, RpcHandlers,
 	config::{DatabaseConfig, KeystoreConfig, NetworkConfiguration},
 	GenericChainSpec, RuntimeGenesis
 };
 use wasm_bindgen::prelude::*;
-use futures::{prelude::*, channel::{oneshot, mpsc}, future::{poll_fn, ok}, compat::*};
-use std::task::Poll;
-use std::pin::Pin;
+use futures::{
+	prelude::*, channel::{oneshot, mpsc}, compat::*, future::{ready, ok, select}
+};
+use std::{sync::Arc, pin::Pin};
 use sc_chain_spec::Extension;
 use libp2p_wasm_ext::{ExtTransport, ffi};

@@ -120,31 +121,25 @@ struct RpcMessage {
 }

 /// Create a Client object that connects to a service.
-pub fn start_client(mut service: impl AbstractService) -> Client {
+pub fn start_client(mut task_manager: TaskManager, rpc_handlers: Arc<RpcHandlers>) -> Client {
 	// We dispatch a background task responsible for processing the service.
 	//
 	// The main action performed by the code below consists in polling the service with
 	// `service.poll()`.
 	// The rest consists in handling RPC requests.
-	let (rpc_send_tx, mut rpc_send_rx) = mpsc::unbounded::<RpcMessage>();
-	wasm_bindgen_futures::spawn_local(poll_fn(move |cx| {
-		loop {
-			match Pin::new(&mut rpc_send_rx).poll_next(cx) {
-				Poll::Ready(Some(message)) => {
-					let fut = service
-						.rpc_query(&message.session, &message.rpc_json)
-						.boxed();
-					let _ = message.send_back.send(fut);
-				},
-				Poll::Pending => break,
-				Poll::Ready(None) => return Poll::Ready(()),
-			}
-		}
-
-		Pin::new(&mut service)
-			.poll(cx)
-			.map(drop)
-	}));
+	let (rpc_send_tx, rpc_send_rx) = mpsc::unbounded::<RpcMessage>();
+	wasm_bindgen_futures::spawn_local(
+		select(
+			rpc_send_rx.for_each(move |message| {
+				let fut = rpc_handlers.rpc_query(&message.session, &message.rpc_json);
+				let _ = message.send_back.send(fut);
+				ready(())
+			}),
+			Box::pin(async move {
+				let _ = task_manager.future().await;
+			}),
+		).map(drop)
+	);

 	Client {
 		rpc_send_tx,