diff --git a/consensus/scp/tests/mock_network/cyclic_topology.rs b/consensus/scp/tests/mock_network/cyclic_topology.rs index 7b162fea0b..9d6691854e 100644 --- a/consensus/scp/tests/mock_network/cyclic_topology.rs +++ b/consensus/scp/tests/mock_network/cyclic_topology.rs @@ -8,6 +8,9 @@ use crate::mock_network; +use mc_common::{HashSet, NodeID}; +use mc_consensus_scp::{QuorumSet, test_utils}; + /////////////////////////////////////////////////////////////////////////////// // Cyclic Topology (similar to Figure 4 in the SCP whitepaper) /////////////////////////////////////////////////////////////////////////////// @@ -15,23 +18,26 @@ use crate::mock_network; /// Constructs a cyclic network (e.g. 1->2->3->4->1) pub fn directed_cycle(num_nodes: usize) -> mock_network::Network { let mut nodes = Vec::::new(); - for node_id in 0..num_nodes { - let next_node_id: u32 = if node_id + 1 < num_nodes { - node_id as u32 + 1 - } else { - 0 + for node_index in 0..num_nodes { + let next_node_id: NodeID = { + if node_index + 1 < num_nodes { + test_utils::test_node_id(node_index as u32 + 1) + } else { + test_utils::test_node_id(0) + } }; - let other_node_ids: Vec = (0..num_nodes) - .filter(|other_node_id| other_node_id != &node_id) - .map(|other_node_id| other_node_id as u32) - .collect(); + let peers_vector = (0..num_nodes) + .filter(|other_node_index| other_node_index != &node_index) + .map(|other_node_index| test_utils::test_node_id(other_node_index as u32)) + .collect::>(); nodes.push(mock_network::NodeOptions::new( - other_node_ids, - vec![next_node_id], - 1, + test_utils::test_node_id(node_index as u32), + peers_vector.iter().cloned().collect::>(), + QuorumSet::new_with_node_ids(1, vec![next_node_id]), )); } + mock_network::Network::new(format!("cyclic{}", num_nodes), nodes) } diff --git a/consensus/scp/tests/mock_network/mesh_topology.rs b/consensus/scp/tests/mock_network/mesh_topology.rs index c1c85db844..c1d336daee 100644 --- a/consensus/scp/tests/mock_network/mesh_topology.rs +++ b/consensus/scp/tests/mock_network/mesh_topology.rs @@ -8,6 +8,9 @@ use crate::mock_network; +use mc_common::{HashSet, NodeID}; +use mc_consensus_scp::{QuorumSet, test_utils}; + /////////////////////////////////////////////////////////////////////////////// /// Mesh tests /// (N nodes, each node has all other nodes as it's validators) @@ -16,17 +19,18 @@ use crate::mock_network; /// Constructs a mesh network, where each node has all of it's peers as validators. pub fn dense_mesh(num_nodes: usize, k: usize) -> mock_network::Network { let mut nodes = Vec::::new(); - for node_id in 0..num_nodes { - let other_node_ids: Vec = (0..num_nodes) - .filter(|other_node_id| other_node_id != &node_id) - .map(|other_node_id| other_node_id as u32) - .collect(); + for node_index in 0..num_nodes { + let peers_vector = (0..num_nodes) + .filter(|other_node_index| other_node_index != &node_index) + .map(|other_node_index| test_utils::test_node_id(other_node_index as u32)) + .collect::>(); nodes.push(mock_network::NodeOptions::new( - other_node_ids.clone(), - other_node_ids, - k as u32, + test_utils::test_node_id(node_index as u32), + peers_vector.iter().cloned().collect::>(), + QuorumSet::new_with_node_ids(k as u32, peers_vector), )); } + mock_network::Network::new(format!("m{}k{}", num_nodes, k), nodes) } diff --git a/consensus/scp/tests/mock_network/mod.rs b/consensus/scp/tests/mock_network/mod.rs index 3cf3367092..cc51be2b30 100644 --- a/consensus/scp/tests/mock_network/mod.rs +++ b/consensus/scp/tests/mock_network/mod.rs @@ -44,7 +44,8 @@ pub struct TestOptions { /// multiple places in the ledger, and that the ledger will contain more than values_to_submit pub values_to_submit: usize, - /// Approximate rate that values are submitted to nodes. + /// Approximate rate that values are submitted to nodes. Unless we are testing slow submission + /// is it better to set this quite high. pub submissions_per_sec: u64, /// We nominate up to this many values from our pending set per slot. @@ -72,7 +73,7 @@ impl TestOptions { Self { submit_in_parallel: true, values_to_submit: 5000, - submissions_per_sec: 10000, + submissions_per_sec: 20000, max_pending_values_to_nominate: 100, allowed_test_time: Duration::from_secs(300), log_flush_delay: Duration::from_millis(50), @@ -86,17 +87,22 @@ impl TestOptions { // Describes one simulated node #[derive(Clone)] pub struct NodeOptions { - peers: Vec, - validators: Vec, - k: u32, + /// This node's id + id: NodeID, + + /// The nodes to which this node broadcasts + peers: HashSet, + + /// This node's quorum set + quorum_set: QuorumSet, } impl NodeOptions { - pub fn new(peers: Vec, validators: Vec, k: u32) -> Self { + pub fn new(id: NodeID, peers: HashSet, quorum_set: QuorumSet) -> Self { Self { + id, peers, - validators, - k, + quorum_set, } } } @@ -131,68 +137,63 @@ impl SimulatedNetwork { logger: logger.clone(), }; - for (node_index, options_for_this_node) in network.nodes.iter().enumerate() { - let validators = options_for_this_node - .validators - .iter() - .map(|id| test_utils::test_node_id(*id as u32)) - .collect::>(); - - let qs = QuorumSet::new_with_node_ids(options_for_this_node.k, validators); - - let peers = options_for_this_node - .peers - .iter() - .map(|id| test_utils::test_node_id(*id as u32)) - .collect::>(); - - let node_id = test_utils::test_node_id(node_index as u32); - - assert!(!peers.contains(&node_id)); + for node_options in network.nodes.iter() { + assert!(!node_options.peers.contains(&node_options.id)); let nodes_map_clone = Arc::clone(&simulation.nodes_map); + let peers_clone = node_options.peers.clone(); let (node, join_handle_option) = SimulatedNode::new( - format!("{}-{}", network.name, node_index), - node_id.clone(), - qs, + format!("{}-{}", network.name, node_options.id.clone()), + node_options.id.clone(), + node_options.quorum_set.clone(), test_options, - Arc::new(move |logger, msg| { - SimulatedNetwork::broadcast_msg(logger, &nodes_map_clone, &peers, msg) + Arc::new( move |logger, msg| { + SimulatedNetwork::broadcast_msg( + logger, + &nodes_map_clone, + &peers_clone, + msg + ) }), - logger.new(o!("mc.local_node_id" => node_id.to_string())), + logger.new(o!("mc.local_node_id" => node_options.id.to_string())), ); simulation.handle_map.insert( - node_id.clone(), + node_options.id.clone(), join_handle_option.expect("thread failed to spawn"), ); simulation .shared_data_map - .insert(node_id.clone(), node.shared_data.clone()); + .insert(node_options.id.clone(), node.shared_data.clone()); simulation .nodes_map .lock() .expect("lock failed on nodes_map inserting node") - .insert(node_id.clone(), node); + .insert(node_options.id.clone(), node); } simulation } fn stop_all(&mut self) { + let mut nodes_map = self .nodes_map .lock() .expect("lock failed on nodes_map in stop_all"); - - for (_node_id, node) in nodes_map.iter_mut() { + let mut node_ids: Vec = Vec::new(); + for (node_id, node) in nodes_map.iter_mut() { + log::trace!(self.logger, "sending stop to {}", node_id); node.send_stop(); + node_ids.push(node_id.clone()); } + drop(nodes_map); + + for node_id in node_ids { + log::trace!(self.logger, "joining {}", node_id); - // now join the threads - for node_id in nodes_map.keys() { self.handle_map - .remove(node_id) + .remove(&node_id) .expect("thread handle is missing") .join() .expect("SimulatedNode join failed"); @@ -540,15 +541,12 @@ pub fn build_and_test(network: &Network, test_options: &TestOptions, logger: Log test_options.values_to_submit ); - // pre-compute node_ids - let mut node_ids = Vec::::with_capacity(network.nodes.len()); - for n in 0..network.nodes.len() { - node_ids.push(test_utils::test_node_id(n as u32)); - } + // get a vector of the node_ids + let node_ids: Vec = network.nodes.iter().map(|n| n.id.clone()).collect(); // check that all ledgers start empty - for node_id in node_ids.iter() { - assert!(simulation.get_ledger_size(&node_id) == 0); + for n in 0..network.nodes.len() { + assert!(simulation.get_ledger_size(&node_ids[n]) == 0); } // push values diff --git a/consensus/scp/tests/mock_network/optimization.rs b/consensus/scp/tests/mock_network/optimization.rs index 923ef9d0be..697518eb04 100644 --- a/consensus/scp/tests/mock_network/optimization.rs +++ b/consensus/scp/tests/mock_network/optimization.rs @@ -4,7 +4,7 @@ // Optimization takes a long time so we will ignore these tests by default // example: -// export MC_LOG=warn; export OPTIMIZE=1; cargo test --release -- --test-threads=1 2>&1 | tee output.log +// export MC_LOG=warn; export OPTIMIZE_SCP=1; cargo test --release -- --test-threads=1 2>&1 | tee output.log // We allow dead code because not all integration tests use all of the common code. // https://github.com/rust-lang/rust/issues/46379 @@ -20,13 +20,13 @@ use std::{ // we are using a fixed number of iterations for the optimizer // this could be improved someday by observing the runtime settle -const OPTIMIZER_ITERATIONS: usize = 20; +const OPTIMIZER_ITERATIONS: usize = 100; // values to submit for consensus -const VALUES_TO_SUBMIT: usize = 1000; +const VALUES_TO_SUBMIT: usize = 5000; // panic if any iteration requires more than the allowed time -const ALLOWED_TEST_TIME: Duration = Duration::from_secs(90); +const ALLOWED_TEST_TIME: Duration = Duration::from_secs(180); // Optimization range limits const MIN_SUBMISSIONS_PER_SEC: f64 = 5000.0; @@ -40,10 +40,10 @@ const MAX_SCP_TIMEBASE_MSEC: f64 = 4000.0; /// Support skipping optimization tests based on environment variables pub fn skip_optimization() -> bool { - std::env::var("OPTIMIZE").is_err() + std::env::var("OPTIMIZE_SCP").is_err() } -/// Measures runtime in msec for a mock network +/// Measures run time in msec for a mock network pub fn mock_network_optimizer( network: &mock_network::Network, parameters_to_vary: Vec, @@ -52,6 +52,8 @@ pub fn mock_network_optimizer( scp_timebase_millis_f64: f64, logger: Logger, ) -> f64 { + let start = Instant::now(); + let mut test_options = mock_network::TestOptions::new(); test_options.values_to_submit = VALUES_TO_SUBMIT; test_options.allowed_test_time = ALLOWED_TEST_TIME; @@ -74,18 +76,29 @@ pub fn mock_network_optimizer( let v1 = test_options.max_pending_values_to_nominate; let v2 = test_options.scp_timebase.as_millis(); - let start = Instant::now(); + let run_time_start = Instant::now(); mock_network::build_and_test(&network, &test_options, logger.clone()); - let runtime = start.elapsed().as_millis(); + let run_time = run_time_start.elapsed().as_millis(); // observe progress - log::info!(logger, "optimizer: {}, {}, {}, {}", v0, v1, v2, runtime,); - - return runtime as f64; + log::warn!( + logger, + "{}, {}, {}, {}, {}, {}, {}, {}, {}, , ", + network.name, + VALUES_TO_SUBMIT, + run_time, + (VALUES_TO_SUBMIT as f64 * 1000.0) / ( run_time as f64 ), + 1, + start.elapsed().as_millis(), + v0, + v1, + v2, + ); + return run_time as f64; } -// optimize performance over submission rate, submissions per slot, and scp timebase -pub fn optimize(network: &mock_network::Network, parameters_to_vary: Vec, logger: Logger) { +// simplex style optimization +fn optimize_simplers(network: &mock_network::Network, parameters_to_vary: Vec, logger: Logger) { let start = Instant::now(); let f = |v: &[f64]| { @@ -124,15 +137,95 @@ pub fn optimize(network: &mock_network::Network, parameters_to_vary: Vec, log::warn!( logger, - "{}, {}, {}, {}, {}, {}, {}, {}, {:?}", + "{}, {}, {}, {}, {}, {}, {}, {}, {}, {:?},", network.name, + VALUES_TO_SUBMIT, min_value, + (VALUES_TO_SUBMIT as f64 * 1000.0) / min_value, + OPTIMIZER_ITERATIONS, + start.elapsed().as_millis(), u64::try_from(c0.trunc() as i64).unwrap(), usize::try_from(c1.trunc() as i64).unwrap(), u64::try_from(c2.trunc() as i64).unwrap(), + input_interval, + ); +} + +// brute force optimization +fn optimize_grid_search(network: &mock_network::Network, parameters_to_vary: Vec, logger: Logger) { + let start = Instant::now(); + + let mut d:usize = 0; + if parameters_to_vary[0] { + d = 0; + } + if parameters_to_vary[1] { + d = 1; + } + if parameters_to_vary[2] { + d = 2; + } + + let f = |v: &[f64]| { + mock_network_optimizer( + &network, + parameters_to_vary.clone(), + v[0], + v[1], + v[2], + logger.clone(), + ) + }; + + let input_interval: Vec<(f64, f64)> = vec![ + (MIN_SUBMISSIONS_PER_SEC, MAX_SUBMISSIONS_PER_SEC), + (MIN_VALUES_PER_SLOT, MAX_VALUES_PER_SLOT), + (MIN_SCP_TIMEBASE_MSEC, MAX_SCP_TIMEBASE_MSEC), + ]; + + let default_options = mock_network::TestOptions::new(); + let c0 = default_options.submissions_per_sec as f64; + let c1 = default_options.max_pending_values_to_nominate as f64; + let c2 = default_options.scp_timebase.as_millis() as f64; + + let mut min_value = std::f64::MAX; + let mut coordinates = vec![c0,c1,c2]; + for i in 0..OPTIMIZER_ITERATIONS { + let (min,max) = input_interval[d]; + let v_i: f64 = min + (i as f64) / (OPTIMIZER_ITERATIONS as f64) * max; + let mut v = vec![c0,c1,c2]; + v[d] = v_i; + let run_time = f(&v); + if run_time <= min_value{ + min_value = run_time; + coordinates = v; + } + } + + log::warn!( + logger, + "{}, {}, {}, {}, {}, {}, {}, {}, {}, {:?},", + network.name, + VALUES_TO_SUBMIT, + min_value, (VALUES_TO_SUBMIT as f64 * 1000.0) / min_value, - start.elapsed().as_millis(), OPTIMIZER_ITERATIONS, + start.elapsed().as_millis(), + u64::try_from(coordinates[0].trunc() as i64).unwrap(), + usize::try_from(coordinates[1].trunc() as i64).unwrap(), + u64::try_from(coordinates[2].trunc() as i64).unwrap(), input_interval, ); } + +// optimize performance over submission rate, submissions per slot, and scp timebase +pub fn optimize(network: &mock_network::Network, parameters_to_vary: Vec, logger: Logger) { + let dimensions = parameters_to_vary.iter().fold(0, |d, is_varied| d + *is_varied as usize); + if dimensions == 0 { + return; // probably not intended? + } + if dimensions == 1 { + return optimize_grid_search(network, parameters_to_vary, logger) + } + optimize_simplers(network, parameters_to_vary, logger) +} diff --git a/consensus/scp/tests/optimize_cyclic_networks.rs b/consensus/scp/tests/optimize_cyclic_networks.rs index a85068f420..395e15c95a 100644 --- a/consensus/scp/tests/optimize_cyclic_networks.rs +++ b/consensus/scp/tests/optimize_cyclic_networks.rs @@ -38,27 +38,6 @@ fn optimize_scp_timebase(logger: Logger) { optimize_cyclic_helper(parameters_to_vary, logger); } -#[test_with_logger] -#[serial] -fn optimize_11(logger: Logger) { - let parameters_to_vary = vec![true, true, false]; - optimize_cyclic_helper(parameters_to_vary, logger); -} - -#[test_with_logger] -#[serial] -fn optimize_23(logger: Logger) { - let parameters_to_vary = vec![false, true, true]; - optimize_cyclic_helper(parameters_to_vary, logger); -} - -#[test_with_logger] -#[serial] -fn optimize_31(logger: Logger) { - let parameters_to_vary = vec![true, false, true]; - optimize_cyclic_helper(parameters_to_vary, logger); -} - #[test_with_logger] #[serial] fn optimize_all(logger: Logger) { diff --git a/consensus/scp/tests/optimize_mesh_networks.rs b/consensus/scp/tests/optimize_mesh_networks.rs index e9a48eb88d..566ce46141 100644 --- a/consensus/scp/tests/optimize_mesh_networks.rs +++ b/consensus/scp/tests/optimize_mesh_networks.rs @@ -44,27 +44,6 @@ fn optimize_scp_timebase(logger: Logger) { optimize_mesh_helper(parameters_to_vary, logger); } -#[test_with_logger] -#[serial] -fn optimize_11(logger: Logger) { - let parameters_to_vary = vec![true, true, false]; - optimize_mesh_helper(parameters_to_vary, logger); -} - -#[test_with_logger] -#[serial] -fn optimize_23(logger: Logger) { - let parameters_to_vary = vec![false, true, true]; - optimize_mesh_helper(parameters_to_vary, logger); -} - -#[test_with_logger] -#[serial] -fn optimize_31(logger: Logger) { - let parameters_to_vary = vec![true, false, true]; - optimize_mesh_helper(parameters_to_vary, logger); -} - #[test_with_logger] #[serial] fn optimize_all(logger: Logger) {