From c681f7ebafbc23d82c57cbe19c983f8553cdc42a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 19 Jun 2020 17:41:50 +0100 Subject: [PATCH 1/3] service: add spawner for essential tasks --- client/service/src/lib.rs | 17 ++++++++- client/service/src/task_manager.rs | 59 ++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 5184886efd26c..bfd048c75903a 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -82,7 +82,7 @@ pub use sc_network::config::{ TransactionImportFuture, }; pub use sc_tracing::TracingReceiver; -pub use task_manager::SpawnTaskHandle; +pub use task_manager::{SpawnEssentialTaskHandle, SpawnTaskHandle}; use task_manager::TaskManager; use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_api::{ApiExt, ConstructRuntimeApi, ApiErrorExt}; @@ -166,13 +166,19 @@ pub trait AbstractService: Future> + Send + Unpin + S /// The task name is a `&'static str` as opposed to a `String`. The reason for that is that /// in order to avoid memory consumption issues with the Prometheus metrics, the set of /// possible task names has to be bounded. + #[deprecated(note = "Use `spawn_task_handle().spawn() instead.")] fn spawn_task(&self, name: &'static str, task: impl Future + Send + 'static); /// Spawns a task in the background that runs the future passed as /// parameter. The given task is considered essential, i.e. if it errors we /// trigger a service exit. + #[deprecated(note = "Use `spawn_essential_task_handle().spawn() instead.")] fn spawn_essential_task(&self, name: &'static str, task: impl Future + Send + 'static); + /// Returns a handle for spawning essential tasks. Any task spawned through this handle is + /// considered essential, i.e. if it errors we trigger a service exit. + fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle; + /// Returns a handle for spawning tasks. fn spawn_task_handle(&self) -> SpawnTaskHandle; @@ -269,13 +275,20 @@ where let _ = essential_failed.send(()); }); - let _ = self.spawn_task(name, essential_task); + let _ = self.spawn_task_handle().spawn(name, essential_task); } fn spawn_task_handle(&self) -> SpawnTaskHandle { self.task_manager.spawn_handle() } + fn spawn_essential_task_handle(&self) -> SpawnEssentialTaskHandle { + SpawnEssentialTaskHandle::new( + self.essential_failed_tx.clone(), + self.task_manager.spawn_handle(), + ) + } + fn rpc_query(&self, mem: &RpcSession, request: &str) -> Pin> + Send>> { Box::pin( self.rpc_handlers.handle_request(request, mem.metadata.clone()) diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index 9cd92538e326e..c5a0bb9f24bfe 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -28,6 +28,7 @@ use prometheus_endpoint::{ CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64 }; use sc_client_api::CloneableSpawn; +use sp_utils::mpsc::TracingUnboundedSender; use crate::config::TaskType; mod prometheus_future; @@ -149,6 +150,64 @@ impl futures01::future::Executor for SpawnTaskHandle { } } +/// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any +/// task spawned through it fails. The service should be on the receiver side +/// and will shut itself down whenever it receives any message, i.e. an +/// essential task has failed. +pub struct SpawnEssentialTaskHandle { + essential_failed_tx: TracingUnboundedSender<()>, + inner: SpawnTaskHandle, +} + +impl SpawnEssentialTaskHandle { + /// Creates a new `SpawnEssentialTaskHandle`. + pub fn new( + essential_failed_tx: TracingUnboundedSender<()>, + spawn_task_handle: SpawnTaskHandle, + ) -> SpawnEssentialTaskHandle { + SpawnEssentialTaskHandle { + essential_failed_tx, + inner: spawn_task_handle, + } + } + + /// Spawns the given task with the given name. + /// + /// See `SpawnTaskHandle::spawn`. + pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { + self.spawn_inner(name, task, TaskType::Async) + } + + /// Spawns the blocking task with the given name. + /// + /// See also `SpawnTaskHandle::spawn_blocking`. + pub fn spawn_blocking( + &self, + name: &'static str, + task: impl Future + Send + 'static, + ) { + self.spawn_inner(name, task, TaskType::Blocking) + } + + fn spawn_inner( + &self, + name: &'static str, + task: impl Future + Send + 'static, + task_type: TaskType, + ) { + use futures::sink::SinkExt; + let mut essential_failed = self.essential_failed_tx.clone(); + let essential_task = std::panic::AssertUnwindSafe(task) + .catch_unwind() + .map(move |_| { + log::error!("Essential task `{}` failed. Shutting down service.", name); + let _ = essential_failed.send(()); + }); + + let _ = self.inner.spawn_inner(name, essential_task, task_type); + } +} + /// Helper struct to manage background/async tasks in Service. pub struct TaskManager { /// A future that resolves when the service has exited, this is useful to From 2474f224e56b1ef693203f5cb7420069a4d08f24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 19 Jun 2020 17:42:11 +0100 Subject: [PATCH 2/3] node: spawn block authoring and grandpa voter as blocking tasks --- bin/node-template/node/src/service.rs | 4 ++-- bin/node/cli/src/service.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index e8578ab5b52de..e330c17b244b0 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -142,7 +142,7 @@ pub fn new_full(config: Configuration) -> Result Result Date: Fri, 19 Jun 2020 19:15:02 +0200 Subject: [PATCH 3/3] Apply suggestions from code review --- client/service/src/task_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index c5a0bb9f24bfe..5a400f70df10a 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -173,14 +173,14 @@ impl SpawnEssentialTaskHandle { /// Spawns the given task with the given name. /// - /// See `SpawnTaskHandle::spawn`. + /// See also [`SpawnTaskHandle::spawn`]. pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { self.spawn_inner(name, task, TaskType::Async) } /// Spawns the blocking task with the given name. /// - /// See also `SpawnTaskHandle::spawn_blocking`. + /// See also [`SpawnTaskHandle::spawn_blocking`]. pub fn spawn_blocking( &self, name: &'static str,