From b29f6e5c77223579672055914b97bb0bd3ee4741 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 5 Feb 2024 14:55:51 +0000 Subject: [PATCH] remove exit-future usage, as it is non maintained, and replace with async-channel which is already in the repo. --- Cargo.lock | 23 ++----- Cargo.toml | 1 + beacon_node/beacon_chain/Cargo.toml | 1 - beacon_node/execution_layer/Cargo.toml | 1 - beacon_node/lighthouse_network/Cargo.toml | 3 +- .../lighthouse_network/tests/common.rs | 4 +- beacon_node/network/Cargo.toml | 4 +- beacon_node/network/src/service/tests.rs | 4 +- common/task_executor/Cargo.toml | 2 +- common/task_executor/src/lib.rs | 61 ++++++++++--------- common/task_executor/src/test_utils.rs | 4 +- lighthouse/environment/Cargo.toml | 2 +- lighthouse/environment/src/lib.rs | 8 +-- .../execution_engine_integration/Cargo.toml | 4 +- .../src/test_rig.rs | 4 +- testing/web3signer_tests/Cargo.toml | 2 +- testing/web3signer_tests/src/lib.rs | 5 +- validator_client/Cargo.toml | 1 - 18 files changed, 60 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4fc8ef8fe97..1698fc4f726 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -681,7 +681,6 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "execution_layer", - "exit-future", "fork_choice", "futures", "genesis", @@ -2191,10 +2190,10 @@ dependencies = [ name = "environment" version = "0.1.2" dependencies = [ + "async-channel 1.9.0", "ctrlc", "eth2_config", "eth2_network_config", - "exit-future", "futures", "logging", "serde", @@ -2729,12 +2728,12 @@ dependencies = [ name = "execution_engine_integration" version = "0.1.0" dependencies = [ + "async-channel 1.9.0", "deposit_contract", "environment", "ethers-core", "ethers-providers", "execution_layer", - "exit-future", "fork_choice", "futures", "hex", @@ -2763,7 +2762,6 @@ dependencies = [ "ethereum_serde_utils", "ethereum_ssz", "ethers-core", - "exit-future", "fork_choice", "futures", "hash-db", @@ -2800,15 +2798,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "exit-future" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e43f2f1833d64e33f15592464d6fdd70f349dda7b1a53088eb83cd94014008c5" -dependencies = [ - "futures", -] - [[package]] name = "eyre" version = "0.6.11" @@ -4882,7 +4871,6 @@ dependencies = [ "error-chain", "ethereum_ssz", "ethereum_ssz_derive", - "exit-future", "fnv", "futures", "futures-ticker", @@ -5440,6 +5428,7 @@ dependencies = [ name = "network" version = "0.2.0" dependencies = [ + "async-channel 1.9.0", "beacon_chain", "beacon_processor", "delay_map", @@ -5450,7 +5439,6 @@ dependencies = [ "ethereum-types 0.14.1", "ethereum_ssz", "execution_layer", - "exit-future", "fnv", "futures", "genesis", @@ -8033,7 +8021,7 @@ checksum = "c63f48baada5c52e65a29eef93ab4f8982681b67f9e8d29c7b05abcfec2b9ffe" name = "task_executor" version = "0.1.0" dependencies = [ - "exit-future", + "async-channel 1.9.0", "futures", "lazy_static", "lighthouse_metrics", @@ -8842,7 +8830,6 @@ dependencies = [ "eth2", "eth2_keystore", "ethereum_serde_utils", - "exit-future", "filesystem", "futures", "hex", @@ -9184,10 +9171,10 @@ name = "web3signer_tests" version = "0.1.0" dependencies = [ "account_utils", + "async-channel 1.9.0", "environment", "eth2_keystore", "eth2_network_config", - "exit-future", "futures", "lazy_static", "parking_lot 0.12.1", diff --git a/Cargo.toml b/Cargo.toml index ce3b47012c8..8a4cec7fde4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ edition = "2021" [workspace.dependencies] arbitrary = { version = "1", features = ["derive"] } +async-channel = "1.9.0" bincode = "1" bitvec = "1" byteorder = "1" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 533ef7d9546..43c2c896f71 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -68,7 +68,6 @@ execution_layer = { workspace = true } sensitive_url = { workspace = true } superstruct = { workspace = true } hex = { workspace = true } -exit-future = { workspace = true } oneshot_broadcast = { path = "../../common/oneshot_broadcast/" } slog-term = { workspace = true } slog-async = { workspace = true } diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index 7f652689806..8e7713ef310 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ b/beacon_node/execution_layer/Cargo.toml @@ -29,7 +29,6 @@ kzg = { workspace = true } state_processing = { workspace = true } superstruct = { workspace = true } lru = { workspace = true } -exit-future = { workspace = true } tree_hash = { workspace = true } tree_hash_derive = { workspace = true } parking_lot = { workspace = true } diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index cd0de37d3ba..e61cd2af0f1 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Sigma Prime "] edition = { workspace = true } [dependencies] +async-channel = { workspace = true } discv5 = { workspace = true } unsigned-varint = { version = "0.6", features = ["codec"] } ssz_types = { workspace = true } @@ -55,7 +56,6 @@ hex_fmt = "0.3.0" instant = "0.1.12" quick-protobuf = "0.8" void = "1.0.2" -async-channel = "1.9.0" asynchronous-codec = "0.7.0" base64 = "0.21.5" libp2p-mplex = "0.41" @@ -70,7 +70,6 @@ features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "plaintext", "s slog-term = { workspace = true } slog-async = { workspace = true } tempfile = { workspace = true } -exit-future = { workspace = true } quickcheck = { workspace = true } quickcheck_macros = { workspace = true } async-std = { version = "1.6.3", features = ["unstable"] } diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index af48244678d..3351ac23cb5 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -46,7 +46,7 @@ pub struct Libp2pInstance( LibP2PService, #[allow(dead_code)] // This field is managed for lifetime purposes may not be used directly, hence the `#[allow(dead_code)]` attribute. - exit_future::Signal, + async_channel::Sender<()>, ); impl std::ops::Deref for Libp2pInstance { @@ -110,7 +110,7 @@ pub async fn build_libp2p_instance( let config = build_config(boot_nodes); // launch libp2p service - let (signal, exit) = exit_future::signal(); + let (signal, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new(rt, exit, log.clone(), shutdown_tx); let libp2p_context = lighthouse_network::Context { diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index d8766d0091e..37637bb2265 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -8,12 +8,12 @@ edition = { workspace = true } sloggers = { workspace = true } genesis = { workspace = true } matches = "0.1.8" -exit-future = { workspace = true } slog-term = { workspace = true } slog-async = { workspace = true } eth2 = { workspace = true } [dependencies] +async-channel = { workspace = true } beacon_chain = { workspace = true } store = { workspace = true } lighthouse_network = { workspace = true } @@ -56,4 +56,4 @@ environment = { workspace = true } # NOTE: This can be run via cargo build --bin lighthouse --features network/disable-backfill disable-backfill = [] fork_from_env = ["beacon_chain/fork_from_env"] -portable = ["beacon_chain/portable"] \ No newline at end of file +portable = ["beacon_chain/portable"] diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 85b3f6b7528..39e5e129268 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -62,7 +62,7 @@ mod tests { let runtime = Arc::new(Runtime::new().unwrap()); - let (signal, exit) = exit_future::signal(); + let (signal, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new( Arc::downgrade(&runtime), @@ -139,7 +139,7 @@ mod tests { // Build network service. let (mut network_service, network_globals, _network_senders) = runtime.block_on(async { - let (_, exit) = exit_future::signal(); + let (_, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new( Arc::downgrade(&runtime), diff --git a/common/task_executor/Cargo.toml b/common/task_executor/Cargo.toml index 38f4eca3699..b3d58fa5ea8 100644 --- a/common/task_executor/Cargo.toml +++ b/common/task_executor/Cargo.toml @@ -5,10 +5,10 @@ authors = ["Sigma Prime "] edition = { workspace = true } [dependencies] +async-channel = { workspace = true } tokio = { workspace = true } slog = { workspace = true } futures = { workspace = true } -exit-future = { workspace = true } lazy_static = { workspace = true } lighthouse_metrics = { workspace = true } sloggers = { workspace = true } diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 2b8877b26ba..d6edfd3121c 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -73,7 +73,7 @@ pub struct TaskExecutor { /// The handle to the runtime on which tasks are spawned handle_provider: HandleProvider, /// The receiver exit future which on receiving shuts down the task - exit: exit_future::Exit, + exit: async_channel::Receiver<()>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. /// @@ -93,7 +93,7 @@ impl TaskExecutor { /// crate). pub fn new>( handle: T, - exit: exit_future::Exit, + exit: async_channel::Receiver<()>, log: slog::Logger, signal_tx: Sender, ) -> Self { @@ -159,8 +159,8 @@ impl TaskExecutor { /// Spawn a future on the tokio runtime. /// - /// The future is wrapped in an `exit_future::Exit`. The task is cancelled when the corresponding - /// exit_future `Signal` is fired/dropped. + /// The future is wrapped in an `async-channel::Receiver`. The task is cancelled when the corresponding + /// Sender is dropped. /// /// The future is monitored via another spawned future to ensure that it doesn't panic. In case /// of a panic, the executor will be shut down via `self.signal_tx`. @@ -172,9 +172,9 @@ impl TaskExecutor { } } - /// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit` + /// Spawn a future on the tokio runtime. This function does not wrap the task in an `async-channel::Receiver` /// like [spawn](#method.spawn). - /// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to + /// The caller of this function is responsible for wrapping up the task with an `async-channel::Receiver` to /// ensure that the task gets canceled appropriately. /// This function generates prometheus metrics on number of tasks and task duration. /// @@ -213,9 +213,9 @@ impl TaskExecutor { } } - /// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit` returning an optional + /// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional /// join handle to the future. - /// The task is canceled when the corresponding exit_future `Signal` is fired/dropped. + /// The task is canceled when the corresponding async-channel is dropped. /// /// This function generates prometheus metrics on number of tasks and task duration. pub fn spawn_handle( @@ -223,30 +223,29 @@ impl TaskExecutor { task: impl Future + Send + 'static, name: &'static str, ) -> Option>> { - let exit = self.exit.clone(); + let exit = self.exit(); let log = self.log.clone(); if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) { // Task is shutdown before it completes if `exit` receives let int_gauge_1 = int_gauge.clone(); - let future = future::select(Box::pin(task), exit).then(move |either| { - let result = match either { - future::Either::Left((value, _)) => { - trace!(log, "Async task completed"; "task" => name); - Some(value) - } - future::Either::Right(_) => { - debug!(log, "Async task shutdown, exit received"; "task" => name); - None - } - }; - int_gauge_1.dec(); - futures::future::ready(result) - }); - int_gauge.inc(); if let Some(handle) = self.handle() { - Some(handle.spawn(future)) + Some(handle.spawn(async move { + futures::pin_mut!(exit); + let result = match future::select(Box::pin(task), exit).await { + future::Either::Left((value, _)) => { + trace!(log, "Async task completed"; "task" => name); + Some(value) + } + future::Either::Right(_) => { + debug!(log, "Async task shutdown, exit received"; "task" => name); + None + } + }; + int_gauge_1.dec(); + result + })) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); None @@ -324,7 +323,7 @@ impl TaskExecutor { metrics::inc_gauge_vec(&metrics::BLOCK_ON_TASKS_COUNT, &[name]); let log = self.log.clone(); let handle = self.handle()?; - let exit = self.exit.clone(); + let exit = self.exit(); debug!( log, @@ -362,9 +361,13 @@ impl TaskExecutor { self.handle_provider.handle() } - /// Returns a copy of the `exit_future::Exit`. - pub fn exit(&self) -> exit_future::Exit { - self.exit.clone() + /// Returns a future that completes when `async-channel::Sender` is dropped or () is sent, + /// which translates to the exit signal being triggered. + pub fn exit(&self) -> impl Future { + let exit = self.exit.clone(); + async move { + let _ = exit.recv().await; + } } /// Get a channel to request shutting down. diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs index c6e5ad01e68..6e372d97575 100644 --- a/common/task_executor/src/test_utils.rs +++ b/common/task_executor/src/test_utils.rs @@ -14,7 +14,7 @@ use tokio::runtime; /// This struct should never be used in production, only testing. pub struct TestRuntime { runtime: Option>, - _runtime_shutdown: exit_future::Signal, + _runtime_shutdown: async_channel::Sender<()>, pub task_executor: TaskExecutor, pub log: Logger, } @@ -24,7 +24,7 @@ impl Default for TestRuntime { /// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the /// `Self` is dropped. fn default() -> Self { - let (runtime_shutdown, exit) = exit_future::signal(); + let (runtime_shutdown, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let log = null_logger().unwrap(); diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index b57e1e9dee0..f95751392c8 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Paul Hauner "] edition = { workspace = true } [dependencies] +async-channel = { workspace = true } tokio = { workspace = true } slog = { workspace = true } sloggers = { workspace = true } @@ -17,7 +18,6 @@ slog-term = { workspace = true } slog-async = { workspace = true } futures = { workspace = true } slog-json = "2.3.0" -exit-future = { workspace = true } serde = { workspace = true } [target.'cfg(not(target_family = "unix"))'.dependencies] diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 40001f1e1d4..e59b1d455a4 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -343,7 +343,7 @@ impl EnvironmentBuilder { /// Consumes the builder, returning an `Environment`. pub fn build(self) -> Result, String> { - let (signal, exit) = exit_future::signal(); + let (signal, exit) = async_channel::bounded(1); let (signal_tx, signal_rx) = channel(1); Ok(Environment { runtime: self @@ -370,8 +370,8 @@ pub struct Environment { signal_rx: Option>, /// Sender to request shutting down. signal_tx: Sender, - signal: Option, - exit: exit_future::Exit, + signal: Option>, + exit: async_channel::Receiver<()>, log: Logger, sse_logging_components: Option, eth_spec_instance: E, @@ -543,7 +543,7 @@ impl Environment { /// Fire exit signal which shuts down all spawned services pub fn fire_signal(&mut self) { if let Some(signal) = self.signal.take() { - let _ = signal.fire(); + drop(signal); } } diff --git a/testing/execution_engine_integration/Cargo.toml b/testing/execution_engine_integration/Cargo.toml index 6de108fcb69..7f66658f0fa 100644 --- a/testing/execution_engine_integration/Cargo.toml +++ b/testing/execution_engine_integration/Cargo.toml @@ -4,12 +4,12 @@ version = "0.1.0" edition = { workspace = true } [dependencies] +async-channel = { workspace = true } tempfile = { workspace = true } serde_json = { workspace = true } task_executor = { workspace = true } tokio = { workspace = true } futures = { workspace = true } -exit-future = { workspace = true } environment = { workspace = true } execution_layer = { workspace = true } sensitive_url = { workspace = true } @@ -24,4 +24,4 @@ fork_choice = { workspace = true } logging = { workspace = true } [features] -portable = ["types/portable"] \ No newline at end of file +portable = ["types/portable"] diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index b0701e80a1b..6be9a34d3dc 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -42,7 +42,7 @@ pub struct TestRig { ee_a: ExecutionPair, ee_b: ExecutionPair, spec: ChainSpec, - _runtime_shutdown: exit_future::Signal, + _runtime_shutdown: async_channel::Sender<()>, } /// Import a private key into the execution engine and unlock it so that we can @@ -111,7 +111,7 @@ impl TestRig { .build() .unwrap(), ); - let (runtime_shutdown, exit) = exit_future::signal(); + let (runtime_shutdown, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); let mut spec = TEST_FORK.make_genesis_spec(MainnetEthSpec::default_spec()); diff --git a/testing/web3signer_tests/Cargo.toml b/testing/web3signer_tests/Cargo.toml index 38b775b3928..1bdf62cd22e 100644 --- a/testing/web3signer_tests/Cargo.toml +++ b/testing/web3signer_tests/Cargo.toml @@ -8,6 +8,7 @@ edition = { workspace = true } [dependencies] [dev-dependencies] +async-channel = { workspace = true } eth2_keystore = { workspace = true } types = { workspace = true } tempfile = { workspace = true } @@ -17,7 +18,6 @@ url = { workspace = true } validator_client = { workspace = true } slot_clock = { workspace = true } futures = { workspace = true } -exit-future = { workspace = true } task_executor = { workspace = true } environment = { workspace = true } account_utils = { workspace = true } diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index 8feea1fd7f2..19637c71263 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -307,8 +307,7 @@ mod tests { validator_store: Arc>, _validator_dir: TempDir, runtime: Arc, - _runtime_shutdown: exit_future::Signal, - using_web3signer: bool, + _runtime_shutdown: async_channel::Sender<()>, } impl ValidatorStoreRig { @@ -340,7 +339,7 @@ mod tests { .build() .unwrap(), ); - let (runtime_shutdown, exit) = exit_future::signal(); + let (runtime_shutdown, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 90a82b7e3b2..ba50021c832 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -30,7 +30,6 @@ directory = { workspace = true } lockfile = { workspace = true } environment = { workspace = true } parking_lot = { workspace = true } -exit-future = { workspace = true } filesystem = { workspace = true } hex = { workspace = true } deposit_contract = { workspace = true }