From 537f02b0b2a7c389780f1a6fbe370ccdfd762268 Mon Sep 17 00:00:00 2001
From: Anastasios Kichidis
Date: Wed, 15 Jan 2025 15:03:59 +0000
Subject: [PATCH 1/3] [refactor] allow blocks synced via commit syncer to be
 accepted when above the last committed leader's GC round.

---
 consensus/core/src/authority_service.rs   |   9 +-
 consensus/core/src/base_committer.rs      |   1 +
 consensus/core/src/block_manager.rs       | 200 ++++++--
 consensus/core/src/commit.rs              |  28 +
 consensus/core/src/commit_syncer.rs       |  46 +-
 consensus/core/src/core.rs                | 596 ++++++++++++++++++++--
 consensus/core/src/core_thread.rs         |  39 ++
 consensus/core/src/dag_state.rs           |   6 +-
 consensus/core/src/error.rs               |   8 +
 consensus/core/src/leader_timeout.rs      |   8 +
 consensus/core/src/round_prober.rs        |   9 +-
 consensus/core/src/test_dag_builder.rs    |  32 +-
 consensus/core/src/universal_committer.rs |  22 +-
 13 files changed, 919 insertions(+), 85 deletions(-)

diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs
index ec71d3a273aa1..810914d2d6fc7 100644
--- a/consensus/core/src/authority_service.rs
+++ b/consensus/core/src/authority_service.rs
@@ -686,7 +686,7 @@ mod tests {
     use crate::{
         authority_service::AuthorityService,
         block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
-        commit::CommitRange,
+        commit::{CertifiedCommit, CommitRange},
         commit_vote_monitor::CommitVoteMonitor,
         context::Context,
         core_thread::{CoreError, CoreThreadDispatcher},
@@ -743,6 +743,13 @@ mod tests {
             Ok(BTreeSet::new())
         }

+        async fn add_certified_commits(
+            &self,
+            _commits: Vec<CertifiedCommit>,
+        ) -> Result<BTreeSet<BlockRef>, CoreError> {
+            todo!()
+        }
+
         async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
             Ok(())
         }
diff --git a/consensus/core/src/base_committer.rs b/consensus/core/src/base_committer.rs
index f529a9db90701..58ba73babb985 100644
--- a/consensus/core/src/base_committer.rs
+++ b/consensus/core/src/base_committer.rs
@@ -5,6 +5,7 @@ use std::{collections::HashMap, fmt::Display, sync::Arc};

 use consensus_config::{AuthorityIndex, Stake};
 use parking_lot::RwLock;
+use tracing::warn;

 use crate::{
     block::{BlockAPI, BlockRef, Round, Slot, VerifiedBlock},
diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs
index 967b493f9fac1..316b1e6094720 100644
--- a/consensus/core/src/block_manager.rs
+++ b/consensus/core/src/block_manager.rs
@@ -3,7 +3,6 @@

 use std::{
     collections::{BTreeMap, BTreeSet},
-    iter,
     sync::Arc,
     time::Instant,
 };
@@ -84,11 +83,46 @@ impl BlockManager {
     /// Tries to accept the provided blocks assuming that all their causal history exists. The method
     /// returns all the blocks that have been successfully processed in round ascending order, that includes also previously
     /// suspended blocks that have now been able to get accepted. Method also returns a set with the missing ancestor blocks.
+    #[tracing::instrument(skip_all)]
     pub(crate) fn try_accept_blocks(
         &mut self,
         mut blocks: Vec<VerifiedBlock>,
     ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
         let _s = monitored_scope("BlockManager::try_accept_blocks");
+        self.try_accept_blocks_internal(blocks, false)
+    }
+
+    // Tries to accept blocks that have been committed. Returns all the blocks that have been accepted, both from the ones
+    // provided and any child blocks.
+    #[tracing::instrument(skip_all)]
+    pub(crate) fn try_accept_committed_blocks(
+        &mut self,
+        blocks: Vec<VerifiedBlock>,
+    ) -> Vec<VerifiedBlock> {
+        // If GC is disabled, then none of this logic should run.
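+        // Committed blocks arrive through certified commits, so with GC enabled they can
+        // be accepted even when parts of their causal history below the GC round will
+        // never be fetched locally.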
+        if !self.dag_state.read().gc_enabled() {
+            return Vec::new();
+        }
+
+        // Just accept the blocks.
+        let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
+        let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
+        assert!(
+            missing_blocks.is_empty(),
+            "No missing blocks should be returned for committed blocks"
+        );
+
+        accepted_blocks
+    }
+
+    /// Attempts to accept the provided blocks. When `committed = true`, the blocks are considered to be committed
+    /// via certified commits and are handled differently.
+    fn try_accept_blocks_internal(
+        &mut self,
+        mut blocks: Vec<VerifiedBlock>,
+        committed: bool,
+    ) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
+        let _s = monitored_scope("BlockManager::try_accept_blocks_internal");

         blocks.sort_by_key(|b| b.round());
         debug!(
@@ -104,46 +138,82 @@ impl BlockManager {

             // Try to accept the input block.
             let block_ref = block.reference();
-            let block = match self.try_accept_one_block(block) {
-                TryAcceptResult::Accepted(block) => block,
-                TryAcceptResult::Suspended(ancestors_to_fetch) => {
-                    debug!(
-                        "Missing ancestors to fetch for block {block_ref}: {}",
-                        ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
-                    );
-                    missing_blocks.extend(ancestors_to_fetch);
-                    continue;
-                }
-                TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
+
+            let mut to_verify_timestamps_and_accept = vec![];
+            if committed {
+                match self.try_accept_one_committed_block(block) {
+                    TryAcceptResult::Accepted(block) => {
+                        // As this is a committed block, it is already accepted and there is no need to verify its timestamps.
+                        // Just add it to the accepted blocks list.
+                        accepted_blocks.push(block);
+                    }
+                    TryAcceptResult::Processed => continue,
+                    TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => panic!(
+                        "Did not expect to suspend or skip a committed block: {:?}",
+                        block_ref
+                    ),
+                };
+            } else {
+                match self.try_accept_one_block(block) {
+                    TryAcceptResult::Accepted(block) => {
+                        to_verify_timestamps_and_accept.push(block);
+                    }
+                    TryAcceptResult::Suspended(ancestors_to_fetch) => {
+                        debug!(
+                            "Missing ancestors to fetch for block {block_ref}: {}",
+                            ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
+                        );
+                        missing_blocks.extend(ancestors_to_fetch);
+                        continue;
+                    }
+                    TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
+                };
             };

             // If the block is accepted, try to unsuspend its children blocks if any.
-            let unsuspended_blocks = self.try_unsuspend_children_blocks(block.reference());
+            let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
+            to_verify_timestamps_and_accept.extend(unsuspended_blocks);

             // Verify block timestamps
-            let blocks_to_accept = self
-                .verify_block_timestamps_and_accept(iter::once(block).chain(unsuspended_blocks));
+            let blocks_to_accept =
+                self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
             accepted_blocks.extend(blocks_to_accept);
         }

-        let metrics = &self.context.metrics.node_metrics;
-        metrics
-            .missing_blocks_total
-            .inc_by(missing_blocks.len() as u64);
-        metrics
-            .block_manager_suspended_blocks
-            .set(self.suspended_blocks.len() as i64);
-        metrics
-            .block_manager_missing_ancestors
-            .set(self.missing_ancestors.len() as i64);
-        metrics
-            .block_manager_missing_blocks
-            .set(self.missing_blocks.len() as i64);
+        self.update_stats(missing_blocks.len() as u64);

         // Figure out the new missing blocks
         (accepted_blocks, missing_blocks)
     }

+    fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
+        if self.dag_state.read().contains_block(&block.reference()) {
+            return TryAcceptResult::Processed;
+        }
+
+        // Remove the block from missing and suspended blocks
+        self.missing_blocks.remove(&block.reference());
+
+        // If the block has already been fetched and parked as a suspended block, then remove it. Also find all the references of missing
+        // ancestors and remove those as well. If we don't do that, then once a missing ancestor is fetched it is possible to cause a panic
+        // when trying to unsuspend its children, as this block won't be found in the suspended blocks map.
+        if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
+            suspended_block
+                .missing_ancestors
+                .iter()
+                .for_each(|ancestor| {
+                    if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
+                        references.remove(&block.reference());
+                    }
+                });
+        }
+
+        // Accept this block before any unsuspended child blocks
+        self.dag_state.write().accept_blocks(vec![block.clone()]);
+
+        TryAcceptResult::Accepted(block)
+    }
+
     /// Tries to find the provided block_refs in DagState and BlockManager,
     /// and returns missing block refs.
     pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
@@ -563,6 +633,20 @@ impl BlockManager {
         self.missing_blocks.clone()
     }

+    fn update_stats(&mut self, missing_blocks: u64) {
+        let metrics = &self.context.metrics.node_metrics;
+        metrics.missing_blocks_total.inc_by(missing_blocks);
+        metrics
+            .block_manager_suspended_blocks
+            .set(self.suspended_blocks.len() as i64);
+        metrics
+            .block_manager_missing_ancestors
+            .set(self.missing_ancestors.len() as i64);
+        metrics
+            .block_manager_missing_blocks
+            .set(self.missing_blocks.len() as i64);
+    }
+
     fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
         let (min_round, max_round) =
             if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
@@ -1047,6 +1131,64 @@ mod tests {
         }
     }

+    #[rstest]
+    #[tokio::test]
+    async fn try_accept_committed_blocks() {
+        // GIVEN
+        let (mut context, _key_pairs) = Context::new_for_test(4);
+        // We set the gc depth to 4
+        context
+            .protocol_config
+            .set_consensus_gc_depth_for_testing(4);
+        let context = Arc::new(context);
+        let store = Arc::new(MemStore::new());
+        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
+
+        // We "fake" the commit for round 6, so GC round moves to (commit_round - gc_depth = 6 - 4 = 2)
+        let last_commit = TrustedCommit::new_for_test(
+            10,
+            CommitDigest::MIN,
+            context.clock.timestamp_utc_ms(),
+            BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
+            vec![],
+        );
+        dag_state.write().set_last_commit(last_commit);
+        assert_eq!(
+            dag_state.read().gc_round(),
+            2,
+            "GC round should have moved to round 2"
+        );
+
+        let mut block_manager =
+            BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));
+
+        // create a DAG of 12 rounds
+        let mut dag_builder = DagBuilder::new(context.clone());
+        dag_builder.layers(1..=12).build();
+
+        // Now try to accept, via the normal block acceptance path, the blocks of rounds 7 ~ 12. None of them should be accepted.
+        let blocks = dag_builder.blocks(7..=12);
+        let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
+        assert!(accepted_blocks.is_empty());
+        assert_eq!(missing.len(), 4);
+
+        // Now try to accept, via the committed blocks path, the blocks of rounds 3 ~ 6. All of them should be accepted, and the blocks
+        // of rounds 7 ~ 12 should be unsuspended and accepted as well.
+        let blocks = dag_builder.blocks(3..=6);
+
+        // WHEN
+        let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);
+
+        // THEN
+        accepted_blocks.sort_by_key(|b| b.reference());
+
+        let mut all_blocks = dag_builder.blocks(3..=12);
+        all_blocks.sort_by_key(|b| b.reference());
+
+        assert_eq!(accepted_blocks, all_blocks);
+        assert!(block_manager.is_empty());
+    }
+
     struct TestBlockVerifier {
         fail: BTreeSet<BlockRef>,
     }
diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs
index d1fbed756b461..c3ae24b6eebdd 100644
--- a/consensus/core/src/commit.rs
+++ b/consensus/core/src/commit.rs
@@ -207,6 +207,33 @@ impl Deref for TrustedCommit {
     }
 }

+#[derive(Clone, Debug)]
+pub(crate) struct CertifiedCommit {
+    commit: Arc<TrustedCommit>,
+    blocks: Vec<VerifiedBlock>,
+}
+
+impl CertifiedCommit {
+    pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
+        Self {
+            commit: Arc::new(commit),
+            blocks,
+        }
+    }
+
+    pub fn blocks(&self) -> &[VerifiedBlock] {
+        &self.blocks
+    }
+}
+
+impl Deref for CertifiedCommit {
+    type Target = TrustedCommit;
+
+    fn deref(&self) -> &Self::Target {
+        &self.commit
+    }
+}
+
 /// Digest of a consensus commit.
 #[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
 pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);

@@ -411,6 +438,7 @@ pub fn load_committed_subdag_from_store(
 pub(crate) enum Decision {
     Direct,
     Indirect,
+    Certified, // This leader comes from a certified commit, so no commit decision was made locally.
 }

 /// The status of a leader slot from the direct and indirect commit rules.
diff --git a/consensus/core/src/commit_syncer.rs b/consensus/core/src/commit_syncer.rs
index 89fc651a91f2f..6cc4d626453f4 100644
--- a/consensus/core/src/commit_syncer.rs
+++ b/consensus/core/src/commit_syncer.rs
@@ -48,7 +48,10 @@ use tracing::{debug, info, warn};
 use crate::{
     block::{BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
     block_verifier::BlockVerifier,
-    commit::{Commit, CommitAPI as _, CommitDigest, CommitRange, CommitRef, TrustedCommit},
+    commit::{
+        CertifiedCommit, Commit, CommitAPI as _, CommitDigest, CommitRange, CommitRef,
+        TrustedCommit,
+    },
     commit_vote_monitor::CommitVoteMonitor,
     context::Context,
     core_thread::CoreThreadDispatcher,
@@ -90,7 +93,7 @@ pub(crate) struct CommitSyncer<C: NetworkClient> {
     // Additional ranges of commits to fetch.
     pending_fetches: BTreeSet<CommitRange>,
     // Fetched commits and blocks by commit range.
-    fetched_ranges: BTreeMap<CommitRange, Vec<VerifiedBlock>>,
+    fetched_ranges: BTreeMap<CommitRange, (Vec<TrustedCommit>, Vec<VerifiedBlock>)>,
     // Highest commit index among inflight and pending fetches.
     // Used to determine the start of new ranges to be fetched.
     highest_scheduled_index: Option<CommitIndex>,
@@ -271,13 +274,15 @@ impl CommitSyncer<C> {
         // Only add new blocks if at least some of them are not already synced.
         if self.synced_commit_index < commit_end {
             self.fetched_ranges
-                .insert((commit_start..=commit_end).into(), blocks);
+                .insert((commit_start..=commit_end).into(), (commits, blocks));
         }
         // Try to process as many fetched blocks as possible.
-        while let Some((fetched_commit_range, _blocks)) = self.fetched_ranges.first_key_value() {
+        while let Some((fetched_commit_range, (_commits, _blocks))) =
+            self.fetched_ranges.first_key_value()
+        {
             // Only pop fetched_ranges if there is no gap with blocks already synced.
             // Note: start, end and synced_commit_index are all inclusive.
-            let (fetched_commit_range, blocks) =
+            let (fetched_commit_range, (commits, blocks)) =
                 if fetched_commit_range.start() <= self.synced_commit_index + 1 {
                     self.fetched_ranges.pop_first().unwrap()
                 } else {
@@ -296,11 +301,37 @@ impl CommitSyncer<C> {
                 fetched_commit_range,
                 blocks.iter().map(|b| b.reference().to_string()).join(","),
             );
+
+            let mut blocks_map = BTreeMap::new();
+            for block in blocks {
+                blocks_map.insert(block.reference(), block);
+            }
+
+            let certified_commits = commits
+                .into_iter()
+                .map(|commit| {
+                    let blocks = commit
+                        .blocks()
+                        .iter()
+                        .map(|block_ref| {
+                            // It's impossible for two different commits to sequence the same block, so this operation is safe.
+                            blocks_map.remove(block_ref).expect("Block should exist")
+                        })
+                        .collect::<Vec<_>>();
+                    CertifiedCommit::new_certified(commit, blocks)
+                })
+                .collect();
+
             // If core thread cannot handle the incoming blocks, it is ok to block here.
             // Also it is possible to have missing ancestors because an equivocating validator
             // may produce blocks that are not included in commits but are ancestors to other blocks.
             // Synchronizer is needed to fill in the missing ancestors in this case.
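+            // Certified commits are handed to Core together with their blocks, so the
+            // blocks can be accepted even when they are above the last committed leader's
+            // GC round, instead of being treated as ordinary synced blocks.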
-            match self.inner.core_thread_dispatcher.add_blocks(blocks).await {
+            match self
+                .inner
+                .core_thread_dispatcher
+                .add_certified_commits(certified_commits)
+                .await
+            {
                 Ok(missing) => {
                     if !missing.is_empty() {
                         warn!(
@@ -326,6 +357,7 @@ impl CommitSyncer<C> {
                     return;
                 }
             };
+
             // Once commits and blocks are sent to Core, ratchet up synced_commit_index
             self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
         }
@@ -641,7 +673,7 @@ impl CommitSyncer<C> {
     }

     #[cfg(test)]
-    fn fetched_ranges(&self) -> BTreeMap<CommitRange, Vec<VerifiedBlock>> {
+    fn fetched_ranges(&self) -> BTreeMap<CommitRange, (Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
         self.fetched_ranges.clone()
     }
diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index 356127c706aac..39aec0192ee61 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -1,7 +1,13 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

-use std::{collections::BTreeSet, iter, sync::Arc, time::Duration, vec};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    iter, mem,
+    sync::Arc,
+    time::Duration,
+    vec,
+};

 #[cfg(test)]
 use consensus_config::{local_committee_and_keys, Stake};
@@ -25,7 +31,7 @@ use crate::{
         Slot, VerifiedBlock, GENESIS_ROUND,
     },
     block_manager::BlockManager,
-    commit::CommittedSubDag,
+    commit::{CertifiedCommit, CommitAPI, CommittedSubDag, DecidedLeader, Decision},
     commit_observer::CommitObserver,
     context::Context,
     dag_state::DagState,
@@ -209,7 +215,7 @@ impl Core {
         }

         // Try to commit and propose, since they may not have run after the last storage write.
-        self.try_commit().unwrap();
+        self.try_commit(vec![]).unwrap();

         let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap() {
@@ -247,6 +253,7 @@ impl Core {
     /// Processes the provided blocks and accepts them if possible when their causal history exists.
     /// The method returns:
     /// - The references of ancestors missing their block
+    #[tracing::instrument(skip_all)]
     pub(crate) fn add_blocks(
         &mut self,
         blocks: Vec<VerifiedBlock>,
@@ -265,7 +272,6 @@ impl Core {
             .core_add_blocks_batch_size
             .observe(blocks.len() as f64);

-        // Try to accept them via the block manager
         let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);

         if !accepted_blocks.is_empty() {
@@ -278,7 +284,7 @@ impl Core {
             );

             // Try to commit the new blocks if possible.
-            self.try_commit()?;
+            self.try_commit(vec![])?;

             // Try to propose now since there are new blocks accepted.
             self.try_propose(false)?;
@@ -299,6 +305,47 @@ impl Core {
         Ok(missing_block_refs)
     }

+    // Adds the certified commits that have been synced via the commit syncer. We use the commit info to skip running the decision
+    // rule and immediately commit the corresponding leaders and sub dags. Note that no block acceptance happens here; it happens
+    // internally in the `try_commit` method, which ensures that each time only the blocks corresponding to the certified commits
+    // that are about to be committed are accepted.
+    #[tracing::instrument(skip_all)]
+    pub(crate) fn add_certified_commits(
+        &mut self,
+        commits: Vec<CertifiedCommit>,
+    ) -> ConsensusResult<BTreeSet<BlockRef>> {
+        let _scope = monitored_scope("Core::add_certified_commits");
+
+        // We want to enable the commit process logic when GC is enabled.
+        if self.dag_state.read().gc_enabled() {
+            let commits = self
+                .validate_certified_commits(commits)
+                .expect("Certified commits validation failed");
+
+            // Try to commit the new blocks. Take into account the certified commits that have been provided.
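+            // At this point `commits` is gap-free and starts right after the last locally
+            // committed index, as guaranteed by validate_certified_commits() above.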
+            self.try_commit(commits)?;
+
+            // Try to propose now since there are new blocks accepted.
+            self.try_propose(false)?;
+
+            // Now set up leader timeout if needed.
+            // This needs to be called after try_commit() and try_propose(), which may
+            // have advanced the threshold clock round.
+            self.try_signal_new_round();
+
+            return Ok(BTreeSet::new());
+        }
+
+        // If GC is not enabled then process blocks as usual.
+        let blocks = commits
+            .iter()
+            .flat_map(|commit| commit.blocks())
+            .cloned()
+            .collect::<Vec<_>>();
+
+        self.add_blocks(blocks)
+    }
+
     /// Checks if provided block refs have been accepted. If not, missing block refs are kept for synchronizations.
     /// Returns the references of missing blocks among the input blocks.
     pub(crate) fn check_block_refs(
@@ -328,7 +375,6 @@ impl Core {
             missing_block_refs.iter().map(|b| b.to_string()).join(", ")
         );
     }
-
     Ok(missing_block_refs)
 }
@@ -378,6 +424,43 @@ impl Core {
         Ok(None)
     }

+    /// Keeps only the certified commits that have a commit index > last commit index. It also ensures that the first commit in the list is the next one in line; otherwise an error is returned.
+    fn validate_certified_commits(
+        &mut self,
+        commits: Vec<CertifiedCommit>,
+    ) -> ConsensusResult<Vec<CertifiedCommit>> {
+        // Filter out the commits that have already been committed locally and keep only those above the last committed index.
+        let last_commit_index = self.dag_state.read().last_commit_index();
+        let commits = commits
+            .iter()
+            .filter(|commit| {
+                if commit.index() > last_commit_index {
+                    true
+                } else {
+                    tracing::debug!(
+                        "Skip commit for index {} as it is already committed with last commit index {}",
+                        commit.index(),
+                        last_commit_index
+                    );
+                    false
+                }
+            })
+            .cloned()
+            .collect::<Vec<_>>();
+
+        // Make sure that the first commit we find is the next one in line and there is no gap.
+        if let Some(commit) = commits.first() {
+            if commit.index() != last_commit_index + 1 {
+                return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
+                    expected_commit_index: last_commit_index + 1,
+                    commit_index: commit.index(),
+                });
+            }
+        }
+
+        Ok(commits)
+    }
+
     // Attempts to create a new block, persist and propose it to all peers.
     // When force is true, ignore if leader from the last round exists among ancestors and if
     // the minimum round delay has passed.
@@ -391,7 +474,7 @@ impl Core {
             fail_point!("consensus-after-propose");

             // The new block may help commit.
-            self.try_commit()?;
+            self.try_commit(vec![])?;
             return Ok(Some(extended_block.block));
         }
         Ok(None)
@@ -601,8 +684,12 @@ impl Core {
         })
     }

-    /// Runs commit rule to attempt to commit additional blocks from the DAG.
-    fn try_commit(&mut self) -> ConsensusResult<Vec<CommittedSubDag>> {
+    /// Runs the commit rule to attempt to commit additional blocks from the DAG. If any `certified_commits` are provided,
+    /// it will attempt to commit those first before trying to commit any further leaders.
+    fn try_commit(
+        &mut self,
+        mut certified_commits: Vec<CertifiedCommit>,
+    ) -> ConsensusResult<Vec<CommittedSubDag>> {
         let _s = self
             .context
             .metrics
@@ -611,6 +698,21 @@ impl Core {
             .with_label_values(&["Core::try_commit"])
             .start_timer();

+        let mut certified_commits_map = BTreeMap::new();
+        for c in &certified_commits {
+            certified_commits_map.insert(c.index(), c.reference());
+        }
+
+        if !certified_commits.is_empty() {
+            info!(
+                "Will try to commit synced commits first: {:?}",
+                certified_commits
+                    .iter()
+                    .map(|c| (c.index(), c.leader()))
+                    .collect::<Vec<_>>()
+            );
+        }
+
         let mut committed_sub_dags = Vec::new();
         // TODO: Add optimization to abort early without quorum for a round.
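+        // Each iteration first drains leaders from the certified commits (bounded by the
+        // number of commits allowed before the next leader schedule update), and only
+        // falls back to the local decision rule when no certified leaders remain.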
         loop {
             let mut commits_until_update = self
                 .leader_schedule
                 .commits_until_leader_schedule_update(self.dag_state.clone());
+
             if commits_until_update == 0 {
                 let last_commit_index = self.dag_state.read().last_commit_index();
+
                 tracing::info!(
                     "Leader schedule change triggered at commit index {last_commit_index}"
                 );
@@ -647,30 +751,52 @@
             }
             assert!(commits_until_update > 0);

-            // TODO: limit commits by commits_until_update, which may be needed when leader schedule length
-            // is reduced.
-            let decided_leaders = self.committer.try_decide(self.last_decided_leader);
+            // Always try to process the certified commits first. If there are any to process,
+            // the decided leaders and their commits are returned.
+            let (mut decided_leaders, decided_certified_commits): (
+                Vec<DecidedLeader>,
+                Vec<CertifiedCommit>,
+            ) = self
+                .try_decide_certified(&mut certified_commits, commits_until_update)
+                .into_iter()
+                .unzip();
+
+            // Only accept blocks for the certified commits that we are certain to sequence.
+            // This ensures that only blocks corresponding to committed certified commits are flushed to disk.
+            // Blocks from non-committed certified commits will not be flushed, preventing issues during crash-recovery.
+            // This avoids scenarios where accepting and flushing blocks of non-committed certified commits could lead to
+            // premature commit rule execution. Due to GC, this could cause a panic if the commit rule tries to access
+            // missing causal history from blocks of certified commits.
+            let blocks = decided_certified_commits
+                .iter()
+                .flat_map(|c| c.blocks())
+                .cloned()
+                .collect::<Vec<_>>();
+            self.block_manager.try_accept_committed_blocks(blocks);
+
+            // If no leaders were decided via the certified commits, try to run the local decision rule.
+            if decided_leaders.is_empty() {
+                // TODO: limit commits by commits_until_update, which may be needed when leader schedule length is reduced.
+                decided_leaders = self.committer.try_decide(self.last_decided_leader);
+
+                // Truncate the decided leaders to fit the commit schedule limit.
+                if decided_leaders.len() >= commits_until_update {
+                    let _ = decided_leaders.split_off(commits_until_update);
+                }
+            }
+
+            // If the decided leaders list is empty then just break the loop.
             let Some(last_decided) = decided_leaders.last().cloned() else {
                 break;
             };
-            tracing::debug!("Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change", decided_leaders.len());

-            let mut sequenced_leaders = decided_leaders
+            self.last_decided_leader = last_decided.slot();
+
+            let sequenced_leaders = decided_leaders
                 .into_iter()
                 .filter_map(|leader| leader.into_committed_block())
                 .collect::<Vec<_>>();

-            // If the sequenced leaders are truncated to fit the leader schedule, use the last sequenced leader
-            // as the last decided leader. Otherwise, use the last decided leader from try_commit().
-            let sequenced_leaders = if sequenced_leaders.len() >= commits_until_update {
-                let _ = sequenced_leaders.split_off(commits_until_update);
-                self.last_decided_leader = sequenced_leaders.last().unwrap().slot();
-                sequenced_leaders
-            } else {
-                self.last_decided_leader = last_decided.slot();
-                sequenced_leaders
-            };
+            tracing::debug!("Decided {} leaders and {commits_until_update} commits can be made before next leader schedule change", sequenced_leaders.len());

             self.context
                 .metrics
@@ -678,9 +804,12 @@
                 .last_decided_leader_round
                 .set(self.last_decided_leader.round as i64);

+            // It's possible to reach this point when the decided leaders are all "Skip" decisions.
+            // In this case there is no leader to commit and we should break the loop.
             if sequenced_leaders.is_empty() {
                 break;
             }
+
             tracing::info!(
                 "Committing {} leaders: {}",
                 sequenced_leaders.len(),
@@ -700,6 +829,18 @@
                 .try_unsuspend_blocks_for_latest_gc_round();

             committed_sub_dags.extend(subdags);
+
+            fail_point!("consensus-after-handle-commit");
+        }
+
+        // Sanity check: for commits that have been linearized using the certified commits, ensure that the same sub dag has been committed.
+        for sub_dag in &committed_sub_dags {
+            if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
+                assert_eq!(
+                    commit_ref, sub_dag.commit_ref,
+                    "Certified commit has different reference than the committed sub dag"
+                );
+            }
         }

         // Notify about our own committed blocks
@@ -820,6 +961,50 @@
         true
     }

+    // Try to decide which of the certified commits will have to be committed next, respecting the `limit`. If the provided `limit` is zero, it will panic.
+    // The function returns the list of decided leaders and updates the remaining certified commits in place. If an empty vector is returned,
+    // there are no certified commits to be committed: `certified_commits` is either empty or all of its commits have already been committed.
+    #[tracing::instrument(skip_all)]
+    fn try_decide_certified(
+        &mut self,
+        certified_commits: &mut Vec<CertifiedCommit>,
+        limit: usize,
+    ) -> Vec<(DecidedLeader, CertifiedCommit)> {
+        // If GC is disabled, then none of this logic should run.
+        if !self.dag_state.read().gc_enabled() {
+            return Vec::new();
+        }
+
+        assert!(limit > 0, "limit should be greater than 0");
+
+        let to_commit = if certified_commits.len() >= limit {
+            // We keep only the number of leaders as dictated by the `limit`
+            certified_commits.drain(..limit).collect::<Vec<_>>()
+        } else {
+            // Otherwise just take all of them and leave `certified_commits` empty.
+            mem::take(certified_commits)
+        };
+
+        tracing::info!(
+            "Decided {} certified leaders: {}",
+            to_commit.len(),
+            to_commit.iter().map(|c| c.leader().to_string()).join(",")
+        );
+
+        let sequenced_leaders = to_commit
+            .into_iter()
+            .map(|commit| {
+                let leader = commit.blocks().last().expect("Certified commit should have at least one block");
+                assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
+                let leader = DecidedLeader::Commit(leader.clone());
+                UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
+                (leader, commit)
+            })
+            .collect::<Vec<_>>();
+
+        sequenced_leaders
+    }
+
     /// Retrieves the next ancestors to propose to form a block at `clock_round` round.
     /// If smart selection is enabled then this will try to select the best ancestors
     /// based on the propagation scores of the authorities.
@@ -1152,7 +1337,7 @@ pub(crate) fn create_cores(context: Context, authorities: Vec<Stake>) -> Vec<CoreTextFixture> {
-    fn new(context: Context, authorities: Vec<Stake>, own_index: AuthorityIndex) -> Self {
+    fn new(
+        context: Context,
+        authorities: Vec<Stake>,
+        own_index: AuthorityIndex,
+        sync_last_known_own_block: bool,
+    ) -> Self {
         let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
         let mut context = context.clone();
         context = context
@@ -1220,7 +1410,7 @@ impl CoreTextFixture {
             signals,
             block_signer,
             dag_state,
-            false,
+            sync_last_known_own_block,
         );

         Self {
@@ -1247,7 +1437,7 @@ mod test {

     use crate::{
         block::{genesis_blocks, TestBlock},
         block_verifier::NoopBlockVerifier,
-        commit::CommitAPI as _,
+        commit::CommitAPI,
         leader_scoring::ReputationScores,
         storage::{mem_store::MemStore, Store, WriteBatch},
         test_dag_builder::DagBuilder,
@@ -1484,7 +1674,7 @@ mod test {
         }

         // Run commit rule.
-        core.try_commit().ok();
+        core.try_commit(vec![]).ok();
         let last_commit = store
             .read_last_commit()
             .unwrap()
@@ -2863,6 +3053,318 @@ mod test {
         }
     }

+    #[tokio::test]
+    async fn test_validate_certified_commits() {
+        telemetry_subscribers::init_for_testing();
+
+        let (context, _key_pairs) = Context::new_for_test(4);
+        let context = context.with_parameters(Parameters {
+            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
+            ..Default::default()
+        });
+
+        let authority_index = AuthorityIndex::new_for_test(0);
+        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
+        let mut core = core.core;
+
+        // No new block should have been produced
+        assert_eq!(
+            core.last_proposed_round(),
+            GENESIS_ROUND,
+            "No block should have been created other than genesis"
+        );
+
+        // create a DAG of 12 rounds
+        let mut dag_builder = DagBuilder::new(core.context.clone());
+        dag_builder.layers(1..=12).build();
+
+        // Store all blocks up to round 6 which should be enough to decide up to leader 4
+        dag_builder.print();
+        let blocks = dag_builder.blocks(1..=6);
+
+        for block in blocks {
+            core.dag_state.write().accept_block(block);
+        }
+
+        // Get all the committed sub dags up to round 10
+        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
+
+        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
+        let committed_sub_dags = core.try_commit(vec![]).unwrap();
+
+        // We should have committed up to round 4
+        assert_eq!(committed_sub_dags.len(), 4);
+
+        // Now validate the certified commits. We'll try 3 different scenarios:
+        println!("Case 1. Provide certified commits that are all before the last committed round.");
+
+        // Highest certified commit should be for leader of round 4.
+        let certified_commits = sub_dags_and_commits
+            .iter()
+            .take(4)
+            .map(|(_, c)| c)
+            .cloned()
+            .collect::<Vec<_>>();
+        assert!(
+            certified_commits.last().unwrap().index()
+                <= committed_sub_dags.last().unwrap().commit_ref.index,
+            "Highest certified commit should be older than the highest committed index."
+        );
+
+        let certified_commits = core.validate_certified_commits(certified_commits).unwrap();
+
+        // No commits should be processed
+        assert!(certified_commits.is_empty());
+
+        println!("Case 2. Provide certified commits that are all after the last committed round.");

+        // Highest certified commit should be for the leader of round 5.
+        let certified_commits = sub_dags_and_commits
+            .iter()
+            .take(5)
+            .map(|(_, c)| c.clone())
+            .collect::<Vec<_>>();
+
+        let certified_commits = core
+            .validate_certified_commits(certified_commits.clone())
+            .unwrap();
+
+        // The certified commit of index 5 should be processed.
+        assert_eq!(certified_commits.len(), 1);
+        assert_eq!(certified_commits.first().unwrap().reference().index, 5);
+
+        println!("Case 3. Provide certified commits where the first certified commit index is not the last_committed_index + 1.");
+
+        // Provide only the certified commit of index 6, leaving a gap after the last committed index.
+        let certified_commits = sub_dags_and_commits
+            .iter()
+            .skip(5)
+            .take(1)
+            .map(|(_, c)| c.clone())
+            .collect::<Vec<_>>();
+
+        let err = core
+            .validate_certified_commits(certified_commits.clone())
+            .unwrap_err();
+        match err {
+            ConsensusError::UnexpectedCertifiedCommitIndex {
+                expected_commit_index: 5,
+                commit_index: 6,
+            } => (),
+            _ => panic!("Unexpected error: {:?}", err),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_add_certified_commits() {
+        telemetry_subscribers::init_for_testing();
+
+        let (context, _key_pairs) = Context::new_for_test(4);
+        let context = context.with_parameters(Parameters {
+            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
+            ..Default::default()
+        });
+
+        let authority_index = AuthorityIndex::new_for_test(0);
+        let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
+        let store = core.store.clone();
+        let mut core = core.core;
+
+        // No new block should have been produced
+        assert_eq!(
+            core.last_proposed_round(),
+            GENESIS_ROUND,
+            "No block should have been created other than genesis"
+        );
+
+        // create a DAG of 12 rounds
+        let mut dag_builder = DagBuilder::new(core.context.clone());
+        dag_builder.layers(1..=12).build();
+
+        // Store all blocks up to round 6 which should be enough to decide up to leader 4
+        dag_builder.print();
+        let blocks = dag_builder.blocks(1..=6);
+
+        for block in blocks {
+            core.dag_state.write().accept_block(block);
+        }
+
+        // Get all the committed sub dags up to round 10
+        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
+
+        // Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
+        let committed_sub_dags = core.try_commit(vec![]).unwrap();
+
+        // We should have committed up to round 4
+        assert_eq!(committed_sub_dags.len(), 4);
+
+        let last_commit = store
+            .read_last_commit()
+            .unwrap()
+            .expect("Last commit should be set");
+        assert_eq!(last_commit.reference().index, 4);
+
+        println!("Case 1. Provide no certified commits. No commit should happen.");
+
+        let last_commit = store
+            .read_last_commit()
+            .unwrap()
+            .expect("Last commit should be set");
+        assert_eq!(last_commit.reference().index, 4);
+
+        println!("Case 2. Provide certified commits from before and after the last committed round; there are also additional blocks, so the direct decide rule can run as well.");
+
+        // The commits of leader rounds 5-8 should be committed via the certified commits.
+        let certified_commits = sub_dags_and_commits
+            .iter()
+            .skip(3)
+            .take(5)
+            .map(|(_, c)| c.clone())
+            .collect::<Vec<_>>();
+
+        // Now only add the blocks of rounds 8..=12. The blocks up to round 7 should be accepted via the certified commits processing.
+        let blocks = dag_builder.blocks(8..=12);
+        for block in blocks {
+            core.dag_state.write().accept_block(block);
+        }
+
+        // The corresponding blocks of the certified commits should be accepted and stored before linearizing and committing the DAG.
+        core.add_certified_commits(certified_commits.clone())
+            .expect("Should not fail");
+
+        let commits = store.scan_commits((6..=10).into()).unwrap();
+
+        // We expect all the sub dags up to leader round 10 to be committed.
+        assert_eq!(commits.len(), 5);
+
+        for i in 6..=10 {
+            let commit = &commits[i - 6];
+            assert_eq!(commit.reference().index, i as u32);
+        }
+    }
+
+    #[tokio::test]
+    async fn try_commit_with_certified_commits_gced_blocks() {
+        const GC_DEPTH: u32 = 3;
+        telemetry_subscribers::init_for_testing();
+
+        let (mut context, mut key_pairs) = Context::new_for_test(5);
+        context
+            .protocol_config
+            .set_consensus_gc_depth_for_testing(GC_DEPTH);
+        let context = Arc::new(context.with_parameters(Parameters {
+            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
+            ..Default::default()
+        }));
+
+        let store = Arc::new(MemStore::new());
+        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
+
+        let block_manager = BlockManager::new(
+            context.clone(),
+            dag_state.clone(),
+            Arc::new(NoopBlockVerifier),
+        );
+        let leader_schedule = Arc::new(
+            LeaderSchedule::from_store(context.clone(), dag_state.clone())
+                .with_num_commits_per_schedule(10),
+        );
+
+        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
+        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
+        let (signals, signal_receivers) = CoreSignals::new(context.clone());
+        // Need at least one subscriber to the block broadcast channel.
+        let _block_receiver = signal_receivers.block_broadcast_receiver();
+
+        let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
+        let commit_observer = CommitObserver::new(
+            context.clone(),
+            commit_consumer,
+            dag_state.clone(),
+            store.clone(),
+            leader_schedule.clone(),
+        );
+
+        let mut core = Core::new(
+            context.clone(),
+            leader_schedule,
+            transaction_consumer,
+            block_manager,
+            true,
+            commit_observer,
+            signals,
+            key_pairs.remove(context.own_index.value()).1,
+            dag_state.clone(),
+            true,
+        );
+
+        // No new block should have been produced
+        assert_eq!(
+            core.last_proposed_round(),
+            GENESIS_ROUND,
+            "No block should have been created other than genesis"
+        );
+
+        let dag_str = "DAG {
+            Round 0 : { 5 },
+            Round 1 : { * },
+            Round 2 : {
+                A -> [-E1],
+                B -> [-E1],
+                C -> [-E1],
+                D -> [-E1],
+            },
+            Round 3 : {
+                A -> [*],
+                B -> [*],
+                C -> [*],
+                D -> [*],
+            },
+            Round 4 : {
+                A -> [*],
+                B -> [*],
+                C -> [*],
+                D -> [*],
+            },
+            Round 5 : {
+                A -> [*],
+                B -> [*],
+                C -> [*],
+                D -> [*],
+                E -> [A4, B4, C4, D4, E1]
+            },
+            Round 6 : { * },
+            Round 7 : { * },
+        }";
+
+        let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
+        dag_builder.print();
+
+        // Now get all the committed sub dags from the DagBuilder
+        let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
+            .get_sub_dag_and_certified_commits(1..=5)
+            .into_iter()
+            .unzip();
+
+        // Now try to commit up to the latest leader (round = 5) with the provided certified commits. Note that we have not accepted any
+        // blocks. That should happen during the commit process.
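+        // Block E1 is referenced only by E5 and falls below the GC round while the
+        // certified commits are processed, so it must never be sequenced (asserted below).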
+        let committed_sub_dags = core.try_commit(certified_commits).unwrap();
+
+        // We should have committed up to round 4
+        assert_eq!(committed_sub_dags.len(), 4);
+        for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
+            assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
+
+            // ensure that block from E1 node has not been committed
+            for block in committed_sub_dag.blocks.iter() {
+                if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
+                    panic!("Did not expect to commit block E1");
+                }
+            }
+        }
+    }
+
     #[tokio::test(flavor = "current_thread", start_paused = true)]
     async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
         parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
@@ -3184,6 +3686,42 @@ mod test {
         assert_eq!(all_stored_commits.len(), 6);
     }

+    #[tokio::test]
+    async fn try_decide_certified() {
+        // GIVEN
+        telemetry_subscribers::init_for_testing();
+
+        let (context, _) = Context::new_for_test(4);
+
+        let authority_index = AuthorityIndex::new_for_test(0);
+        let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
+        let mut core = core.core;
+
+        let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
+        dag_builder.layers(1..=12).build();
+
+        let limit = 2;
+
+        let blocks = dag_builder.blocks(1..=12);
+
+        for block in blocks {
+            core.dag_state.write().accept_block(block);
+        }
+
+        // WHEN
+        let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
+        let mut certified_commits = sub_dags_and_commits
+            .into_iter()
+            .map(|(_, commit)| commit)
+            .collect::<Vec<_>>();
+
+        let leaders = core.try_decide_certified(&mut certified_commits, limit);
+
+        // THEN
+        assert_eq!(leaders.len(), 2);
+        assert_eq!(certified_commits.len(), 2);
+    }
+
     pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
         tokio::time::timeout(timeout, receiver.changed())
             .await
diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs
index 87d1ea5215d38..b10983806852d 100644
--- a/consensus/core/src/core_thread.rs
+++ b/consensus/core/src/core_thread.rs
@@ -22,6 +22,7 @@ use tracing::warn;

 use crate::{
     block::{BlockRef, Round, VerifiedBlock},
+    commit::CertifiedCommit,
     context::Context,
     core::Core,
     core_thread::CoreError::Shutdown,
@@ -38,6 +39,8 @@ enum CoreThreadCommand {
     AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
     /// Checks if block refs exist locally and sync missing ones.
     CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
+    /// Add committed sub dag blocks for processing and acceptance.
+    AddCertifiedCommits(Vec<CertifiedCommit>, oneshot::Sender<BTreeSet<BlockRef>>),
     /// Called when the min round has passed or the leader timeout occurred and a block should be produced.
     /// When the command is called with `force = true`, then the block will be created for `round` skipping
     /// any checks (ex leader existence of previous round). More information can be found on the `Core` component.
@@ -64,6 +67,11 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static {
         block_refs: Vec<BlockRef>,
     ) -> Result<BTreeSet<BlockRef>, CoreError>;

+    async fn add_certified_commits(
+        &self,
+        commits: Vec<CertifiedCommit>,
+    ) -> Result<BTreeSet<BlockRef>, CoreError>;
+
     async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

     async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError>;
@@ -133,6 +141,11 @@ impl CoreThread {
                     let missing_block_refs = self.core.check_block_refs(blocks)?;
                     sender.send(missing_block_refs).ok();
                 }
+                CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
+                    let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
+                    let missing_block_refs = self.core.add_certified_commits(commits)?;
+                    sender.send(missing_block_refs).ok();
+                }
                 CoreThreadCommand::NewBlock(round, sender, force) => {
                     let _scope = monitored_scope("CoreThread::loop::new_block");
                     self.core.new_block(round, force)?;
@@ -278,6 +291,7 @@ impl ChannelCoreThreadDispatcher {
         }
     }

+#[async_trait]
 #[async_trait]
 impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
     async fn add_blocks(
@@ -310,6 +324,23 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
         Ok(missing_block_refs)
     }

+    async fn add_certified_commits(
+        &self,
+        commits: Vec<CertifiedCommit>,
+    ) -> Result<BTreeSet<BlockRef>, CoreError> {
+        for commit in &commits {
+            for block in commit.blocks() {
+                self.highest_received_rounds[block.author()]
+                    .fetch_max(block.round(), Ordering::AcqRel);
+            }
+        }
+        let (sender, receiver) = oneshot::channel();
+        self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
+            .await;
+        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
+        Ok(missing_block_refs)
+    }
+
     async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
         let (sender, receiver) = oneshot::channel();
         self.send(CoreThreadCommand::NewBlock(round, sender, force))
@@ -397,6 +428,7 @@ impl MockCoreThreadDispatcher {

 #[cfg(test)]
 #[async_trait]
+#[async_trait]
 impl CoreThreadDispatcher for MockCoreThreadDispatcher {
     async fn add_blocks(
         &self,
@@ -414,6 +446,13 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher {
         Ok(BTreeSet::new())
     }

+    async fn add_certified_commits(
+        &self,
+        _commits: Vec<CertifiedCommit>,
+    ) -> Result<BTreeSet<BlockRef>, CoreError> {
+        todo!()
+    }
+
     async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
         Ok(())
     }
diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs
index c57439857742b..19174bb39428c 100644
--- a/consensus/core/src/dag_state.rs
+++ b/consensus/core/src/dag_state.rs
@@ -891,10 +891,14 @@ impl DagState {
     /// There is no meaning accepting any blocks with round <= gc_round. The Garbage Collection (GC) round is calculated based on the latest
     /// committed leader round. When GC is disabled that will return the genesis round.
     pub(crate) fn gc_round(&self) -> Round {
+        self.calculate_gc_round(self.last_commit_round())
+    }
+
+    pub(crate) fn calculate_gc_round(&self, commit_round: Round) -> Round {
         let gc_depth = self.context.protocol_config.gc_depth();
         if gc_depth > 0 {
             // GC is enabled, only then calculate the diff
-            self.last_commit_round().saturating_sub(gc_depth)
+            commit_round.saturating_sub(gc_depth)
         } else {
             // Otherwise just return genesis round. That also acts as a safety mechanism so we never attempt to truncate anything
             // even accidentally.
diff --git a/consensus/core/src/error.rs b/consensus/core/src/error.rs
index 4ec000d60152f..8b284f1c147f9 100644
--- a/consensus/core/src/error.rs
+++ b/consensus/core/src/error.rs
@@ -161,6 +161,14 @@ pub(crate) enum ConsensusError {
         received: BlockRef,
     },

+    #[error(
+        "Unexpected certified commit index and last committed index. Expected next commit index to be {expected_commit_index}, but found {commit_index}"
+    )]
+    UnexpectedCertifiedCommitIndex {
+        expected_commit_index: CommitIndex,
+        commit_index: CommitIndex,
+    },
+
     #[error("RocksDB failure: {0}")]
     RocksDBFailure(#[from] TypedStoreError),

diff --git a/consensus/core/src/leader_timeout.rs b/consensus/core/src/leader_timeout.rs
index 9e4c8898d6657..adf5b1e637613 100644
--- a/consensus/core/src/leader_timeout.rs
+++ b/consensus/core/src/leader_timeout.rs
@@ -129,6 +129,7 @@ mod tests {
     use tokio::time::{sleep, Instant};

     use crate::block::{BlockRef, Round, VerifiedBlock};
+    use crate::commit::CertifiedCommit;
     use crate::context::Context;
     use crate::core::CoreSignals;
     use crate::core_thread::{CoreError, CoreThreadDispatcher};
@@ -164,6 +165,13 @@ mod tests {
             todo!()
         }

+        async fn add_certified_commits(
+            &self,
+            _commit: Vec<CertifiedCommit>,
+        ) -> Result<BTreeSet<BlockRef>, CoreError> {
+            todo!()
+        }
+
         async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
             self.new_block_calls
                 .lock()
diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs
index ee43f2462043f..32049223ecc84 100644
--- a/consensus/core/src/round_prober.rs
+++ b/consensus/core/src/round_prober.rs
@@ -361,7 +361,7 @@ mod test {
     use super::QuorumRound;
     use crate::{
         block::BlockRef,
-        commit::CommitRange,
+        commit::{CertifiedCommit, CommitRange},
         context::Context,
         core_thread::{CoreError, CoreThreadDispatcher},
         dag_state::DagState,
@@ -418,6 +418,13 @@ mod test {
             unimplemented!()
         }

+        async fn add_certified_commits(
+            &self,
+            _commit: Vec<CertifiedCommit>,
+        ) -> Result<BTreeSet<BlockRef>, CoreError> {
+            unimplemented!()
+        }
+
         async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
             unimplemented!()
         }
diff --git a/consensus/core/src/test_dag_builder.rs b/consensus/core/src/test_dag_builder.rs
index 11b6cb21383a6..2ed4c3c372bca 100644
--- a/consensus/core/src/test_dag_builder.rs
+++ b/consensus/core/src/test_dag_builder.rs
@@ -16,12 +16,12 @@ use crate::{
         genesis_blocks, BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, Round, Slot, TestBlock,
         VerifiedBlock,
     },
-    commit::{CommitDigest, TrustedCommit, DEFAULT_WAVE_LENGTH},
+    commit::{CertifiedCommit, CommitDigest, TrustedCommit, DEFAULT_WAVE_LENGTH},
     context::Context,
     dag_state::DagState,
     leader_schedule::{LeaderSchedule, LeaderSwapTable},
     linearizer::{BlockStoreAPI, Linearizer},
-    CommittedSubDag,
+    CommitRef, CommittedSubDag,
 };

 /// DagBuilder API
@@ -141,15 +141,15 @@ impl DagBuilder {
         &mut self,
         leader_rounds: RangeInclusive<Round>,
     ) -> Vec<(CommittedSubDag, TrustedCommit)> {
-        let (last_leader_round, mut last_commit_index, mut last_timestamp_ms) =
+        let (last_leader_round, mut last_commit_ref, mut last_timestamp_ms) =
             if let Some((sub_dag, _)) = self.committed_sub_dags.last() {
                 (
                     sub_dag.leader.round,
-                    sub_dag.commit_ref.index,
+                    sub_dag.commit_ref,
                     sub_dag.timestamp_ms,
                 )
             } else {
-                (0, 0, 0)
+                (0, CommitRef::new(0, CommitDigest::MIN), 0)
             };

         struct BlockStorage {
@@ -218,7 +218,6 @@ impl DagBuilder {
                 .saturating_sub(self.context.protocol_config.gc_depth());

             let leader_block_ref = leader_block.reference();
-            last_commit_index += 1;
             last_timestamp_ms = leader_block.timestamp_ms().max(last_timestamp_ms);

             let
(to_commit, rejected_transactions) = Linearizer::linearize_sub_dag(
             }

             let commit = TrustedCommit::new_for_test(
-                last_commit_index,
-                CommitDigest::MIN,
+                last_commit_ref.index + 1,
+                last_commit_ref.digest,
                 last_timestamp_ms,
                 leader_block_ref,
                 to_commit
                     .iter()
                     .map(|block| block.serialized().clone())
                     .collect::<Vec<_>>(),
             );

+            last_commit_ref = commit.reference();
+
             let sub_dag = CommittedSubDag::new(
                 leader_block_ref,
                 to_commit,
@@ -264,6 +265,21 @@ impl DagBuilder {
             .collect()
     }

+    pub(crate) fn get_sub_dag_and_certified_commits(
+        &mut self,
+        leader_rounds: RangeInclusive<Round>,
+    ) -> Vec<(CommittedSubDag, CertifiedCommit)> {
+        let commits = self.get_sub_dag_and_commits(leader_rounds);
+        commits
+            .into_iter()
+            .map(|(sub_dag, commit)| {
+                let certified_commit =
+                    CertifiedCommit::new_certified(commit, sub_dag.blocks.clone());
+                (sub_dag, certified_commit)
+            })
+            .collect()
+    }
+
     pub(crate) fn leader_blocks(
         &self,
         rounds: RangeInclusive<Round>,
diff --git a/consensus/core/src/universal_committer.rs b/consensus/core/src/universal_committer.rs
index e78a0a8a915a0..672e5e991d6e1 100644
--- a/consensus/core/src/universal_committer.rs
+++ b/consensus/core/src/universal_committer.rs
@@ -67,6 +67,7 @@ impl UniversalCommitter {
         for committer in self.committers.iter().rev() {
             // Skip committers that don't have a leader for this round.
             let Some(slot) = committer.elect_leader(round) else {
+                tracing::debug!("No leader for round {round}, skipping");
                 continue;
             };

@@ -102,7 +103,7 @@ impl UniversalCommitter {
             let Some(decided_leader) = leader.into_decided_leader() else {
                 break;
             };
-            self.update_metrics(&decided_leader, decision);
+            Self::update_metrics(&self.context, &decided_leader, decision);
             decided_leaders.push(decided_leader);
         }
         tracing::debug!("Decided {decided_leaders:?}");
@@ -120,22 +121,25 @@ impl UniversalCommitter {
     }

     /// Update metrics.
-    fn update_metrics(&self, decided_leader: &DecidedLeader, decision: Decision) {
-        let decision_str = if decision == Decision::Direct {
-            "direct"
-        } else {
-            "indirect"
+    pub(crate) fn update_metrics(
+        context: &Context,
+        decided_leader: &DecidedLeader,
+        decision: Decision,
+    ) {
+        let decision_str = match decision {
+            Decision::Direct => "direct",
+            Decision::Indirect => "indirect",
+            Decision::Certified => "certified",
         };
         let status = match decided_leader {
             DecidedLeader::Commit(..) => format!("{decision_str}-commit"),
             DecidedLeader::Skip(..) => format!("{decision_str}-skip"),
         };
-        let leader_host = &self
-            .context
+        let leader_host = &context
             .committee
             .authority(decided_leader.slot().authority)
             .hostname;
-        self.context
+        context
             .metrics
             .node_metrics
             .committed_leaders_total
From abc2e8a0553c772011cb4885d234a6be7ce1bcb2 Mon Sep 17 00:00:00 2001
From: Anastasios Kichidis
Date: Wed, 19 Feb 2025 15:45:48 +0000
Subject: [PATCH 2/3] [chore] adding metric for the gced blocks.
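
The new `block_manager_gced_blocks` counter is labeled with the block author's
hostname and is incremented whenever a block is garbage collected without ever
being accepted.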
---
 consensus/core/src/block_manager.rs | 13 +++++++++++++
 consensus/core/src/metrics.rs       |  7 +++++++
 2 files changed, 20 insertions(+)

diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs
index 316b1e6094720..3389be7450486 100644
--- a/consensus/core/src/block_manager.rs
+++ b/consensus/core/src/block_manager.rs
@@ -589,6 +589,19 @@ impl BlockManager {

                 blocks_gc_ed += 1;

+                let hostname = self
+                    .context
+                    .committee
+                    .authority(block_ref.author)
+                    .hostname
+                    .as_str();
+                self.context
+                    .metrics
+                    .node_metrics
+                    .block_manager_gced_blocks
+                    .with_label_values(&[hostname])
+                    .inc();
+
                 assert!(!self.suspended_blocks.contains_key(block_ref), "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor.");

                 // Also remove it from the missing list - we don't want to keep looking for it.
diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs
index 453345d9bd5af..fd5681e01d2ae 100644
--- a/consensus/core/src/metrics.rs
+++ b/consensus/core/src/metrics.rs
@@ -165,6 +165,7 @@ pub(crate) struct NodeMetrics {
     pub(crate) block_manager_missing_blocks: IntGauge,
     pub(crate) block_manager_missing_blocks_by_authority: IntCounterVec,
     pub(crate) block_manager_missing_ancestors_by_authority: IntCounterVec,
+    pub(crate) block_manager_gced_blocks: IntCounterVec,
     pub(crate) block_manager_gc_unsuspended_blocks: IntCounterVec,
     pub(crate) block_manager_skipped_blocks: IntCounterVec,
     pub(crate) threshold_clock_round: IntGauge,
@@ -590,6 +591,12 @@ impl NodeMetrics {
                 &["authority"],
                 registry,
             ).unwrap(),
+            block_manager_gced_blocks: register_int_counter_vec_with_registry!(
+                "block_manager_gced_blocks",
+                "The number of blocks that were garbage collected and did not get accepted, counted by the block's source authority",
+                &["authority"],
+                registry,
+            ).unwrap(),
             block_manager_gc_unsuspended_blocks: register_int_counter_vec_with_registry!(
                 "block_manager_gc_unsuspended_blocks",
                 "The number of blocks unsuspended because their missing ancestors are garbage collected by the block manager, counted by block's source authority",
From 41bc2379a604ce6eea19caa07bc228087fd0e3dd Mon Sep 17 00:00:00 2001
From: Anastasios Kichidis
Date: Wed, 19 Feb 2025 18:24:28 +0000
Subject: [PATCH 3/3] [review] address review comments.

---
 consensus/core/src/core_thread.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs
index b10983806852d..8969ba867cd1e 100644
--- a/consensus/core/src/core_thread.rs
+++ b/consensus/core/src/core_thread.rs
@@ -291,7 +291,6 @@ impl ChannelCoreThreadDispatcher {
         }
     }

-#[async_trait]
 #[async_trait]
 impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
     async fn add_blocks(
@@ -428,7 +427,6 @@ impl MockCoreThreadDispatcher {

 #[cfg(test)]
 #[async_trait]
-#[async_trait]
 impl CoreThreadDispatcher for MockCoreThreadDispatcher {
     async fn add_blocks(
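As a usage reference for the new type: a minimal sketch (assuming the `new_for_test` constructors exercised in the tests above; `leader_ref`, `serialized_blocks`, and `blocks` are hypothetical placeholders) showing how a `CertifiedCommit` wraps a `TrustedCommit` together with its blocks:

    // Bundle a synced TrustedCommit together with the verified blocks it sequences.
    let trusted = TrustedCommit::new_for_test(
        5,                 // commit index
        CommitDigest::MIN, // previous commit digest
        1_000,             // commit timestamp in ms
        leader_ref,        // BlockRef of the committed leader
        serialized_blocks, // serialized payloads of the sequenced blocks
    );
    let certified = CertifiedCommit::new_certified(trusted, blocks);

    // Deref exposes the underlying TrustedCommit accessors...
    assert_eq!(certified.index(), 5);
    assert_eq!(certified.leader(), leader_ref);

    // ...while the attached blocks stay available for acceptance in Core.
    for block in certified.blocks() {
        println!("sequenced {}", block.reference());
    }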