diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 22920ace89980..bf59253dbb8f1 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -640,7 +640,7 @@ mod tests { ); } } - assert_eq!(committed_subdag.reputation_scores, vec![]); + assert_eq!(committed_subdag.reputation_scores_desc, vec![]); if expected_transactions.is_empty() { break; } diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs index ba844e5a0d322..676841fd2438c 100644 --- a/consensus/core/src/commit.rs +++ b/consensus/core/src/commit.rs @@ -298,12 +298,12 @@ pub struct CommittedSubDag { pub commit_index: CommitIndex, /// Optional scores that are provided as part of the consensus output to Sui /// that can then be used by Sui for future submission to consensus. - pub reputation_scores: Vec<(AuthorityIndex, u64)>, + pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>, } impl CommittedSubDag { /// Create new (empty) sub-dag. - pub fn new( + pub(crate) fn new( leader: BlockRef, blocks: Vec, timestamp_ms: BlockTimestampMs, @@ -314,17 +314,17 @@ impl CommittedSubDag { blocks, timestamp_ms, commit_index, - reputation_scores: vec![], + reputation_scores_desc: vec![], } } - pub fn update_scores(&mut self, scores: Vec<(AuthorityIndex, u64)>) { - self.reputation_scores = scores; + pub(crate) fn update_scores(&mut self, reputation_scores_desc: Vec<(AuthorityIndex, u64)>) { + self.reputation_scores_desc = reputation_scores_desc; } /// Sort the blocks of the sub-dag by round number then authority index. Any /// deterministic & stable algorithm works. - pub fn sort(&mut self) { + pub(crate) fn sort(&mut self) { self.blocks.sort_by(|a, b| { a.round() .cmp(&b.round()) @@ -359,7 +359,7 @@ impl fmt::Debug for CommittedSubDag { write!( f, "];{}ms;rs{:?})", - self.timestamp_ms, self.reputation_scores + self.timestamp_ms, self.reputation_scores_desc ) } } @@ -427,9 +427,7 @@ pub(crate) enum Decision { Indirect, } -/// The status of every leader output by the committers. While the core only cares -/// about committed leaders, providing a richer status allows for easier debugging, -/// testing, and composition with advanced commit strategies. +/// The status of a leader slot from the direct and indirect commit rules. #[derive(Debug, Clone, PartialEq)] pub(crate) enum LeaderStatus { Commit(VerifiedBlock), @@ -446,14 +444,6 @@ impl LeaderStatus { } } - pub(crate) fn authority(&self) -> AuthorityIndex { - match self { - Self::Commit(block) => block.author(), - Self::Skip(leader) => leader.authority, - Self::Undecided(leader) => leader.authority, - } - } - pub(crate) fn is_decided(&self) -> bool { match self { Self::Commit(_) => true, @@ -462,31 +452,71 @@ impl LeaderStatus { } } - // Only should be called when the leader status is decided (Commit/Skip) - pub fn get_decided_slot(&self) -> Slot { + pub(crate) fn into_decided_leader(self) -> Option { + match self { + Self::Commit(block) => Some(DecidedLeader::Commit(block)), + Self::Skip(slot) => Some(DecidedLeader::Skip(slot)), + Self::Undecided(..) => None, + } + } +} + +impl Display for LeaderStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Commit(block) => write!(f, "Commit({})", block.reference()), + Self::Skip(slot) => write!(f, "Skip({slot})"), + Self::Undecided(slot) => write!(f, "Undecided({slot})"), + } + } +} + +/// Decision of each leader slot. +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum DecidedLeader { + Commit(VerifiedBlock), + Skip(Slot), +} + +impl DecidedLeader { + // Slot where the leader is decided. + pub(crate) fn slot(&self) -> Slot { match self { Self::Commit(block) => block.reference().into(), - Self::Skip(leader) => *leader, - Self::Undecided(..) => panic!("Decided block is either Commit or Skip"), + Self::Skip(slot) => *slot, } } - // Only should be called when the leader status is decided (Commit/Skip) - pub fn into_committed_block(self) -> Option { + // Converts to committed block if the decision is to commit. Returns None otherwise. + pub(crate) fn into_committed_block(self) -> Option { match self { Self::Commit(block) => Some(block), - Self::Skip(_leader) => None, - Self::Undecided(..) => panic!("Decided block is either Commit or Skip"), + Self::Skip(_) => None, + } + } + + #[cfg(test)] + pub(crate) fn round(&self) -> Round { + match self { + Self::Commit(block) => block.round(), + Self::Skip(leader) => leader.round, + } + } + + #[cfg(test)] + pub(crate) fn authority(&self) -> AuthorityIndex { + match self { + Self::Commit(block) => block.author(), + Self::Skip(leader) => leader.authority, } } } -impl Display for LeaderStatus { +impl Display for DecidedLeader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Commit(block) => write!(f, "Commit({})", block.reference()), - Self::Skip(leader) => write!(f, "Skip({leader})"), - Self::Undecided(leader) => write!(f, "Undecided({leader})"), + Self::Skip(slot) => write!(f, "Skip({slot})"), } } } diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index ea7e630c2cb57..9e093bea968ba 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -74,17 +74,17 @@ impl CommitObserver { let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders); let mut sent_sub_dags = vec![]; - let reputation_scores = self + let reputation_scores_desc = self .leader_schedule .leader_swap_table .read() - .reputation_scores - .authorities_by_score_desc(self.context.clone()); + .reputation_scores_desc + .clone(); for mut committed_sub_dag in committed_sub_dags.into_iter() { // TODO: Only update scores after a leader schedule change // On handle commit the current scores that were used to elect the // leader of the subdag will be added to the subdag and sent to sui. - committed_sub_dag.update_scores(reputation_scores.clone()); + committed_sub_dag.update_scores(reputation_scores_desc.clone()); // Failures in sender.send() are assumed to be permanent if let Err(err) = self.sender.send(committed_sub_dag.clone()) { tracing::error!( @@ -145,8 +145,8 @@ impl CommitObserver { self.leader_schedule .leader_swap_table .read() - .reputation_scores - .authorities_by_score_desc(self.context.clone()), + .reputation_scores_desc + .clone(), ); } self.sender.send(committed_sub_dag).unwrap_or_else(|e| { @@ -290,7 +290,7 @@ mod tests { let mut processed_subdag_index = 0; while let Ok(subdag) = receiver.try_recv() { assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == leaders.len() { break; @@ -376,7 +376,7 @@ mod tests { while let Ok(subdag) = receiver.try_recv() { tracing::info!("Processed {subdag}"); assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == expected_last_processed_index { break; @@ -412,7 +412,7 @@ mod tests { while let Ok(subdag) = receiver.try_recv() { tracing::info!("{subdag} was sent but not processed by consumer"); assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == expected_last_sent_index { break; @@ -448,7 +448,7 @@ mod tests { while let Ok(subdag) = receiver.try_recv() { tracing::info!("Processed {subdag} on resubmission"); assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == expected_last_sent_index { break; @@ -516,7 +516,7 @@ mod tests { while let Ok(subdag) = receiver.try_recv() { tracing::info!("Processed {subdag}"); assert_eq!(subdag, commits[processed_subdag_index]); - assert_eq!(subdag.reputation_scores, vec![]); + assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_index as usize; if processed_subdag_index == expected_last_processed_index { break; diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 8aac633430978..1219e43fa6851 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -449,9 +449,9 @@ impl Core { .protocol_config .mysticeti_leader_scoring_and_schedule() { - let sequenced_leaders = self.committer.try_commit(self.last_decided_leader); - if let Some(last) = sequenced_leaders.last() { - self.last_decided_leader = last.get_decided_slot(); + let decided_leaders = self.committer.try_decide(self.last_decided_leader); + if let Some(last) = decided_leaders.last() { + self.last_decided_leader = last.slot(); self.context .metrics .node_metrics @@ -459,7 +459,7 @@ impl Core { .set(self.last_decided_leader.round as i64); } - let committed_leaders = sequenced_leaders + let committed_leaders = decided_leaders .into_iter() .filter_map(|leader| leader.into_committed_block()) .collect::>(); @@ -499,7 +499,7 @@ impl Core { // TODO: limit commits by commits_until_update, which may be needed when leader schedule length // is reduced. - let decided_leaders = self.committer.try_commit(self.last_decided_leader); + let decided_leaders = self.committer.try_decide(self.last_decided_leader); let Some(last_decided) = decided_leaders.last().cloned() else { break; @@ -518,7 +518,7 @@ impl Core { self.last_decided_leader = sequenced_leaders.last().unwrap().slot(); sequenced_leaders } else { - self.last_decided_leader = last_decided.get_decided_slot(); + self.last_decided_leader = last_decided.slot(); sequenced_leaders }; @@ -1344,7 +1344,7 @@ mod test { 1 ); let expected_reputation_scores = - ReputationScores::new((11..21).into(), vec![8, 8, 9, 8]); + ReputationScores::new((11..21).into(), vec![9, 8, 8, 8]); assert_eq!( core.leader_schedule .leader_swap_table diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 6bd0a05c88459..ab35495cf2fb4 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -789,17 +789,10 @@ impl DagState { panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds."); } - pub(crate) fn last_reputation_scores_from_store(&self) -> Option { - let commit_info = self - .store + pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> { + self.store .read_last_commit_info() - .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)); - if let Some((commit_ref, commit_info)) = commit_info { - assert!(commit_ref.index <= self.last_commit.as_ref().unwrap().index()); - Some(commit_info.reputation_scores) - } else { - None - } + .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)) } pub(crate) fn unscored_committed_subdags_count(&self) -> u64 { diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs index 53a857d3b8811..83a78db2e37b0 100644 --- a/consensus/core/src/leader_schedule.rs +++ b/consensus/core/src/leader_schedule.rs @@ -19,7 +19,7 @@ use crate::{ CertificateScoringStrategy, CertifiedVoteScoringStrategyV1, CertifiedVoteScoringStrategyV2, ScoringStrategy, VoteScoringStrategy, }, - Round, + CommitIndex, Round, }; /// The `LeaderSchedule` is responsible for producing the leader schedule across @@ -60,15 +60,13 @@ impl LeaderSchedule { /// Restores the `LeaderSchedule` from storage. It will attempt to retrieve the /// last stored `ReputationScores` and use them to build a `LeaderSwapTable`. pub(crate) fn from_store(context: Arc, dag_state: Arc>) -> Self { - let leader_swap_table = dag_state.read().last_reputation_scores_from_store().map_or( + let leader_swap_table = dag_state.read().recover_last_commit_info().map_or( LeaderSwapTable::default(), - |reputation_scores| { + |(last_commit_ref, last_commit_info)| { LeaderSwapTable::new( context.clone(), - reputation_scores, - context - .protocol_config - .consensus_bad_nodes_stake_threshold(), + last_commit_ref.index, + last_commit_info.reputation_scores, ) }, ); @@ -145,12 +143,11 @@ impl LeaderSchedule { reputation_scores.update_metrics(self.context.clone()); + let last_commit_index = unscored_subdags.last().unwrap().commit_index; self.update_leader_swap_table(LeaderSwapTable::new( self.context.clone(), + last_commit_index, reputation_scores.clone(), - self.context - .protocol_config - .consensus_bad_nodes_stake_threshold(), )); self.context @@ -250,6 +247,10 @@ pub(crate) struct LeaderSwapTable { /// Storing the hostname & stake along side the authority index for debugging. pub(crate) bad_nodes: BTreeMap, + /// Scores by authority in descending order, needed by other parts of the system + /// for a consistent view on how each validator performs in consensus. + pub(crate) reputation_scores_desc: Vec<(AuthorityIndex, u64)>, + // The scores for which the leader swap table was built from. This struct is // used for debugging purposes. Once `good_nodes` & `bad_nodes` are identified // the `reputation_scores` are no longer needed functionally for the swap table. @@ -263,9 +264,27 @@ impl LeaderSwapTable { // The `swap_stake_threshold` should be in the range of [0 - 33]. pub(crate) fn new( context: Arc, + commit_index: CommitIndex, reputation_scores: ReputationScores, - // Needed to prevent linter warning in simtests + ) -> Self { + let swap_stake_threshold = context + .protocol_config + .consensus_bad_nodes_stake_threshold(); + Self::new_inner( + context, + swap_stake_threshold, + commit_index, + reputation_scores, + ) + } + + fn new_inner( + context: Arc, + // Ignore linter warning in simtests. + // TODO: maybe override protocol configs in tests for swap_stake_threshold, and call new(). #[allow(unused_variables)] swap_stake_threshold: u64, + commit_index: CommitIndex, + reputation_scores: ReputationScores, ) -> Self { #[cfg(msim)] let swap_stake_threshold = 33; @@ -275,12 +294,26 @@ impl LeaderSwapTable { "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected" ); + // When reputation scores are disabled or at genesis, use the default value. + if reputation_scores.scores_per_authority.is_empty() { + return Self::default(); + } + + // Randomize order of authorities when they have the same score, + // to avoid bias in the selection of the good and bad nodes. + let mut seed_bytes = [0u8; 32]; + seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes()); + let mut rng = StdRng::from_seed(seed_bytes); + let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone()); + assert_eq!(authorities_by_score.len(), context.committee.size()); + authorities_by_score.shuffle(&mut rng); + // Stable sort the authorities by score descending. Order of authorities with the same score is preserved. + authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1)); + // Calculating the good nodes let good_nodes = Self::retrieve_first_nodes( context.clone(), - reputation_scores - .authorities_by_score_desc(context.clone()) - .into_iter(), + authorities_by_score.iter(), swap_stake_threshold, ) .into_iter() @@ -291,10 +324,7 @@ impl LeaderSwapTable { // low scorers up to the provided stake threshold. let bad_nodes = Self::retrieve_first_nodes( context.clone(), - reputation_scores - .authorities_by_score_desc(context.clone()) - .into_iter() - .rev(), + authorities_by_score.iter().rev(), swap_stake_threshold, ) .into_iter() @@ -322,6 +352,7 @@ impl LeaderSwapTable { Self { good_nodes, bad_nodes, + reputation_scores_desc: authorities_by_score, reputation_scores, } } @@ -375,15 +406,15 @@ impl LeaderSwapTable { /// [0, 100] and expresses the percentage of stake that is considered the cutoff. /// It's the caller's responsibility to ensure that the elements of the `authorities` /// input is already sorted. - fn retrieve_first_nodes( + fn retrieve_first_nodes<'a>( context: Arc, - authorities: impl Iterator, + authorities: impl Iterator, stake_threshold: u64, ) -> Vec<(AuthorityIndex, String, Stake)> { let mut filtered_authorities = Vec::new(); let mut stake = 0; - for (authority_idx, _score) in authorities { + for &(authority_idx, _score) in authorities { stake += context.committee.stake(authority_idx); // If the total accumulated stake has surpassed the stake threshold @@ -718,9 +749,13 @@ mod tests { AuthorityIndex::new_for_test(0) ); assert_eq!(leader_swap_table.bad_nodes.len(), 1); - assert!(leader_swap_table - .bad_nodes - .contains_key(&AuthorityIndex::new_for_test(1))); + assert!( + leader_swap_table + .bad_nodes + .contains_key(&AuthorityIndex::new_for_test(2)), + "{:?}", + leader_swap_table.bad_nodes + ); } #[tokio::test] @@ -853,7 +888,7 @@ mod tests { assert_eq!(leader_swap_table.good_nodes.len(), 1); assert_eq!( leader_swap_table.good_nodes[0].0, - AuthorityIndex::new_for_test(3) + AuthorityIndex::new_for_test(2) ); assert_eq!(leader_swap_table.bad_nodes.len(), 1); assert!(leader_swap_table @@ -861,7 +896,7 @@ mod tests { .contains_key(&AuthorityIndex::new_for_test(0))); assert_eq!( leader_schedule.elect_leader(4, 0), - AuthorityIndex::new_for_test(3) + AuthorityIndex::new_for_test(2) ); } @@ -874,7 +909,7 @@ mod tests { let reputation_scores = ReputationScores::new((0..11).into(), (0..4).map(|i| i as u64).collect::>()); let leader_swap_table = - LeaderSwapTable::new(context, reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores); assert_eq!(leader_swap_table.good_nodes.len(), 1); assert_eq!( @@ -896,7 +931,7 @@ mod tests { let reputation_scores = ReputationScores::new((0..11).into(), (0..4).map(|i| i as u64).collect::>()); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); // Test swapping a bad leader let leader = AuthorityIndex::new_for_test(0); @@ -918,7 +953,7 @@ mod tests { telemetry_subscribers::init_for_testing(); let context = Arc::new(Context::new_for_test(4).0); - let authorities = vec![ + let authorities = [ (AuthorityIndex::new_for_test(0), 1), (AuthorityIndex::new_for_test(1), 2), (AuthorityIndex::new_for_test(2), 3), @@ -928,7 +963,7 @@ mod tests { let stake_threshold = 50; let filtered_authorities = LeaderSwapTable::retrieve_first_nodes( context.clone(), - authorities.into_iter(), + authorities.iter(), stake_threshold, ); @@ -962,7 +997,7 @@ mod tests { let swap_stake_threshold = 34; let reputation_scores = ReputationScores::new((0..11).into(), (0..4).map(|i| i as u64).collect::>()); - LeaderSwapTable::new(context, reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores); } #[tokio::test] @@ -974,7 +1009,7 @@ mod tests { let reputation_scores = ReputationScores::new((1..11).into(), (0..4).map(|i| i as u64).collect::>()); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); @@ -986,7 +1021,7 @@ mod tests { (0..4).map(|i| i as u64).collect::>(), ); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); // Update leader from old swap table to new valid swap table leader_schedule.update_leader_swap_table(leader_swap_table.clone()); @@ -1004,7 +1039,7 @@ mod tests { let reputation_scores = ReputationScores::new((1..11).into(), (0..4).map(|i| i as u64).collect::>()); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); @@ -1016,7 +1051,7 @@ mod tests { (0..4).map(|i| i as u64).collect::>(), ); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); // Update leader from old swap table to new valid swap table leader_schedule.update_leader_swap_table(leader_swap_table.clone()); @@ -1026,7 +1061,7 @@ mod tests { (0..4).map(|i| i as u64).collect::>(), ); let leader_swap_table = - LeaderSwapTable::new(context.clone(), reputation_scores, swap_stake_threshold); + LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores); // Update leader from old swap table to new invalid swap table leader_schedule.update_leader_swap_table(leader_swap_table.clone()); diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index 107dea08e5a48..aba1b7a6b0dea 100644 --- a/consensus/core/src/leader_scoring.rs +++ b/consensus/core/src/leader_scoring.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - cmp::Ordering, collections::{BTreeMap, HashMap}, fmt::Debug, ops::Bound::{Excluded, Included}, @@ -103,13 +102,9 @@ impl ReputationScores { } } - // Returns the authorities in score descending order. - pub(crate) fn authorities_by_score_desc( - &self, - context: Arc, - ) -> Vec<(AuthorityIndex, u64)> { - let mut authorities: Vec<_> = self - .scores_per_authority + // Returns the authorities index with score tuples. + pub(crate) fn authorities_by_score(&self, context: Arc) -> Vec<(AuthorityIndex, u64)> { + self.scores_per_authority .iter() .enumerate() .map(|(index, score)| { @@ -121,20 +116,7 @@ impl ReputationScores { *score, ) }) - .collect(); - - authorities.sort_by(|a1, a2| { - match a2.1.cmp(&a1.1) { - Ordering::Equal => { - // we resolve the score equality deterministically by ordering in authority - // identifier order descending. - a2.0.cmp(&a1.0) - } - result => result, - } - }); - - authorities + .collect() } pub(crate) fn update_metrics(&self, context: Arc) { @@ -334,17 +316,17 @@ mod tests { use crate::{leader_scoring_strategy::VoteScoringStrategy, test_dag_builder::DagBuilder}; #[tokio::test] - async fn test_reputation_scores_authorities_by_score_desc() { + async fn test_reputation_scores_authorities_by_score() { let context = Arc::new(Context::new_for_test(4).0); let scores = ReputationScores::new((1..300).into(), vec![4, 1, 1, 3]); - let authorities = scores.authorities_by_score_desc(context); + let authorities = scores.authorities_by_score(context); assert_eq!( authorities, vec![ (AuthorityIndex::new_for_test(0), 4), - (AuthorityIndex::new_for_test(3), 3), + (AuthorityIndex::new_for_test(1), 1), (AuthorityIndex::new_for_test(2), 1), - (AuthorityIndex::new_for_test(1), 1) + (AuthorityIndex::new_for_test(3), 3), ] ); } diff --git a/consensus/core/src/leader_scoring_strategy.rs b/consensus/core/src/leader_scoring_strategy.rs index 67137227c497a..55c7d9e4405dc 100644 --- a/consensus/core/src/leader_scoring_strategy.rs +++ b/consensus/core/src/leader_scoring_strategy.rs @@ -177,7 +177,6 @@ impl ScoringStrategy for VoteScoringStrategy { fn calculate_scores_for_leader(&self, subdag: &UnscoredSubdag, leader_slot: Slot) -> Vec { let num_authorities = subdag.context.committee.size(); let mut scores_per_authority = vec![0_u64; num_authorities]; - let voting_round = leader_slot.round + 1; let leader_blocks = subdag.get_blocks_at_slot(leader_slot); @@ -192,8 +191,11 @@ impl ScoringStrategy for VoteScoringStrategy { let leader_block = leader_blocks.first().unwrap(); + let voting_round = leader_slot.round + 1; let voting_blocks = subdag.get_blocks_at_round(voting_round); for potential_vote in voting_blocks { + // TODO: use the decided leader as input instead of leader slot. If the leader was skipped, + // votes to skip should be included in the score as well. if subdag.is_vote(&potential_vote, leader_block) { let authority = potential_vote.author(); tracing::trace!( diff --git a/consensus/core/src/tests/pipelined_committer_tests.rs b/consensus/core/src/tests/pipelined_committer_tests.rs index c684dd2def47c..4fe251329602b 100644 --- a/consensus/core/src/tests/pipelined_committer_tests.rs +++ b/consensus/core/src/tests/pipelined_committer_tests.rs @@ -8,7 +8,7 @@ use parking_lot::RwLock; use crate::{ block::{BlockAPI, Slot, TestBlock, Transaction, VerifiedBlock}, - commit::{LeaderStatus, DEFAULT_WAVE_LENGTH}, + commit::{DecidedLeader, DEFAULT_WAVE_LENGTH}, context::Context, dag_state::DagState, leader_schedule::{LeaderSchedule, LeaderSwapTable}, @@ -27,12 +27,12 @@ async fn direct_commit() { build_dag(context, dag_state, None, decision_round_wave_0_pipeline_1); let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); let leader_round_wave_0_pipeline_1 = committer.committers[1].leader_round(0); - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), leader_round_wave_0_pipeline_1); assert_eq!( block.author(), @@ -61,11 +61,11 @@ async fn idempotence() { // Commit one leader. let last_decided = Slot::new_for_test(0, 0); - let first_sequence = committer.try_commit(last_decided); + let first_sequence = committer.try_decide(last_decided); assert_eq!(first_sequence.len(), 1); tracing::info!("Commit sequence: {first_sequence:#?}"); - if let LeaderStatus::Commit(ref block) = first_sequence[0] { + if let DecidedLeader::Commit(ref block) = first_sequence[0] { assert_eq!(block.round(), leader_round_pipeline_1_wave_0); assert_eq!( block.author(), @@ -77,10 +77,10 @@ async fn idempotence() { // Ensure that if try_commit is called again with the same last decided leader // input the commit decision will be the same. - let first_sequence = committer.try_commit(last_decided); + let first_sequence = committer.try_decide(last_decided); assert_eq!(first_sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = first_sequence[0] { + if let DecidedLeader::Commit(ref block) = first_sequence[0] { assert_eq!(block.round(), leader_round_pipeline_1_wave_0); assert_eq!( block.author(), @@ -92,7 +92,7 @@ async fn idempotence() { // Ensure we don't commit the same leader again once last decided has been updated. let last_decided = Slot::new(first_sequence[0].round(), first_sequence[0].authority()); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); assert!(sequence.is_empty()); } @@ -121,11 +121,11 @@ async fn multiple_direct_commit() { )); // Because of pipelining we are committing a leader every round. - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), leader_round); assert_eq!( block.author(), @@ -157,14 +157,14 @@ async fn direct_commit_late_call() { build_dag(context.clone(), dag_state.clone(), None, decision_round); let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), n); for (i, leader_block) in sequence.iter().enumerate() { // First sequenced leader should be in round 1. let leader_round = i as u32 + 1; - if let LeaderStatus::Commit(ref block) = leader_block { + if let DecidedLeader::Commit(ref block) = leader_block { assert_eq!(block.round(), leader_round); assert_eq!(block.author(), committer.get_leaders(leader_round)[0]); } else { @@ -188,7 +188,7 @@ async fn no_genesis_commit() { ancestors = Some(build_dag(context.clone(), dag_state.clone(), ancestors, r)); let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); assert!(sequence.is_empty()); } } @@ -232,11 +232,11 @@ async fn direct_skip_no_leader() { // Ensure no blocks are committed because there are 2f+1 blame (non-votes) for // the missing leader. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Skip(leader) = sequence[0] { + if let DecidedLeader::Skip(leader) = sequence[0] { assert_eq!(leader.authority, leader_pipeline_1_wave_0); assert_eq!(leader.round, leader_round_pipeline_1_wave_0); } else { @@ -305,11 +305,11 @@ async fn direct_skip_enough_blame() { // Ensure the leader is skipped because there are 2f+1 blame (non-votes) for // the wave 0 leader of pipeline 1. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Skip(leader) = sequence[0] { + if let DecidedLeader::Skip(leader) = sequence[0] { assert_eq!(leader.authority, leader_pipeline_1_wave_0); assert_eq!(leader.round, leader_round_pipeline_1_wave_0); } else { @@ -411,13 +411,13 @@ async fn indirect_commit() { // Ensure we commit the first leaders. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 5); let committed_leader_round = 1; let leader = committer.get_leaders(committed_leader_round)[0]; - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), committed_leader_round); assert_eq!(block.author(), leader); } else { @@ -426,7 +426,7 @@ async fn indirect_commit() { let skipped_leader_round = 2; let leader = committer.get_leaders(skipped_leader_round)[0]; - if let LeaderStatus::Skip(ref slot) = sequence[1] { + if let DecidedLeader::Skip(ref slot) = sequence[1] { assert_eq!(slot.round, skipped_leader_round); assert_eq!(slot.authority, leader); } else { @@ -491,7 +491,7 @@ async fn indirect_skip() { // Ensure we commit the first 3 leaders, skip the 4th, and commit the last 2 leaders. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 7); @@ -500,7 +500,7 @@ async fn indirect_skip() { // First sequenced leader should be in round 1. let leader_round = i + 1; let leader = committer.get_leaders(leader_round)[0]; - if let LeaderStatus::Commit(ref block) = sequence[i as usize] { + if let DecidedLeader::Commit(ref block) = sequence[i as usize] { assert_eq!(block.author(), leader); } else { panic!("Expected a committed leader") @@ -508,7 +508,7 @@ async fn indirect_skip() { } // Ensure we skip the leader of wave 1 (pipeline one) but commit the others. - if let LeaderStatus::Skip(leader) = sequence[3] { + if let DecidedLeader::Skip(leader) = sequence[3] { assert_eq!(leader.authority, committer.get_leaders(leader_round_4)[0]); assert_eq!(leader.round, leader_round_4); } else { @@ -519,7 +519,7 @@ async fn indirect_skip() { for i in 4..=6 { let leader_round = i + 1; let leader = committer.get_leaders(leader_round)[0]; - if let LeaderStatus::Commit(ref block) = sequence[i as usize] { + if let DecidedLeader::Commit(ref block) = sequence[i as usize] { assert_eq!(block.author(), leader); } else { panic!("Expected a committed leader") @@ -569,7 +569,7 @@ async fn undecided() { // Ensure no blocks are committed. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); assert!(sequence.is_empty()); } @@ -714,11 +714,11 @@ async fn test_byzantine_validator() { // Expect a successful direct commit of A12 and leaders at rounds 1 ~ 11 as // pipelining is enabled. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 12); - if let LeaderStatus::Commit(ref block) = sequence[11] { + if let DecidedLeader::Commit(ref block) = sequence[11] { assert_eq!(block.round(), leader_round_12); assert_eq!(block.author(), committer.get_leaders(leader_round_12)[0]) } else { @@ -737,7 +737,7 @@ async fn test_byzantine_validator() { // Ensure B13 is marked as undecided as there is <2f+1 blame and <2f+1 certs let last_sequenced = sequence.last().unwrap(); let last_decided = Slot::new(last_sequenced.round(), last_sequenced.authority()); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); assert!(sequence.is_empty()); // Now build an additional 3 dag layers on top of the existing dag so a commit @@ -749,7 +749,7 @@ async fn test_byzantine_validator() { Some(references_round_15), 18, ); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 4); @@ -757,7 +757,7 @@ async fn test_byzantine_validator() { // of the multiple blocks at slot B13. let skipped_leader_round = 13; let leader = *committer.get_leaders(skipped_leader_round).first().unwrap(); - if let LeaderStatus::Skip(ref slot) = sequence[0] { + if let DecidedLeader::Skip(ref slot) = sequence[0] { assert_eq!(slot.round, skipped_leader_round); assert_eq!(slot.authority, leader); } else { diff --git a/consensus/core/src/tests/universal_committer_tests.rs b/consensus/core/src/tests/universal_committer_tests.rs index 1eb440d3ed153..8d57a935fbc6a 100644 --- a/consensus/core/src/tests/universal_committer_tests.rs +++ b/consensus/core/src/tests/universal_committer_tests.rs @@ -8,7 +8,7 @@ use parking_lot::RwLock; use crate::{ block::{BlockAPI, Slot, TestBlock, Transaction, VerifiedBlock}, - commit::LeaderStatus, + commit::DecidedLeader, context::Context, dag_state::DagState, leader_schedule::{LeaderSchedule, LeaderSwapTable}, @@ -43,11 +43,11 @@ async fn direct_commit() { // The universal committer should mark the potential leaders in leader round 6 as // undecided because there is no way to get enough certificates for leaders of // leader round 6 without completing wave 2. - let sequence = test_setup.committer.try_commit(last_decided); + let sequence = test_setup.committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!( block.author(), test_setup.committer.get_leaders(leader_round_wave_1)[0] @@ -74,10 +74,10 @@ async fn idempotence() { // Commit one leader. let last_decided = Slot::new_for_test(0, 0); - let first_sequence = committer.try_commit(last_decided); + let first_sequence = committer.try_decide(last_decided); assert_eq!(first_sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = first_sequence[0] { + if let DecidedLeader::Commit(ref block) = first_sequence[0] { assert_eq!(first_sequence[0].round(), leader_round_wave_1); assert_eq!( block.author(), @@ -89,10 +89,10 @@ async fn idempotence() { // Ensure that if try_commit is called again with the same last decided leader // input the commit decision will be the same. - let first_sequence = committer.try_commit(last_decided); + let first_sequence = committer.try_decide(last_decided); assert_eq!(first_sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = first_sequence[0] { + if let DecidedLeader::Commit(ref block) = first_sequence[0] { assert_eq!(first_sequence[0].round(), leader_round_wave_1); assert_eq!( block.author(), @@ -119,11 +119,11 @@ async fn idempotence() { leader_status_wave_1.authority(), ); let leader_round_wave_2 = committer.committers[0].leader_round(2); - let second_sequence = committer.try_commit(last_decided); + let second_sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {second_sequence:#?}"); assert_eq!(second_sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = second_sequence[0] { + if let DecidedLeader::Commit(ref block) = second_sequence[0] { assert_eq!(second_sequence[0].round(), leader_round_wave_2); assert_eq!( block.author(), @@ -154,11 +154,11 @@ async fn multiple_direct_commit() { // After each wave is complete try commit the leader of that wave. let leader_round = committer.committers[0].leader_round(n); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), leader_round); assert_eq!(block.author(), committer.get_leaders(leader_round)[0]); } else { @@ -188,7 +188,7 @@ async fn direct_commit_late_call() { ); let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); // With 11 waves completed, excluding wave 0 with genesis round as its leader @@ -196,7 +196,7 @@ async fn direct_commit_late_call() { assert_eq!(sequence.len(), num_waves - 1_usize); for (i, leader_block) in sequence.iter().enumerate() { let leader_round = committer.committers[0].leader_round(i as u32 + 1); - if let LeaderStatus::Commit(ref block) = leader_block { + if let DecidedLeader::Commit(ref block) = leader_block { assert_eq!(block.round(), leader_round); assert_eq!(block.author(), committer.get_leaders(leader_round)[0]); } else { @@ -217,7 +217,7 @@ async fn no_genesis_commit() { ancestors = Some(build_dag(context.clone(), dag_state.clone(), ancestors, r)); let last_committed = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_committed); + let sequence = committer.try_decide(last_committed); tracing::info!("Commit sequence: {sequence:#?}"); assert!(sequence.is_empty()); } @@ -259,11 +259,11 @@ async fn direct_skip_no_leader_votes() { // Ensure no blocks are committed because there are 2f+1 blame (non-votes) for // the leader of wave 1. let last_decided = Slot::new_for_test(0, 0); - let sequence = test_setup.committer.try_commit(last_decided); + let sequence = test_setup.committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Skip(leader) = sequence[0] { + if let DecidedLeader::Skip(leader) = sequence[0] { assert_eq!(leader.authority, leader_wave_1); assert_eq!(leader.round, leader_round_wave_1); } else { @@ -307,11 +307,11 @@ async fn direct_skip_missing_leader_block() { // Ensure the leader is skipped because the leader is missing. let last_committed = Slot::new_for_test(0, 0); - let sequence = test_setup.committer.try_commit(last_committed); + let sequence = test_setup.committer.try_decide(last_committed); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 1); - if let LeaderStatus::Skip(leader) = sequence[0] { + if let DecidedLeader::Skip(leader) = sequence[0] { assert_eq!( leader.authority, test_setup.committer.get_leaders(leader_round_wave_1)[0] @@ -381,14 +381,14 @@ async fn indirect_commit() { // Ensure we indirectly commit the leader of wave 1 via the directly committed // leader of wave 2. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 2); for (idx, decided_leader) in sequence.iter().enumerate() { let leader_round = committer.committers[0].leader_round(idx as u32 + 1); let expected_leader = committer.get_leaders(leader_round)[0]; - if let LeaderStatus::Commit(ref block) = decided_leader { + if let DecidedLeader::Commit(ref block) = decided_leader { assert_eq!(block.round(), leader_round); assert_eq!(block.author(), expected_leader); } else { @@ -461,14 +461,14 @@ async fn indirect_skip() { // Ensure we make a commit decision for the leaders of wave 1 ~ 3 let last_committed = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_committed); + let sequence = committer.try_decide(last_committed); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 3); // Ensure we commit the leader of wave 1 directly. let leader_round_wave_1 = committer.committers[0].leader_round(1); let leader_wave_1 = committer.get_leaders(leader_round_wave_1)[0]; - if let LeaderStatus::Commit(ref block) = sequence[0] { + if let DecidedLeader::Commit(ref block) = sequence[0] { assert_eq!(block.round(), leader_round_wave_1); assert_eq!(block.author(), leader_wave_1); } else { @@ -479,7 +479,7 @@ async fn indirect_skip() { // This happens because we do not have enough votes in voting round of wave 2 // for the certificates of decision round wave 2 to form a certified link to // the leader of wave 2. - if let LeaderStatus::Skip(leader) = sequence[1] { + if let DecidedLeader::Skip(leader) = sequence[1] { assert_eq!(leader.authority, leader_wave_2); assert_eq!(leader.round, leader_round_wave_2); } else { @@ -489,7 +489,7 @@ async fn indirect_skip() { // Ensure we commit the 3rd leader directly. let leader_round_wave_3 = committer.committers[0].leader_round(3); let leader_wave_3 = committer.get_leaders(leader_round_wave_3)[0]; - if let LeaderStatus::Commit(ref block) = sequence[2] { + if let DecidedLeader::Commit(ref block) = sequence[2] { assert_eq!(block.round(), leader_round_wave_3); assert_eq!(block.author(), leader_wave_3); } else { @@ -549,7 +549,7 @@ async fn undecided() { // Ensure outcome of direct & indirect rule is undecided. So not commit decisions // should be returned. let last_committed = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_committed); + let sequence = committer.try_decide(last_committed); tracing::info!("Commit sequence: {sequence:#?}"); assert!(sequence.is_empty()); } @@ -692,11 +692,11 @@ async fn test_byzantine_direct_commit() { // Expect a successful direct commit of A12 and leaders at rounds 9, 6 & 3. let last_decided = Slot::new_for_test(0, 0); - let sequence = committer.try_commit(last_decided); + let sequence = committer.try_decide(last_decided); tracing::info!("Commit sequence: {sequence:#?}"); assert_eq!(sequence.len(), 4); - if let LeaderStatus::Commit(ref block) = sequence[3] { + if let DecidedLeader::Commit(ref block) = sequence[3] { assert_eq!( block.author(), committer.get_leaders(leader_round_wave_4)[0] diff --git a/consensus/core/src/universal_committer.rs b/consensus/core/src/universal_committer.rs index e7e07310ddf82..dc393b31db96e 100644 --- a/consensus/core/src/universal_committer.rs +++ b/consensus/core/src/universal_committer.rs @@ -8,8 +8,8 @@ use parking_lot::RwLock; use crate::{ base_committer::BaseCommitter, - block::{Round, Slot}, - commit::{Decision, LeaderStatus}, + block::{Round, Slot, GENESIS_ROUND}, + commit::{DecidedLeader, Decision}, context::Context, dag_state::DagState, }; @@ -35,10 +35,10 @@ pub(crate) struct UniversalCommitter { } impl UniversalCommitter { - /// Try to commit part of the dag. This function is idempotent and returns an ordered list of - /// decided slots. + /// Try to decide part of the dag. This function is idempotent and returns an ordered list of + /// decided leaders. #[tracing::instrument(skip_all, fields(last_decided = %last_decided))] - pub(crate) fn try_commit(&self, last_decided: Slot) -> Vec { + pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec { let highest_accepted_round = self.dag_state.read().highest_accepted_round(); // Try to decide as many leaders as possible, starting with the highest round. @@ -50,48 +50,47 @@ impl UniversalCommitter { 'outer: for round in (last_decided.round..=highest_accepted_round.saturating_sub(2)).rev() { for committer in self.committers.iter().rev() { // Skip committers that don't have a leader for this round. - let Some(leader) = committer.elect_leader(round) else { + let Some(slot) = committer.elect_leader(round) else { continue; }; // now that we reached the last committed leader we can stop the commit rule - if leader == last_decided { - tracing::debug!("Reached last committed {leader}, now exit"); + if slot == last_decided { + tracing::debug!("Reached last committed {slot}, now exit"); break 'outer; } - tracing::debug!("Trying to decide {leader} with {committer}",); + tracing::debug!("Trying to decide {slot} with {committer}",); // Try to directly decide the leader. - let mut status = committer.try_direct_decide(leader); + let mut status = committer.try_direct_decide(slot); tracing::debug!("Outcome of direct rule: {status}"); // If we can't directly decide the leader, try to indirectly decide it. if status.is_decided() { - leaders.push_front((status.clone(), Decision::Direct)); + leaders.push_front((status, Decision::Direct)); } else { - status = committer.try_indirect_decide(leader, leaders.iter().map(|(x, _)| x)); - leaders.push_front((status.clone(), Decision::Indirect)); + status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x)); tracing::debug!("Outcome of indirect rule: {status}"); + leaders.push_front((status, Decision::Indirect)); } } } // The decided sequence is the longest prefix of decided leaders. - leaders - .into_iter() - // Filter out all the genesis. - .filter(|(x, _)| x.round() > 0) - // Stop the sequence upon encountering an undecided leader. - .take_while(|(x, _)| x.is_decided()) - // We want to report metrics at this point to ensure that the decisions - // are reported only once hence we increase our accuracy - .inspect(|(x, direct_decided)| { - self.update_metrics(x, *direct_decided); - tracing::debug!("Decided {x}"); - }) - .map(|(x, _)| x) - .collect() + let mut decided_leaders = Vec::new(); + for (leader, decision) in leaders { + if leader.round() == GENESIS_ROUND { + continue; + } + let Some(decided_leader) = leader.into_decided_leader() else { + break; + }; + self.update_metrics(&decided_leader, decision); + decided_leaders.push(decided_leader); + } + tracing::debug!("Decided {decided_leaders:?}"); + decided_leaders } /// Return list of leaders for the round. @@ -105,23 +104,26 @@ impl UniversalCommitter { } /// Update metrics. - fn update_metrics(&self, leader: &LeaderStatus, decision: Decision) { - let authority = leader.authority().to_string(); + fn update_metrics(&self, decided_leader: &DecidedLeader, decision: Decision) { let decision_str = if decision == Decision::Direct { "direct" } else { "indirect" }; - let status = match leader { - LeaderStatus::Commit(..) => format!("{decision_str}-commit"), - LeaderStatus::Skip(..) => format!("{decision_str}-skip"), - LeaderStatus::Undecided(..) => return, + let status = match decided_leader { + DecidedLeader::Commit(..) => format!("{decision_str}-commit"), + DecidedLeader::Skip(..) => format!("{decision_str}-skip"), }; + let leader_host = &self + .context + .committee + .authority(decided_leader.slot().authority) + .hostname; self.context .metrics .node_metrics .committed_leaders_total - .with_label_values(&[&authority, &status]) + .with_label_values(&[leader_host, &status]) .inc(); } } diff --git a/crates/sui-core/src/consensus_types/consensus_output_api.rs b/crates/sui-core/src/consensus_types/consensus_output_api.rs index fc22c1f7c406b..5764124237f42 100644 --- a/crates/sui-core/src/consensus_types/consensus_output_api.rs +++ b/crates/sui-core/src/consensus_types/consensus_output_api.rs @@ -105,9 +105,9 @@ impl ConsensusOutputAPI for narwhal_types::ConsensusOutput { impl ConsensusOutputAPI for consensus_core::CommittedSubDag { fn reputation_score_sorted_desc(&self) -> Option> { - if !self.reputation_scores.is_empty() { + if !self.reputation_scores_desc.is_empty() { Some( - self.reputation_scores + self.reputation_scores_desc .iter() .map(|(id, score)| (id.value() as AuthorityIndex, *score)) .collect(),