Skip to content

Commit

Permalink
Fix round manager tests (#15369)
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala authored and danielxiangzl committed Dec 12, 2024
1 parent 458fd3d commit 612eac4
Showing 1 changed file with 117 additions and 96 deletions.
213 changes: 117 additions & 96 deletions consensus/src/round_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use aptos_consensus_types::{
},
block_retrieval::{BlockRetrievalRequest, BlockRetrievalStatus},
common::{Author, Payload, Round},
order_vote_msg::OrderVoteMsg,
pipeline::commit_decision::CommitDecision,
proposal_msg::ProposalMsg,
round_timeout::RoundTimeoutMsg,
Expand Down Expand Up @@ -86,6 +87,7 @@ use futures::{
};
use maplit::hashmap;
use std::{
collections::VecDeque,
iter::FromIterator,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -117,6 +119,11 @@ pub struct NodeSetup {
local_consensus_config: ConsensusConfig,
onchain_randomness_config: OnChainRandomnessConfig,
onchain_jwk_consensus_config: OnChainJWKConsensusConfig,
vote_queue: VecDeque<VoteMsg>,
order_vote_queue: VecDeque<OrderVoteMsg>,
proposal_queue: VecDeque<ProposalMsg>,
round_timeout_queue: VecDeque<RoundTimeoutMsg>,
commit_decision_queue: VecDeque<CommitDecision>,
}

impl NodeSetup {
Expand Down Expand Up @@ -316,8 +323,7 @@ impl NodeSetup {

let (round_manager_tx, _) = aptos_channel::new(QueueStyle::LIFO, 1, None);

let mut local_config = local_consensus_config.clone();
local_config.enable_broadcast_vote(false);
let local_config = local_consensus_config.clone();

let mut round_manager = RoundManager::new(
epoch_state,
Expand Down Expand Up @@ -354,6 +360,11 @@ impl NodeSetup {
local_consensus_config,
onchain_randomness_config,
onchain_jwk_consensus_config,
vote_queue: VecDeque::new(),
order_vote_queue: VecDeque::new(),
proposal_queue: VecDeque::new(),
round_timeout_queue: VecDeque::new(),
commit_decision_queue: VecDeque::new(),
}
}

Expand Down Expand Up @@ -401,8 +412,8 @@ impl NodeSetup {
}
}

pub async fn next_network_message(&mut self) -> ConsensusMsg {
match self.next_network_event().await {
pub async fn next_network_message(&mut self) {
let consensus_msg = match self.next_network_event().await {
Event::Message(_, msg) => msg,
Event::RpcRequest(_, msg, _, _) if matches!(msg, ConsensusMsg::CommitMessage(_)) => msg,
Event::RpcRequest(_, msg, _, _) => {
Expand All @@ -412,6 +423,37 @@ impl NodeSetup {
self.identity_desc()
)
},
};

match consensus_msg {
ConsensusMsg::ProposalMsg(proposal) => {
self.proposal_queue.push_back(*proposal);
},
ConsensusMsg::VoteMsg(vote) => {
self.vote_queue.push_back(*vote);
},
ConsensusMsg::OrderVoteMsg(order_vote) => {
self.order_vote_queue.push_back(*order_vote);
},
ConsensusMsg::RoundTimeoutMsg(round_timeout) => {
self.round_timeout_queue.push_back(*round_timeout);
},
ConsensusMsg::CommitDecisionMsg(commit_decision) => {
self.commit_decision_queue.push_back(*commit_decision);
},
ConsensusMsg::CommitMessage(d) if matches!(*d, CommitMessage::Decision(_)) => {
match *d {
CommitMessage::Decision(commit_decision) => {
self.commit_decision_queue.push_back(commit_decision);
},
_ => unreachable!(),
}
},
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
}
}

Expand All @@ -427,53 +469,38 @@ impl NodeSetup {
}

pub async fn next_proposal(&mut self) -> ProposalMsg {
match self.next_network_message().await {
ConsensusMsg::ProposalMsg(p) => *p,
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
while self.proposal_queue.is_empty() {
self.next_network_message().await;
}
self.proposal_queue.pop_front().unwrap()
}

pub async fn next_vote(&mut self) -> VoteMsg {
match self.next_network_message().await {
ConsensusMsg::VoteMsg(v) => *v,
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
while self.vote_queue.is_empty() {
self.next_network_message().await;
}
self.vote_queue.pop_front().unwrap()
}

pub async fn next_order_vote(&mut self) -> OrderVoteMsg {
while self.order_vote_queue.is_empty() {
self.next_network_message().await;
}
self.order_vote_queue.pop_front().unwrap()
}

pub async fn next_timeout(&mut self) -> RoundTimeoutMsg {
match self.next_network_message().await {
ConsensusMsg::RoundTimeoutMsg(v) => *v,
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
while self.round_timeout_queue.is_empty() {
self.next_network_message().await;
}
self.round_timeout_queue.pop_front().unwrap()
}

pub async fn next_commit_decision(&mut self) -> CommitDecision {
match self.next_network_message().await {
ConsensusMsg::CommitDecisionMsg(v) => *v,
ConsensusMsg::CommitMessage(d) if matches!(*d, CommitMessage::Decision(_)) => {
match *d {
CommitMessage::Decision(d) => d,
_ => unreachable!(),
}
},
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
while self.commit_decision_queue.is_empty() {
self.next_network_message().await;
}
self.commit_decision_queue.pop_front().unwrap()
}

pub async fn poll_block_retreival(&mut self) -> Option<IncomingBlockRetrievalRequest> {
Expand Down Expand Up @@ -628,53 +655,47 @@ fn process_and_vote_on_proposal(
.unwrap();
info!("Finish process proposal on {}", node.identity_desc());
num_votes += 1;
}
}

if let Some(prev_proposer) = apply_commit_prev_proposer {
if prev_proposer != node.id && expected_round > 2 {
for node in nodes.iter_mut() {
info!(
"Fetching {} votes in round {} on node {}",
num_votes,
expected_round,
node.identity_desc()
);
if down_nodes.contains(&node.id) {
// Drop the votes on down nodes
info!("Dropping votes on down node {}", node.identity_desc());
for _ in 0..num_votes {
timed_block_on(runtime, node.next_vote());
}
} else {
let mut votes = Vec::new();
for _ in 0..num_votes {
votes.push(timed_block_on(runtime, node.next_vote()));
}

info!("Processing votes on node {}", node.identity_desc());
if process_votes {
for vote_msg in votes {
timed_block_on(runtime, node.round_manager.process_vote_msg(vote_msg)).unwrap();
}
if apply_commit_prev_proposer.is_some()
&& expected_round > 1
&& apply_commit_on_votes
{
info!(
"Applying commit {} on node {}",
"Applying next commit {} on proposer node {}",
expected_round - 2,
node.identity_desc()
);
timed_block_on(runtime, node.commit_next_ordered(&[expected_round - 2]));
timed_block_on(runtime, node.commit_next_ordered(&[expected_round - 1]));
}
}
}
}

let proposer_node = nodes.get_mut(next_proposer).unwrap();
info!(
"Fetching {} votes in round {} on node {}",
num_votes,
expected_round,
proposer_node.identity_desc()
);
let mut votes = Vec::new();
for _ in 0..num_votes {
votes.push(timed_block_on(runtime, proposer_node.next_vote()));
}

info!("Processing votes on node {}", proposer_node.identity_desc());
if process_votes {
for vote_msg in votes {
timed_block_on(
runtime,
proposer_node.round_manager.process_vote_msg(vote_msg),
)
.unwrap();
}
if apply_commit_prev_proposer.is_some() && expected_round > 1 && apply_commit_on_votes {
info!(
"Applying next commit {} on proposer node {}",
expected_round - 2,
proposer_node.identity_desc()
);
timed_block_on(
runtime,
proposer_node.commit_next_ordered(&[expected_round - 1]),
);
}
}
}

#[test]
Expand Down Expand Up @@ -2170,7 +2191,7 @@ pub fn forking_retrieval_test() {
);

timed_block_on(&runtime, async {
println!("Insert local timeout to all nodes on next round");
info!("Insert local timeout to all nodes on next round");
let mut timeout_votes = 0;
for node in nodes.iter_mut() {
if node.id != behind_node && node.id != forking_node {
Expand All @@ -2182,13 +2203,13 @@ pub fn forking_retrieval_test() {
}
}

println!("Process all local timeouts");
info!("Process all local timeouts");
for node in nodes.iter_mut() {
println!("Timeouts on {}", node.id);
info!("Timeouts on {}", node.id);
for i in 0..timeout_votes {
println!("Timeout {} on {}", i, node.id);
info!("Timeout {} on {}", i, node.id);
if node.id == forking_node && (2..4).contains(&i) {
println!("Got {}", node.next_commit_decision().await);
info!("Got {}", node.next_commit_decision().await);
}

let vote_msg_on_timeout = node.next_vote().await;
Expand All @@ -2215,7 +2236,7 @@ pub fn forking_retrieval_test() {
assert!(vote_msg_on_timeout.vote().is_timeout());
}

println!("Got {}", nodes[forking_node].next_commit_decision().await);
info!("Got {}", nodes[forking_node].next_commit_decision().await);
});

info!("Create forked block");
Expand Down Expand Up @@ -2267,7 +2288,7 @@ pub fn forking_retrieval_test() {
proposals.push(node.next_proposal().await);
}

println!(
info!(
"Processing proposals for behind node {}",
behind_node_obj.identity_desc()
);
Expand Down Expand Up @@ -2311,19 +2332,19 @@ pub fn forking_retrieval_test() {
3,
);

let next_message = timed_block_on(&runtime, nodes[proposal_node].next_network_message());
match next_message {
ConsensusMsg::VoteMsg(_) => info!("Skip extra vote msg"),
ConsensusMsg::ProposalMsg(msg) => {
// put the message back in the queue.
// actual peer doesn't matter, it is ignored, so use self.
let peer = nodes[proposal_node].signer.author();
nodes[proposal_node]
.pending_network_events
.push(Event::Message(peer, ConsensusMsg::ProposalMsg(msg)))
},
_ => panic!("unexpected network message {:?}", next_message),
}
// let next_message = timed_block_on(&runtime, nodes[proposal_node].next_network_message());
// match next_message {
// ConsensusMsg::VoteMsg(_) => info!("Skip extra vote msg"),
// ConsensusMsg::ProposalMsg(msg) => {
// // put the message back in the queue.
// // actual peer doesn't matter, it is ignored, so use self.
// let peer = nodes[proposal_node].signer.author();
// nodes[proposal_node]
// .pending_network_events
// .push(Event::Message(peer, ConsensusMsg::ProposalMsg(msg)))
// },
// _ => panic!("unexpected network message {:?}", next_message),
// }
process_and_vote_on_proposal(
&runtime,
&mut nodes,
Expand Down

0 comments on commit 612eac4

Please sign in to comment.