Skip to content

Commit

Permalink
use linear scan to optimize one dimensional problems (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
rjwalters authored Jun 17, 2020
1 parent d4095fd commit aabaee2
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 124 deletions.
30 changes: 18 additions & 12 deletions consensus/scp/tests/mock_network/cyclic_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,36 @@

use crate::mock_network;

use mc_common::{HashSet, NodeID};
use mc_consensus_scp::{QuorumSet, test_utils};

///////////////////////////////////////////////////////////////////////////////
// Cyclic Topology (similar to Figure 4 in the SCP whitepaper)
///////////////////////////////////////////////////////////////////////////////

/// Constructs a cyclic network (e.g. 1->2->3->4->1)
pub fn directed_cycle(num_nodes: usize) -> mock_network::Network {
let mut nodes = Vec::<mock_network::NodeOptions>::new();
for node_id in 0..num_nodes {
let next_node_id: u32 = if node_id + 1 < num_nodes {
node_id as u32 + 1
} else {
0
for node_index in 0..num_nodes {
let next_node_id: NodeID = {
if node_index + 1 < num_nodes {
test_utils::test_node_id(node_index as u32 + 1)
} else {
test_utils::test_node_id(0)
}
};

let other_node_ids: Vec<u32> = (0..num_nodes)
.filter(|other_node_id| other_node_id != &node_id)
.map(|other_node_id| other_node_id as u32)
.collect();
let peers_vector = (0..num_nodes)
.filter(|other_node_index| other_node_index != &node_index)
.map(|other_node_index| test_utils::test_node_id(other_node_index as u32))
.collect::<Vec<NodeID>>();

nodes.push(mock_network::NodeOptions::new(
other_node_ids,
vec![next_node_id],
1,
test_utils::test_node_id(node_index as u32),
peers_vector.iter().cloned().collect::<HashSet<NodeID>>(),
QuorumSet::new_with_node_ids(1, vec![next_node_id]),
));
}

mock_network::Network::new(format!("cyclic{}", num_nodes), nodes)
}
20 changes: 12 additions & 8 deletions consensus/scp/tests/mock_network/mesh_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

use crate::mock_network;

use mc_common::{HashSet, NodeID};
use mc_consensus_scp::{QuorumSet, test_utils};

///////////////////////////////////////////////////////////////////////////////
/// Mesh tests
/// (N nodes, each node has all other nodes as it's validators)
Expand All @@ -16,17 +19,18 @@ use crate::mock_network;
/// Constructs a mesh network, where each node has all of it's peers as validators.
pub fn dense_mesh(num_nodes: usize, k: usize) -> mock_network::Network {
let mut nodes = Vec::<mock_network::NodeOptions>::new();
for node_id in 0..num_nodes {
let other_node_ids: Vec<u32> = (0..num_nodes)
.filter(|other_node_id| other_node_id != &node_id)
.map(|other_node_id| other_node_id as u32)
.collect();
for node_index in 0..num_nodes {
let peers_vector = (0..num_nodes)
.filter(|other_node_index| other_node_index != &node_index)
.map(|other_node_index| test_utils::test_node_id(other_node_index as u32))
.collect::<Vec<NodeID>>();

nodes.push(mock_network::NodeOptions::new(
other_node_ids.clone(),
other_node_ids,
k as u32,
test_utils::test_node_id(node_index as u32),
peers_vector.iter().cloned().collect::<HashSet<NodeID>>(),
QuorumSet::new_with_node_ids(k as u32, peers_vector),
));
}

mock_network::Network::new(format!("m{}k{}", num_nodes, k), nodes)
}
92 changes: 45 additions & 47 deletions consensus/scp/tests/mock_network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub struct TestOptions {
/// multiple places in the ledger, and that the ledger will contain more than values_to_submit
pub values_to_submit: usize,

/// Approximate rate that values are submitted to nodes.
/// Approximate rate that values are submitted to nodes. Unless we are testing slow submission
/// is it better to set this quite high.
pub submissions_per_sec: u64,

/// We nominate up to this many values from our pending set per slot.
Expand Down Expand Up @@ -72,7 +73,7 @@ impl TestOptions {
Self {
submit_in_parallel: true,
values_to_submit: 5000,
submissions_per_sec: 10000,
submissions_per_sec: 20000,
max_pending_values_to_nominate: 100,
allowed_test_time: Duration::from_secs(300),
log_flush_delay: Duration::from_millis(50),
Expand All @@ -86,17 +87,22 @@ impl TestOptions {
// Describes one simulated node
#[derive(Clone)]
pub struct NodeOptions {
peers: Vec<u32>,
validators: Vec<u32>,
k: u32,
/// This node's id
id: NodeID,

/// The nodes to which this node broadcasts
peers: HashSet<NodeID>,

/// This node's quorum set
quorum_set: QuorumSet,
}

impl NodeOptions {
pub fn new(peers: Vec<u32>, validators: Vec<u32>, k: u32) -> Self {
pub fn new(id: NodeID, peers: HashSet<NodeID>, quorum_set: QuorumSet) -> Self {
Self {
id,
peers,
validators,
k,
quorum_set,
}
}
}
Expand Down Expand Up @@ -131,68 +137,63 @@ impl SimulatedNetwork {
logger: logger.clone(),
};

for (node_index, options_for_this_node) in network.nodes.iter().enumerate() {
let validators = options_for_this_node
.validators
.iter()
.map(|id| test_utils::test_node_id(*id as u32))
.collect::<Vec<NodeID>>();

let qs = QuorumSet::new_with_node_ids(options_for_this_node.k, validators);

let peers = options_for_this_node
.peers
.iter()
.map(|id| test_utils::test_node_id(*id as u32))
.collect::<HashSet<NodeID>>();

let node_id = test_utils::test_node_id(node_index as u32);

assert!(!peers.contains(&node_id));
for node_options in network.nodes.iter() {
assert!(!node_options.peers.contains(&node_options.id));

let nodes_map_clone = Arc::clone(&simulation.nodes_map);
let peers_clone = node_options.peers.clone();

let (node, join_handle_option) = SimulatedNode::new(
format!("{}-{}", network.name, node_index),
node_id.clone(),
qs,
format!("{}-{}", network.name, node_options.id.clone()),
node_options.id.clone(),
node_options.quorum_set.clone(),
test_options,
Arc::new(move |logger, msg| {
SimulatedNetwork::broadcast_msg(logger, &nodes_map_clone, &peers, msg)
Arc::new( move |logger, msg| {
SimulatedNetwork::broadcast_msg(
logger,
&nodes_map_clone,
&peers_clone,
msg
)
}),
logger.new(o!("mc.local_node_id" => node_id.to_string())),
logger.new(o!("mc.local_node_id" => node_options.id.to_string())),
);
simulation.handle_map.insert(
node_id.clone(),
node_options.id.clone(),
join_handle_option.expect("thread failed to spawn"),
);
simulation
.shared_data_map
.insert(node_id.clone(), node.shared_data.clone());
.insert(node_options.id.clone(), node.shared_data.clone());
simulation
.nodes_map
.lock()
.expect("lock failed on nodes_map inserting node")
.insert(node_id.clone(), node);
.insert(node_options.id.clone(), node);
}

simulation
}

fn stop_all(&mut self) {

let mut nodes_map = self
.nodes_map
.lock()
.expect("lock failed on nodes_map in stop_all");

for (_node_id, node) in nodes_map.iter_mut() {
let mut node_ids: Vec<NodeID> = Vec::new();
for (node_id, node) in nodes_map.iter_mut() {
log::trace!(self.logger, "sending stop to {}", node_id);
node.send_stop();
node_ids.push(node_id.clone());
}
drop(nodes_map);

for node_id in node_ids {
log::trace!(self.logger, "joining {}", node_id);

// now join the threads
for node_id in nodes_map.keys() {
self.handle_map
.remove(node_id)
.remove(&node_id)
.expect("thread handle is missing")
.join()
.expect("SimulatedNode join failed");
Expand Down Expand Up @@ -540,15 +541,12 @@ pub fn build_and_test(network: &Network, test_options: &TestOptions, logger: Log
test_options.values_to_submit
);

// pre-compute node_ids
let mut node_ids = Vec::<NodeID>::with_capacity(network.nodes.len());
for n in 0..network.nodes.len() {
node_ids.push(test_utils::test_node_id(n as u32));
}
// get a vector of the node_ids
let node_ids: Vec<NodeID> = network.nodes.iter().map(|n| n.id.clone()).collect();

// check that all ledgers start empty
for node_id in node_ids.iter() {
assert!(simulation.get_ledger_size(&node_id) == 0);
for n in 0..network.nodes.len() {
assert!(simulation.get_ledger_size(&node_ids[n]) == 0);
}

// push values
Expand Down
Loading

0 comments on commit aabaee2

Please sign in to comment.