Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix round manager tests #15369

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 117 additions & 96 deletions consensus/src/round_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use aptos_consensus_types::{
},
block_retrieval::{BlockRetrievalRequest, BlockRetrievalStatus},
common::{Author, Payload, Round},
order_vote_msg::OrderVoteMsg,
pipeline::commit_decision::CommitDecision,
proposal_msg::ProposalMsg,
round_timeout::RoundTimeoutMsg,
Expand Down Expand Up @@ -86,6 +87,7 @@ use futures::{
};
use maplit::hashmap;
use std::{
collections::VecDeque,
iter::FromIterator,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -117,6 +119,11 @@ pub struct NodeSetup {
local_consensus_config: ConsensusConfig,
onchain_randomness_config: OnChainRandomnessConfig,
onchain_jwk_consensus_config: OnChainJWKConsensusConfig,
vote_queue: VecDeque<VoteMsg>,
order_vote_queue: VecDeque<OrderVoteMsg>,
proposal_queue: VecDeque<ProposalMsg>,
round_timeout_queue: VecDeque<RoundTimeoutMsg>,
commit_decision_queue: VecDeque<CommitDecision>,
}

impl NodeSetup {
Expand Down Expand Up @@ -316,8 +323,7 @@ impl NodeSetup {

let (round_manager_tx, _) = aptos_channel::new(QueueStyle::LIFO, 1, None);

let mut local_config = local_consensus_config.clone();
local_config.enable_broadcast_vote(false);
let local_config = local_consensus_config.clone();

let mut round_manager = RoundManager::new(
epoch_state,
Expand Down Expand Up @@ -354,6 +360,11 @@ impl NodeSetup {
local_consensus_config,
onchain_randomness_config,
onchain_jwk_consensus_config,
vote_queue: VecDeque::new(),
order_vote_queue: VecDeque::new(),
proposal_queue: VecDeque::new(),
round_timeout_queue: VecDeque::new(),
commit_decision_queue: VecDeque::new(),
}
}

Expand Down Expand Up @@ -401,8 +412,8 @@ impl NodeSetup {
}
}

pub async fn next_network_message(&mut self) -> ConsensusMsg {
match self.next_network_event().await {
pub async fn next_network_message(&mut self) {
let consensus_msg = match self.next_network_event().await {
Event::Message(_, msg) => msg,
Event::RpcRequest(_, msg, _, _) if matches!(msg, ConsensusMsg::CommitMessage(_)) => msg,
Event::RpcRequest(_, msg, _, _) => {
Expand All @@ -412,6 +423,37 @@ impl NodeSetup {
self.identity_desc()
)
},
};

match consensus_msg {
ConsensusMsg::ProposalMsg(proposal) => {
self.proposal_queue.push_back(*proposal);
},
ConsensusMsg::VoteMsg(vote) => {
self.vote_queue.push_back(*vote);
},
ConsensusMsg::OrderVoteMsg(order_vote) => {
self.order_vote_queue.push_back(*order_vote);
},
ConsensusMsg::RoundTimeoutMsg(round_timeout) => {
self.round_timeout_queue.push_back(*round_timeout);
},
ConsensusMsg::CommitDecisionMsg(commit_decision) => {
self.commit_decision_queue.push_back(*commit_decision);
},
ConsensusMsg::CommitMessage(d) if matches!(*d, CommitMessage::Decision(_)) => {
match *d {
CommitMessage::Decision(commit_decision) => {
self.commit_decision_queue.push_back(commit_decision);
},
_ => unreachable!(),
}
},
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
}
}

Expand All @@ -427,53 +469,38 @@ impl NodeSetup {
}

pub async fn next_proposal(&mut self) -> ProposalMsg {
match self.next_network_message().await {
ConsensusMsg::ProposalMsg(p) => *p,
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
while self.proposal_queue.is_empty() {
self.next_network_message().await;
}
self.proposal_queue.pop_front().unwrap()
}

pub async fn next_vote(&mut self) -> VoteMsg {
match self.next_network_message().await {
ConsensusMsg::VoteMsg(v) => *v,
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
while self.vote_queue.is_empty() {
self.next_network_message().await;
}
self.vote_queue.pop_front().unwrap()
}

pub async fn next_order_vote(&mut self) -> OrderVoteMsg {
while self.order_vote_queue.is_empty() {
self.next_network_message().await;
}
self.order_vote_queue.pop_front().unwrap()
}

pub async fn next_timeout(&mut self) -> RoundTimeoutMsg {
match self.next_network_message().await {
ConsensusMsg::RoundTimeoutMsg(v) => *v,
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
while self.round_timeout_queue.is_empty() {
self.next_network_message().await;
}
self.round_timeout_queue.pop_front().unwrap()
}

pub async fn next_commit_decision(&mut self) -> CommitDecision {
match self.next_network_message().await {
ConsensusMsg::CommitDecisionMsg(v) => *v,
ConsensusMsg::CommitMessage(d) if matches!(*d, CommitMessage::Decision(_)) => {
match *d {
CommitMessage::Decision(d) => d,
_ => unreachable!(),
}
},
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
self.identity_desc()
),
while self.commit_decision_queue.is_empty() {
self.next_network_message().await;
}
self.commit_decision_queue.pop_front().unwrap()
}

pub async fn poll_block_retreival(&mut self) -> Option<IncomingBlockRetrievalRequest> {
Expand Down Expand Up @@ -628,53 +655,47 @@ fn process_and_vote_on_proposal(
.unwrap();
info!("Finish process proposal on {}", node.identity_desc());
num_votes += 1;
}
}

if let Some(prev_proposer) = apply_commit_prev_proposer {
if prev_proposer != node.id && expected_round > 2 {
for node in nodes.iter_mut() {
info!(
"Fetching {} votes in round {} on node {}",
num_votes,
expected_round,
node.identity_desc()
);
if down_nodes.contains(&node.id) {
// Drop the votes on down nodes
info!("Dropping votes on down node {}", node.identity_desc());
for _ in 0..num_votes {
timed_block_on(runtime, node.next_vote());
}
} else {
let mut votes = Vec::new();
for _ in 0..num_votes {
votes.push(timed_block_on(runtime, node.next_vote()));
}

info!("Processing votes on node {}", node.identity_desc());
if process_votes {
for vote_msg in votes {
timed_block_on(runtime, node.round_manager.process_vote_msg(vote_msg)).unwrap();
}
if apply_commit_prev_proposer.is_some()
&& expected_round > 1
&& apply_commit_on_votes
{
info!(
"Applying commit {} on node {}",
"Applying next commit {} on proposer node {}",
expected_round - 2,
node.identity_desc()
);
timed_block_on(runtime, node.commit_next_ordered(&[expected_round - 2]));
timed_block_on(runtime, node.commit_next_ordered(&[expected_round - 1]));
}
}
}
}

let proposer_node = nodes.get_mut(next_proposer).unwrap();
info!(
"Fetching {} votes in round {} on node {}",
num_votes,
expected_round,
proposer_node.identity_desc()
);
let mut votes = Vec::new();
for _ in 0..num_votes {
votes.push(timed_block_on(runtime, proposer_node.next_vote()));
}

info!("Processing votes on node {}", proposer_node.identity_desc());
if process_votes {
for vote_msg in votes {
timed_block_on(
runtime,
proposer_node.round_manager.process_vote_msg(vote_msg),
)
.unwrap();
}
if apply_commit_prev_proposer.is_some() && expected_round > 1 && apply_commit_on_votes {
info!(
"Applying next commit {} on proposer node {}",
expected_round - 2,
proposer_node.identity_desc()
);
timed_block_on(
runtime,
proposer_node.commit_next_ordered(&[expected_round - 1]),
);
}
}
}

#[test]
Expand Down Expand Up @@ -2170,7 +2191,7 @@ pub fn forking_retrieval_test() {
);

timed_block_on(&runtime, async {
println!("Insert local timeout to all nodes on next round");
info!("Insert local timeout to all nodes on next round");
let mut timeout_votes = 0;
for node in nodes.iter_mut() {
if node.id != behind_node && node.id != forking_node {
Expand All @@ -2182,13 +2203,13 @@ pub fn forking_retrieval_test() {
}
}

println!("Process all local timeouts");
info!("Process all local timeouts");
for node in nodes.iter_mut() {
println!("Timeouts on {}", node.id);
info!("Timeouts on {}", node.id);
for i in 0..timeout_votes {
println!("Timeout {} on {}", i, node.id);
info!("Timeout {} on {}", i, node.id);
if node.id == forking_node && (2..4).contains(&i) {
println!("Got {}", node.next_commit_decision().await);
info!("Got {}", node.next_commit_decision().await);
}

let vote_msg_on_timeout = node.next_vote().await;
Expand All @@ -2215,7 +2236,7 @@ pub fn forking_retrieval_test() {
assert!(vote_msg_on_timeout.vote().is_timeout());
}

println!("Got {}", nodes[forking_node].next_commit_decision().await);
info!("Got {}", nodes[forking_node].next_commit_decision().await);
});

info!("Create forked block");
Expand Down Expand Up @@ -2267,7 +2288,7 @@ pub fn forking_retrieval_test() {
proposals.push(node.next_proposal().await);
}

println!(
info!(
"Processing proposals for behind node {}",
behind_node_obj.identity_desc()
);
Expand Down Expand Up @@ -2311,19 +2332,19 @@ pub fn forking_retrieval_test() {
3,
);

let next_message = timed_block_on(&runtime, nodes[proposal_node].next_network_message());
match next_message {
ConsensusMsg::VoteMsg(_) => info!("Skip extra vote msg"),
ConsensusMsg::ProposalMsg(msg) => {
// put the message back in the queue.
// actual peer doesn't matter, it is ignored, so use self.
let peer = nodes[proposal_node].signer.author();
nodes[proposal_node]
.pending_network_events
.push(Event::Message(peer, ConsensusMsg::ProposalMsg(msg)))
},
_ => panic!("unexpected network message {:?}", next_message),
}
// let next_message = timed_block_on(&runtime, nodes[proposal_node].next_network_message());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this commented out? If this test is incomplete, can we add a unimplemented! here instead so it can break when someone tries to run it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Actually, the test is flaky and there is already #[ignore] tag added on top of the test. The test doesn't affect forge.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant if someone tries to remove the #[ignore] runs and thinks that the test passes because parts of code is commented out.

// match next_message {
// ConsensusMsg::VoteMsg(_) => info!("Skip extra vote msg"),
// ConsensusMsg::ProposalMsg(msg) => {
// // put the message back in the queue.
// // actual peer doesn't matter, it is ignored, so use self.
// let peer = nodes[proposal_node].signer.author();
// nodes[proposal_node]
// .pending_network_events
// .push(Event::Message(peer, ConsensusMsg::ProposalMsg(msg)))
// },
// _ => panic!("unexpected network message {:?}", next_message),
// }
process_and_vote_on_proposal(
&runtime,
&mut nodes,
Expand Down
Loading