Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

node: spawn block authoring and grandpa voter as blocking tasks #6446

Merged
merged 3 commits into from
Jun 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub fn new_full(config: Configuration) -> Result<impl AbstractService, ServiceEr

// the AURA authoring task is considered essential, i.e. if it
// fails we take down the service with it.
service.spawn_essential_task("aura", aura);
service.spawn_essential_task_handle().spawn_blocking("aura", aura);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -184,7 +184,7 @@ pub fn new_full(config: Configuration) -> Result<impl AbstractService, ServiceEr

// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(
service.spawn_essential_task_handle().spawn_blocking(
"grandpa-voter",
sc_finality_grandpa::run_grandpa_voter(grandpa_config)?
);
Expand Down
6 changes: 3 additions & 3 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ macro_rules! new_full {
};

let babe = sc_consensus_babe::start_babe(babe_config)?;
service.spawn_essential_task("babe-proposer", babe);
service.spawn_essential_task_handle().spawn_blocking("babe-proposer", babe);
}

// Spawn authority discovery module.
Expand Down Expand Up @@ -250,7 +250,7 @@ macro_rules! new_full {
service.prometheus_registry(),
);

service.spawn_task("authority-discovery", authority_discovery);
service.spawn_task_handle().spawn("authority-discovery", authority_discovery);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -292,7 +292,7 @@ macro_rules! new_full {

// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(
service.spawn_essential_task_handle().spawn_blocking(
"grandpa-voter",
grandpa::run_grandpa_voter(grandpa_config)?
);
Expand Down
17 changes: 15 additions & 2 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub use sc_network::config::{
TransactionImportFuture,
};
pub use sc_tracing::TracingReceiver;
pub use task_manager::SpawnTaskHandle;
pub use task_manager::{SpawnEssentialTaskHandle, SpawnTaskHandle};
use task_manager::TaskManager;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_api::{ApiExt, ConstructRuntimeApi, ApiErrorExt};
Expand Down Expand Up @@ -166,13 +166,19 @@ pub trait AbstractService: Future<Output = Result<(), Error>> + Send + Unpin + S
/// The task name is a `&'static str` as opposed to a `String`. The reason for that is that
/// in order to avoid memory consumption issues with the Prometheus metrics, the set of
/// possible task names has to be bounded.
#[deprecated(note = "Use `spawn_task_handle().spawn() instead.")]
fn spawn_task(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static);

/// Spawns a task in the background that runs the future passed as
/// parameter. The given task is considered essential, i.e. if it errors we
/// trigger a service exit.
#[deprecated(note = "Use `spawn_essential_task_handle().spawn() instead.")]
fn spawn_essential_task(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static);

/// Returns a handle for spawning essential tasks. Any task spawned through this handle is
/// considered essential, i.e. if it errors we trigger a service exit.
fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle;

/// Returns a handle for spawning tasks.
fn spawn_task_handle(&self) -> SpawnTaskHandle;

Expand Down Expand Up @@ -269,13 +275,20 @@ where
let _ = essential_failed.send(());
});

let _ = self.spawn_task(name, essential_task);
let _ = self.spawn_task_handle().spawn(name, essential_task);
}

fn spawn_task_handle(&self) -> SpawnTaskHandle {
self.task_manager.spawn_handle()
}

fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle {
SpawnEssentialTaskHandle::new(
self.essential_failed_tx.clone(),
self.task_manager.spawn_handle(),
)
}

fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send>> {
Box::pin(
self.rpc_handlers.handle_request(request, mem.metadata.clone())
Expand Down
59 changes: 59 additions & 0 deletions client/service/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use prometheus_endpoint::{
CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64
};
use sc_client_api::CloneableSpawn;
use sp_utils::mpsc::TracingUnboundedSender;
use crate::config::TaskType;

mod prometheus_future;
Expand Down Expand Up @@ -149,6 +150,64 @@ impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle {
}
}

/// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any
/// task spawned through it fails. The service should be on the receiver side
/// and will shut itself down whenever it receives any message, i.e. an
/// essential task has failed.
pub struct SpawnEssentialTaskHandle {
essential_failed_tx: TracingUnboundedSender<()>,
inner: SpawnTaskHandle,
}

impl SpawnEssentialTaskHandle {
/// Creates a new `SpawnEssentialTaskHandle`.
pub fn new(
essential_failed_tx: TracingUnboundedSender<()>,
spawn_task_handle: SpawnTaskHandle,
) -> SpawnEssentialTaskHandle {
SpawnEssentialTaskHandle {
essential_failed_tx,
inner: spawn_task_handle,
}
}

/// Spawns the given task with the given name.
///
/// See also [`SpawnTaskHandle::spawn`].
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.spawn_inner(name, task, TaskType::Async)
}

/// Spawns the blocking task with the given name.
///
/// See also [`SpawnTaskHandle::spawn_blocking`].
pub fn spawn_blocking(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) {
self.spawn_inner(name, task, TaskType::Blocking)
}

fn spawn_inner(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
use futures::sink::SinkExt;
let mut essential_failed = self.essential_failed_tx.clone();
let essential_task = std::panic::AssertUnwindSafe(task)
.catch_unwind()
.map(move |_| {
log::error!("Essential task `{}` failed. Shutting down service.", name);
let _ = essential_failed.send(());
});

let _ = self.inner.spawn_inner(name, essential_task, task_type);
}
}

/// Helper struct to manage background/async tasks in Service.
pub struct TaskManager {
/// A future that resolves when the service has exited, this is useful to
Expand Down