Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

node: spawn block authoring and grandpa voter as blocking tasks #6446

Merged
merged 3 commits into from
Jun 21, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub fn new_full(config: Configuration) -> Result<impl AbstractService, ServiceEr

// the AURA authoring task is considered essential, i.e. if it
// fails we take down the service with it.
service.spawn_essential_task("aura", aura);
service.spawn_essential_task_handle().spawn_blocking("aura", aura);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -184,7 +184,7 @@ pub fn new_full(config: Configuration) -> Result<impl AbstractService, ServiceEr

// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(
service.spawn_essential_task_handle().spawn_blocking(
"grandpa-voter",
sc_finality_grandpa::run_grandpa_voter(grandpa_config)?
);
Expand Down
6 changes: 3 additions & 3 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ macro_rules! new_full {
};

let babe = sc_consensus_babe::start_babe(babe_config)?;
service.spawn_essential_task("babe-proposer", babe);
service.spawn_essential_task_handle().spawn_blocking("babe-proposer", babe);
}

// Spawn authority discovery module.
Expand Down Expand Up @@ -250,7 +250,7 @@ macro_rules! new_full {
service.prometheus_registry(),
);

service.spawn_task("authority-discovery", authority_discovery);
service.spawn_task_handle().spawn("authority-discovery", authority_discovery);
}

// if the node isn't actively participating in consensus then it doesn't
Expand Down Expand Up @@ -292,7 +292,7 @@ macro_rules! new_full {

// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(
service.spawn_essential_task_handle().spawn_blocking(
"grandpa-voter",
grandpa::run_grandpa_voter(grandpa_config)?
);
Expand Down
17 changes: 15 additions & 2 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub use sc_network::config::{
TransactionImportFuture,
};
pub use sc_tracing::TracingReceiver;
pub use task_manager::SpawnTaskHandle;
pub use task_manager::{SpawnEssentialTaskHandle, SpawnTaskHandle};
use task_manager::TaskManager;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_api::{ApiExt, ConstructRuntimeApi, ApiErrorExt};
Expand Down Expand Up @@ -166,13 +166,19 @@ pub trait AbstractService: Future<Output = Result<(), Error>> + Send + Unpin + S
/// The task name is a `&'static str` as opposed to a `String`. The reason for that is that
/// in order to avoid memory consumption issues with the Prometheus metrics, the set of
/// possible task names has to be bounded.
#[deprecated(note = "Use `spawn_task_handle().spawn() instead.")]
fn spawn_task(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static);

/// Spawns a task in the background that runs the future passed as
/// parameter. The given task is considered essential, i.e. if it errors we
/// trigger a service exit.
#[deprecated(note = "Use `spawn_essential_task_handle().spawn() instead.")]
fn spawn_essential_task(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static);

/// Returns a handle for spawning essential tasks. Any task spawned through this handle is
/// considered essential, i.e. if it errors we trigger a service exit.
fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle;

/// Returns a handle for spawning tasks.
fn spawn_task_handle(&self) -> SpawnTaskHandle;

Expand Down Expand Up @@ -269,13 +275,20 @@ where
let _ = essential_failed.send(());
});

let _ = self.spawn_task(name, essential_task);
let _ = self.spawn_task_handle().spawn(name, essential_task);
}

fn spawn_task_handle(&self) -> SpawnTaskHandle {
self.task_manager.spawn_handle()
}

fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle {
SpawnEssentialTaskHandle::new(
self.essential_failed_tx.clone(),
self.task_manager.spawn_handle(),
)
}

fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin<Box<dyn Future<Output = Option<String>> + Send>> {
Box::pin(
self.rpc_handlers.handle_request(request, mem.metadata.clone())
Expand Down
59 changes: 59 additions & 0 deletions client/service/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use prometheus_endpoint::{
CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64
};
use sc_client_api::CloneableSpawn;
use sp_utils::mpsc::TracingUnboundedSender;
use crate::config::TaskType;

mod prometheus_future;
Expand Down Expand Up @@ -149,6 +150,64 @@ impl futures01::future::Executor<Boxed01Future01> for SpawnTaskHandle {
}
}

/// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any
/// task spawned through it fails. The service should be on the receiver side
/// and will shut itself down whenever it receives any message, i.e. an
/// essential task has failed.
pub struct SpawnEssentialTaskHandle {
essential_failed_tx: TracingUnboundedSender<()>,
inner: SpawnTaskHandle,
}

impl SpawnEssentialTaskHandle {
/// Creates a new `SpawnEssentialTaskHandle`.
pub fn new(
essential_failed_tx: TracingUnboundedSender<()>,
spawn_task_handle: SpawnTaskHandle,
) -> SpawnEssentialTaskHandle {
SpawnEssentialTaskHandle {
essential_failed_tx,
inner: spawn_task_handle,
}
}

/// Spawns the given task with the given name.
///
/// See `SpawnTaskHandle::spawn`.
bkchr marked this conversation as resolved.
Show resolved Hide resolved
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.spawn_inner(name, task, TaskType::Async)
}

/// Spawns the blocking task with the given name.
///
/// See also `SpawnTaskHandle::spawn_blocking`.
bkchr marked this conversation as resolved.
Show resolved Hide resolved
pub fn spawn_blocking(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) {
self.spawn_inner(name, task, TaskType::Blocking)
}

fn spawn_inner(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
use futures::sink::SinkExt;
let mut essential_failed = self.essential_failed_tx.clone();
let essential_task = std::panic::AssertUnwindSafe(task)
.catch_unwind()
.map(move |_| {
log::error!("Essential task `{}` failed. Shutting down service.", name);
let _ = essential_failed.send(());
});

let _ = self.inner.spawn_inner(name, essential_task, task_type);
}
}

/// Helper struct to manage background/async tasks in Service.
pub struct TaskManager {
/// A future that resolves when the service has exited, this is useful to
Expand Down