From 60a430b34c7f6d775e3ac6181917a5ed158253d5 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 13 Jul 2023 17:25:04 +1000 Subject: [PATCH 1/3] Add retry to eth1 sim tests --- Cargo.lock | 1 + testing/node_test_rig/Cargo.toml | 1 + testing/node_test_rig/src/lib.rs | 34 ++++- testing/simulator/src/eth1_sim.rs | 170 +++++++++++++++---------- testing/simulator/src/local_network.rs | 24 +++- testing/simulator/src/main.rs | 1 + testing/simulator/src/no_eth1_sim.rs | 4 +- testing/simulator/src/retry.rs | 62 +++++++++ testing/simulator/src/sync_sim.rs | 4 +- 9 files changed, 223 insertions(+), 78 deletions(-) create mode 100644 testing/simulator/src/retry.rs diff --git a/Cargo.lock b/Cargo.lock index 532f7ff204..b780f3565e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5521,6 +5521,7 @@ dependencies = [ "execution_layer", "sensitive_url", "tempfile", + "tokio", "types", "validator_client", "validator_dir", diff --git a/testing/node_test_rig/Cargo.toml b/testing/node_test_rig/Cargo.toml index ea5d005c16..ac77349c58 100644 --- a/testing/node_test_rig/Cargo.toml +++ b/testing/node_test_rig/Cargo.toml @@ -14,3 +14,4 @@ validator_client = { path = "../../validator_client" } validator_dir = { path = "../../common/validator_dir", features = ["insecure_keys"] } sensitive_url = { path = "../../common/sensitive_url" } execution_layer = { path = "../../beacon_node/execution_layer" } +tokio = { version = "1.14.0", features = ["time"] } diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs index d4fd115bec..22c9168d10 100644 --- a/testing/node_test_rig/src/lib.rs +++ b/testing/node_test_rig/src/lib.rs @@ -10,6 +10,7 @@ use std::path::PathBuf; use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; use tempfile::{Builder as TempBuilder, TempDir}; +use tokio::time::timeout; use types::EthSpec; use validator_client::ProductionValidatorClient; use validator_dir::insecure_keys::build_deterministic_validator_dirs; @@ -24,6 +25,8 @@ pub use validator_client::Config as ValidatorConfig; /// The global timeout for HTTP requests to the beacon node. const HTTP_TIMEOUT: Duration = Duration::from_secs(4); +/// The timeout for a beacon node to start up. +const STARTUP_TIMEOUT: Duration = Duration::from_secs(60); /// Provides a beacon node that is running in the current process on a given tokio executor (it /// is _local_ to this process). @@ -34,6 +37,18 @@ pub struct LocalBeaconNode { pub datadir: TempDir, } +#[derive(Debug)] +pub enum LocalBeaconNodeError { + TimeoutError, + StartupError(String), +} + +impl ToString for LocalBeaconNodeError { + fn to_string(&self) -> String { + format!("{:?}", self) + } +} + impl LocalBeaconNode { /// Starts a new, production beacon node on the tokio runtime in the given `context`. /// @@ -41,7 +56,7 @@ impl LocalBeaconNode { pub async fn production( context: RuntimeContext, mut client_config: ClientConfig, - ) -> Result { + ) -> Result { // Creates a temporary directory that will be deleted once this `TempDir` is dropped. let datadir = TempBuilder::new() .prefix("lighthouse_node_test_rig") @@ -51,12 +66,17 @@ impl LocalBeaconNode { client_config.set_data_dir(datadir.path().into()); client_config.network.network_dir = PathBuf::from(datadir.path()).join("network"); - ProductionBeaconNode::new(context, client_config) - .await - .map(move |client| Self { - client: client.into_inner(), - datadir, - }) + timeout( + STARTUP_TIMEOUT, + ProductionBeaconNode::new(context, client_config), + ) + .await + .map_err(|_| LocalBeaconNodeError::TimeoutError)? + .map(move |client| Self { + client: client.into_inner(), + datadir, + }) + .map_err(LocalBeaconNodeError::StartupError) } } diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 3e764d27d0..2a992cc34b 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -1,14 +1,16 @@ use crate::local_network::{EXECUTION_PORT, TERMINAL_BLOCK, TERMINAL_DIFFICULTY}; -use crate::{checks, LocalNetwork, E}; +use crate::{checks, LocalNetwork}; use clap::ArgMatches; use eth1::{Eth1Endpoint, DEFAULT_CHAIN_ID}; use eth1_test_rig::AnvilEth1Instance; +use crate::retry::with_retry; use execution_layer::http::deposit_methods::Eth1Id; use futures::prelude::*; +use node_test_rig::environment::RuntimeContext; use node_test_rig::{ environment::{EnvironmentBuilder, LoggerConfig}, - testing_client_config, testing_validator_config, ClientGenesis, ValidatorFiles, + testing_client_config, testing_validator_config, ClientConfig, ClientGenesis, ValidatorFiles, }; use rayon::prelude::*; use sensitive_url::SensitiveUrl; @@ -107,71 +109,24 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let context = env.core_context(); let main_future = async { - /* - * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit - * validators. - */ - let anvil_eth1_instance = AnvilEth1Instance::new(DEFAULT_CHAIN_ID.into()).await?; - let deposit_contract = anvil_eth1_instance.deposit_contract; - let chain_id = anvil_eth1_instance.anvil.chain_id(); - let anvil = anvil_eth1_instance.anvil; - let eth1_endpoint = SensitiveUrl::parse(anvil.endpoint().as_str()) - .expect("Unable to parse anvil endpoint."); - let deposit_contract_address = deposit_contract.address(); - - // Start a timer that produces eth1 blocks on an interval. - tokio::spawn(async move { - let mut interval = tokio::time::interval(eth1_block_time); - loop { - interval.tick().await; - let _ = anvil.evm_mine().await; - } - }); - - // Submit deposits to the deposit contract. - tokio::spawn(async move { - for i in 0..total_validator_count { - println!("Submitting deposit for validator {}...", i); - let _ = deposit_contract - .deposit_deterministic_async::(i, deposit_amount) - .await; - } - }); - - let mut beacon_config = testing_client_config(); - - beacon_config.genesis = ClientGenesis::DepositContract; - beacon_config.eth1.endpoint = Eth1Endpoint::NoAuth(eth1_endpoint); - beacon_config.eth1.deposit_contract_address = deposit_contract_address; - beacon_config.eth1.deposit_contract_deploy_block = 0; - beacon_config.eth1.lowest_cached_block_number = 0; - beacon_config.eth1.follow_distance = 1; - beacon_config.eth1.node_far_behind_seconds = 20; - beacon_config.dummy_eth1_backend = false; - beacon_config.sync_eth1_chain = true; - beacon_config.eth1.auto_update_interval_millis = eth1_block_time.as_millis() as u64; - beacon_config.eth1.chain_id = Eth1Id::from(chain_id); - beacon_config.network.target_peers = node_count + proposer_nodes - 1; - - beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None); - - if post_merge_sim { - let el_config = execution_layer::Config { - execution_endpoints: vec![SensitiveUrl::parse(&format!( - "http://localhost:{}", - EXECUTION_PORT - )) - .unwrap()], - ..Default::default() - }; - - beacon_config.execution_layer = Some(el_config); - } - /* * Create a new `LocalNetwork` with one beacon node. */ - let network = LocalNetwork::new(context.clone(), beacon_config.clone()).await?; + let max_retries = 3; + let (network, beacon_config) = with_retry(max_retries, || { + Box::pin(create_local_network( + LocalNetworkParams { + eth1_block_time, + total_validator_count, + deposit_amount, + node_count, + proposer_nodes, + post_merge_sim, + }, + context.clone(), + )) + }) + .await?; /* * One by one, add beacon nodes to the network. @@ -341,3 +296,90 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { Ok(()) } + +struct LocalNetworkParams { + eth1_block_time: Duration, + total_validator_count: usize, + deposit_amount: u64, + node_count: usize, + proposer_nodes: usize, + post_merge_sim: bool, +} + +async fn create_local_network( + LocalNetworkParams { + eth1_block_time, + total_validator_count, + deposit_amount, + node_count, + proposer_nodes, + post_merge_sim, + }: LocalNetworkParams, + context: RuntimeContext, +) -> Result<(LocalNetwork, ClientConfig), String> { + /* + * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit + * validators. + */ + let anvil_eth1_instance = AnvilEth1Instance::new(DEFAULT_CHAIN_ID.into()).await?; + let deposit_contract = anvil_eth1_instance.deposit_contract; + let chain_id = anvil_eth1_instance.anvil.chain_id(); + let anvil = anvil_eth1_instance.anvil; + let eth1_endpoint = + SensitiveUrl::parse(anvil.endpoint().as_str()).expect("Unable to parse anvil endpoint."); + let deposit_contract_address = deposit_contract.address(); + + // Start a timer that produces eth1 blocks on an interval. + tokio::spawn(async move { + let mut interval = tokio::time::interval(eth1_block_time); + loop { + interval.tick().await; + let _ = anvil.evm_mine().await; + } + }); + + // Submit deposits to the deposit contract. + tokio::spawn(async move { + for i in 0..total_validator_count { + println!("Submitting deposit for validator {}...", i); + let _ = deposit_contract + .deposit_deterministic_async::(i, deposit_amount) + .await; + } + }); + + let mut beacon_config = testing_client_config(); + + beacon_config.genesis = ClientGenesis::DepositContract; + beacon_config.eth1.endpoint = Eth1Endpoint::NoAuth(eth1_endpoint); + beacon_config.eth1.deposit_contract_address = deposit_contract_address; + beacon_config.eth1.deposit_contract_deploy_block = 0; + beacon_config.eth1.lowest_cached_block_number = 0; + beacon_config.eth1.follow_distance = 1; + beacon_config.eth1.node_far_behind_seconds = 20; + beacon_config.dummy_eth1_backend = false; + beacon_config.sync_eth1_chain = true; + beacon_config.eth1.auto_update_interval_millis = eth1_block_time.as_millis() as u64; + beacon_config.eth1.chain_id = Eth1Id::from(chain_id); + beacon_config.network.target_peers = node_count + proposer_nodes - 1; + + beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None); + + if post_merge_sim { + let el_config = execution_layer::Config { + execution_endpoints: vec![SensitiveUrl::parse(&format!( + "http://localhost:{}", + EXECUTION_PORT + )) + .unwrap()], + ..Default::default() + }; + + beacon_config.execution_layer = Some(el_config); + } + + let network = LocalNetwork::new(context, beacon_config.clone()) + .await + .map_err(|e| e.to_string())?; + Ok((network, beacon_config)) +} diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index e35870d126..f00207fd0e 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -1,8 +1,8 @@ use node_test_rig::{ environment::RuntimeContext, eth2::{types::StateId, BeaconNodeHttpClient}, - ClientConfig, LocalBeaconNode, LocalExecutionNode, LocalValidatorClient, MockExecutionConfig, - MockServerConfig, ValidatorConfig, ValidatorFiles, + ClientConfig, LocalBeaconNode, LocalBeaconNodeError, LocalExecutionNode, LocalValidatorClient, + MockExecutionConfig, MockServerConfig, ValidatorConfig, ValidatorFiles, }; use parking_lot::RwLock; use sensitive_url::SensitiveUrl; @@ -53,12 +53,23 @@ impl Deref for LocalNetwork { } } +#[derive(Debug)] +pub enum LocalNetworkError { + LocalBeaconNodeError(LocalBeaconNodeError), +} + +impl ToString for LocalNetworkError { + fn to_string(&self) -> String { + format!("{:?}", self) + } +} + impl LocalNetwork { /// Creates a new network with a single `BeaconNode` and a connected `ExecutionNode`. pub async fn new( context: RuntimeContext, mut beacon_config: ClientConfig, - ) -> Result { + ) -> Result { beacon_config.network.set_ipv4_listening_address( std::net::Ipv4Addr::UNSPECIFIED, BOOTNODE_PORT, @@ -93,7 +104,8 @@ impl LocalNetwork { let beacon_node = LocalBeaconNode::production(context.service_context("boot_node".into()), beacon_config) - .await?; + .await + .map_err(LocalNetworkError::LocalBeaconNodeError)?; Ok(Self { inner: Arc::new(Inner { context, @@ -188,7 +200,9 @@ impl LocalNetwork { self.context.service_context(format!("node_{}", count)), beacon_config, ) - .await?; + .await + .map_err(|e| e.to_string())?; + if is_proposer { self_1.proposer_nodes.write().push(beacon_node); } else { diff --git a/testing/simulator/src/main.rs b/testing/simulator/src/main.rs index a19777c5ab..e8af9c1806 100644 --- a/testing/simulator/src/main.rs +++ b/testing/simulator/src/main.rs @@ -21,6 +21,7 @@ mod cli; mod eth1_sim; mod local_network; mod no_eth1_sim; +mod retry; mod sync_sim; use cli::cli_app; diff --git a/testing/simulator/src/no_eth1_sim.rs b/testing/simulator/src/no_eth1_sim.rs index fc18b1cd48..1eab761e7b 100644 --- a/testing/simulator/src/no_eth1_sim.rs +++ b/testing/simulator/src/no_eth1_sim.rs @@ -95,7 +95,9 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> { beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None); let main_future = async { - let network = LocalNetwork::new(context.clone(), beacon_config.clone()).await?; + let network = LocalNetwork::new(context.clone(), beacon_config.clone()) + .await + .map_err(|e| e.to_string())?; /* * One by one, add beacon nodes to the network. */ diff --git a/testing/simulator/src/retry.rs b/testing/simulator/src/retry.rs new file mode 100644 index 0000000000..5004449281 --- /dev/null +++ b/testing/simulator/src/retry.rs @@ -0,0 +1,62 @@ +use std::fmt::Debug; +use std::future::Future; +use std::pin::Pin; + +/// Executes the function with a specified number of retries if the function returns an error. +/// Once it exceeds `max_retries` and still fails, the error is returned. +pub async fn with_retry(max_retries: usize, mut func: F) -> Result +where + F: FnMut() -> Pin>>>, + E: Debug, +{ + let mut retry_count = 0; + loop { + let result = Box::pin(func()).await; + if result.is_ok() || retry_count >= max_retries { + break result; + } + if let Err(e) = result { + eprintln!( + "Operation failed with error {:?}, retrying {} of {}", + e, retry_count, max_retries + ); + } + retry_count += 1; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::VecDeque; + + async fn my_async_func(is_ok: bool) -> Result<(), ()> { + if is_ok { + Ok(()) + } else { + Err(()) + } + } + + #[tokio::test] + async fn test_with_retry_ok() { + let res = with_retry(3, || Box::pin(my_async_func(true))).await; + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_with_retry_2nd_ok() { + let mut mock_results = VecDeque::from([false, true]); + let res = with_retry(3, || { + Box::pin(my_async_func(mock_results.pop_front().unwrap())) + }) + .await; + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_with_retry_fail() { + let res = with_retry(3, || Box::pin(my_async_func(false))).await; + assert!(res.is_err()); + } +} diff --git a/testing/simulator/src/sync_sim.rs b/testing/simulator/src/sync_sim.rs index 78f7e1ee9f..83327f28cf 100644 --- a/testing/simulator/src/sync_sim.rs +++ b/testing/simulator/src/sync_sim.rs @@ -106,7 +106,9 @@ fn syncing_sim( /* * Create a new `LocalNetwork` with one beacon node. */ - let network = LocalNetwork::new(context, beacon_config.clone()).await?; + let network = LocalNetwork::new(context, beacon_config.clone()) + .await + .map_err(|e| e.to_string())?; /* * Add a validator client which handles all validators from the genesis state. From b6601f78841aac71dbd88017ab9b353684c8a2bd Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 13 Jul 2023 21:43:47 +1000 Subject: [PATCH 2/3] Reorder retry_count increment so it logs correctly --- testing/simulator/src/retry.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/testing/simulator/src/retry.rs b/testing/simulator/src/retry.rs index 5004449281..a4eb52cea1 100644 --- a/testing/simulator/src/retry.rs +++ b/testing/simulator/src/retry.rs @@ -15,13 +15,14 @@ where if result.is_ok() || retry_count >= max_retries { break result; } + retry_count += 1; + if let Err(e) = result { eprintln!( "Operation failed with error {:?}, retrying {} of {}", e, retry_count, max_retries ); } - retry_count += 1; } } From e9c63054d86d7344db69071b71bb7a4ebeea273f Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 13 Jul 2023 22:25:38 +1000 Subject: [PATCH 3/3] Remove some unnecessary error types to reduce amount of changes --- testing/node_test_rig/src/lib.rs | 17 ++--------------- testing/simulator/src/eth1_sim.rs | 4 +--- testing/simulator/src/local_network.rs | 24 +++++------------------- testing/simulator/src/no_eth1_sim.rs | 4 +--- testing/simulator/src/sync_sim.rs | 4 +--- 5 files changed, 10 insertions(+), 43 deletions(-) diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs index 22c9168d10..62db67b8c5 100644 --- a/testing/node_test_rig/src/lib.rs +++ b/testing/node_test_rig/src/lib.rs @@ -37,18 +37,6 @@ pub struct LocalBeaconNode { pub datadir: TempDir, } -#[derive(Debug)] -pub enum LocalBeaconNodeError { - TimeoutError, - StartupError(String), -} - -impl ToString for LocalBeaconNodeError { - fn to_string(&self) -> String { - format!("{:?}", self) - } -} - impl LocalBeaconNode { /// Starts a new, production beacon node on the tokio runtime in the given `context`. /// @@ -56,7 +44,7 @@ impl LocalBeaconNode { pub async fn production( context: RuntimeContext, mut client_config: ClientConfig, - ) -> Result { + ) -> Result { // Creates a temporary directory that will be deleted once this `TempDir` is dropped. let datadir = TempBuilder::new() .prefix("lighthouse_node_test_rig") @@ -71,12 +59,11 @@ impl LocalBeaconNode { ProductionBeaconNode::new(context, client_config), ) .await - .map_err(|_| LocalBeaconNodeError::TimeoutError)? + .map_err(|_| format!("Beacon node startup timed out after {:?}", STARTUP_TIMEOUT))? .map(move |client| Self { client: client.into_inner(), datadir, }) - .map_err(LocalBeaconNodeError::StartupError) } } diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 2a992cc34b..57c944cf1a 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -378,8 +378,6 @@ async fn create_local_network( beacon_config.execution_layer = Some(el_config); } - let network = LocalNetwork::new(context, beacon_config.clone()) - .await - .map_err(|e| e.to_string())?; + let network = LocalNetwork::new(context, beacon_config.clone()).await?; Ok((network, beacon_config)) } diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index f00207fd0e..e35870d126 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -1,8 +1,8 @@ use node_test_rig::{ environment::RuntimeContext, eth2::{types::StateId, BeaconNodeHttpClient}, - ClientConfig, LocalBeaconNode, LocalBeaconNodeError, LocalExecutionNode, LocalValidatorClient, - MockExecutionConfig, MockServerConfig, ValidatorConfig, ValidatorFiles, + ClientConfig, LocalBeaconNode, LocalExecutionNode, LocalValidatorClient, MockExecutionConfig, + MockServerConfig, ValidatorConfig, ValidatorFiles, }; use parking_lot::RwLock; use sensitive_url::SensitiveUrl; @@ -53,23 +53,12 @@ impl Deref for LocalNetwork { } } -#[derive(Debug)] -pub enum LocalNetworkError { - LocalBeaconNodeError(LocalBeaconNodeError), -} - -impl ToString for LocalNetworkError { - fn to_string(&self) -> String { - format!("{:?}", self) - } -} - impl LocalNetwork { /// Creates a new network with a single `BeaconNode` and a connected `ExecutionNode`. pub async fn new( context: RuntimeContext, mut beacon_config: ClientConfig, - ) -> Result { + ) -> Result { beacon_config.network.set_ipv4_listening_address( std::net::Ipv4Addr::UNSPECIFIED, BOOTNODE_PORT, @@ -104,8 +93,7 @@ impl LocalNetwork { let beacon_node = LocalBeaconNode::production(context.service_context("boot_node".into()), beacon_config) - .await - .map_err(LocalNetworkError::LocalBeaconNodeError)?; + .await?; Ok(Self { inner: Arc::new(Inner { context, @@ -200,9 +188,7 @@ impl LocalNetwork { self.context.service_context(format!("node_{}", count)), beacon_config, ) - .await - .map_err(|e| e.to_string())?; - + .await?; if is_proposer { self_1.proposer_nodes.write().push(beacon_node); } else { diff --git a/testing/simulator/src/no_eth1_sim.rs b/testing/simulator/src/no_eth1_sim.rs index 1eab761e7b..fc18b1cd48 100644 --- a/testing/simulator/src/no_eth1_sim.rs +++ b/testing/simulator/src/no_eth1_sim.rs @@ -95,9 +95,7 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> { beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None); let main_future = async { - let network = LocalNetwork::new(context.clone(), beacon_config.clone()) - .await - .map_err(|e| e.to_string())?; + let network = LocalNetwork::new(context.clone(), beacon_config.clone()).await?; /* * One by one, add beacon nodes to the network. */ diff --git a/testing/simulator/src/sync_sim.rs b/testing/simulator/src/sync_sim.rs index 83327f28cf..78f7e1ee9f 100644 --- a/testing/simulator/src/sync_sim.rs +++ b/testing/simulator/src/sync_sim.rs @@ -106,9 +106,7 @@ fn syncing_sim( /* * Create a new `LocalNetwork` with one beacon node. */ - let network = LocalNetwork::new(context, beacon_config.clone()) - .await - .map_err(|e| e.to_string())?; + let network = LocalNetwork::new(context, beacon_config.clone()).await?; /* * Add a validator client which handles all validators from the genesis state.