Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus] process sync committed sub dags #20956

Merged
merged 3 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ mod tests {
use crate::{
authority_service::AuthorityService,
block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
commit::CommitRange,
commit::{CertifiedCommit, CommitRange},
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
Expand Down Expand Up @@ -743,6 +743,13 @@ mod tests {
Ok(BTreeSet::new())
}

async fn add_certified_commits(
&self,
_commits: Vec<CertifiedCommit>,
) -> Result<BTreeSet<BlockRef>, CoreError> {
todo!()
}

async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/base_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{collections::HashMap, fmt::Display, sync::Arc};

use consensus_config::{AuthorityIndex, Stake};
use parking_lot::RwLock;
use tracing::warn;

use crate::{
block::{BlockAPI, BlockRef, Round, Slot, VerifiedBlock},
Expand Down
213 changes: 184 additions & 29 deletions consensus/core/src/block_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::{
collections::{BTreeMap, BTreeSet},
iter,
sync::Arc,
time::Instant,
};
Expand Down Expand Up @@ -84,11 +83,46 @@ impl BlockManager {
/// Tries to accept the provided blocks assuming that all their causal history exists. The method
/// returns all the blocks that have been successfully processed in round ascending order, that includes also previously
/// suspended blocks that have now been able to get accepted. Method also returns a set with the missing ancestor blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_blocks(
&mut self,
mut blocks: Vec<VerifiedBlock>,
blocks: Vec<VerifiedBlock>,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let _s = monitored_scope("BlockManager::try_accept_blocks");
self.try_accept_blocks_internal(blocks, false)
}

// Tries to accept blocks that have been committed. Returns all the blocks that have been accepted, both from the ones
// provided and any children blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_committed_blocks(
&mut self,
blocks: Vec<VerifiedBlock>,
) -> Vec<VerifiedBlock> {
// If GC is disabled then should not run any of this logic.
if !self.dag_state.read().gc_enabled() {
return Vec::new();
}

// Just accept the blocks
let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
assert!(
missing_blocks.is_empty(),
"No missing blocks should be returned for committed blocks"
);

accepted_blocks
}

/// Attempts to accept the provided blocks. When `committed = true` then the blocks are considered to be committed via certified commits and
/// are handled differently.
fn try_accept_blocks_internal(
&mut self,
mut blocks: Vec<VerifiedBlock>,
committed: bool,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let _s = monitored_scope("BlockManager::try_accept_blocks_internal");

blocks.sort_by_key(|b| b.round());
debug!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this moved somewhere?

Expand All @@ -104,46 +138,82 @@ impl BlockManager {

// Try to accept the input block.
let block_ref = block.reference();
let block = match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => block,
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,

let mut to_verify_timestamps_and_accept = vec![];
if committed {
match self.try_accept_one_committed_block(block) {
TryAcceptResult::Accepted(block) => {
// As this is a committed block, then it's already accepted and there is no need to verify its timestamps.
// Just add it to the accepted blocks list.
accepted_blocks.push(block);
}
TryAcceptResult::Processed => continue,
TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => panic!(
"Did not expect to suspend or skip a committed block: {:?}",
block_ref
),
};
} else {
match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => {
to_verify_timestamps_and_accept.push(block);
}
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
};
};

// If the block is accepted, try to unsuspend its children blocks if any.
let unsuspended_blocks = self.try_unsuspend_children_blocks(block.reference());
let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
to_verify_timestamps_and_accept.extend(unsuspended_blocks);

// Verify block timestamps
let blocks_to_accept = self
.verify_block_timestamps_and_accept(iter::once(block).chain(unsuspended_blocks));
let blocks_to_accept =
self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
accepted_blocks.extend(blocks_to_accept);
}

let metrics = &self.context.metrics.node_metrics;
metrics
.missing_blocks_total
.inc_by(missing_blocks.len() as u64);
metrics
.block_manager_suspended_blocks
.set(self.suspended_blocks.len() as i64);
metrics
.block_manager_missing_ancestors
.set(self.missing_ancestors.len() as i64);
metrics
.block_manager_missing_blocks
.set(self.missing_blocks.len() as i64);
self.update_stats(missing_blocks.len() as u64);

// Figure out the new missing blocks
(accepted_blocks, missing_blocks)
}

fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
if self.dag_state.read().contains_block(&block.reference()) {
return TryAcceptResult::Processed;
}

// Remove the block from missing and suspended blocks
self.missing_blocks.remove(&block.reference());

// If the block has been already fetched and parked as suspended block, then remove it. Also find all the references of missing
// ancestors to remove those as well. If we don't do that then it's possible once the missing ancestor is fetched to cause a panic
// when trying to unsuspend this children as it won't be found in the suspended blocks map.
if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
suspended_block
.missing_ancestors
.iter()
.for_each(|ancestor| {
if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
references.remove(&block.reference());
}
});
}

// Accept this block before any unsuspended children blocks
self.dag_state.write().accept_blocks(vec![block.clone()]);

TryAcceptResult::Accepted(block)
}

/// Tries to find the provided block_refs in DagState and BlockManager,
/// and returns missing block refs.
pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
Expand Down Expand Up @@ -519,6 +589,19 @@ impl BlockManager {

blocks_gc_ed += 1;

let hostname = self
.context
.committee
.authority(block_ref.author)
.hostname
.as_str();
self.context
.metrics
.node_metrics
.block_manager_gced_blocks
.with_label_values(&[hostname])
.inc();

assert!(!self.suspended_blocks.contains_key(block_ref), "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor.");

// Also remove it from the missing list - we don't want to keep looking for it.
Expand Down Expand Up @@ -563,6 +646,20 @@ impl BlockManager {
self.missing_blocks.clone()
}

fn update_stats(&mut self, missing_blocks: u64) {
let metrics = &self.context.metrics.node_metrics;
metrics.missing_blocks_total.inc_by(missing_blocks);
metrics
.block_manager_suspended_blocks
.set(self.suspended_blocks.len() as i64);
metrics
.block_manager_missing_ancestors
.set(self.missing_ancestors.len() as i64);
metrics
.block_manager_missing_blocks
.set(self.missing_blocks.len() as i64);
}

fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
let (min_round, max_round) =
if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
Expand Down Expand Up @@ -1047,6 +1144,64 @@ mod tests {
}
}

#[rstest]
#[tokio::test]
async fn try_accept_committed_blocks() {
// GIVEN
let (mut context, _key_pairs) = Context::new_for_test(4);
// We set the gc depth to 4
context
.protocol_config
.set_consensus_gc_depth_for_testing(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

// We "fake" the commit for round 6, so GC round moves to (commit_round - gc_depth = 6 - 4 = 2)
let last_commit = TrustedCommit::new_for_test(
10,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
vec![],
);
dag_state.write().set_last_commit(last_commit);
assert_eq!(
dag_state.read().gc_round(),
2,
"GC round should have moved to round 2"
);

let mut block_manager =
BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

// create a DAG of 12 rounds
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=12).build();

// Now try to accept via the normal acceptance block path the blocks of rounds 7 ~ 12. None of them should be accepted
let blocks = dag_builder.blocks(7..=12);
let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
assert!(accepted_blocks.is_empty());
assert_eq!(missing.len(), 4);

// Now try to accept via the committed blocks path the blocks of rounds 3 ~ 6. All of them should be accepted and also the blocks
// of rounds 7 ~ 12 should be unsuspended and accepted as well.
let blocks = dag_builder.blocks(3..=6);

// WHEN
let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);

// THEN
accepted_blocks.sort_by_key(|b| b.reference());

let mut all_blocks = dag_builder.blocks(3..=12);
all_blocks.sort_by_key(|b| b.reference());

assert_eq!(accepted_blocks, all_blocks);
assert!(block_manager.is_empty());
}

struct TestBlockVerifier {
fail: BTreeSet<BlockRef>,
}
Expand Down
28 changes: 28 additions & 0 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,33 @@ impl Deref for TrustedCommit {
}
}

#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommit {
commit: Arc<TrustedCommit>,
blocks: Vec<VerifiedBlock>,
}

impl CertifiedCommit {
pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
Self {
commit: Arc::new(commit),
blocks,
}
}

pub fn blocks(&self) -> &[VerifiedBlock] {
&self.blocks
}
}

impl Deref for CertifiedCommit {
type Target = TrustedCommit;

fn deref(&self) -> &Self::Target {
&self.commit
}
}

/// Digest of a consensus commit.
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
Expand Down Expand Up @@ -411,6 +438,7 @@ pub fn load_committed_subdag_from_store(
pub(crate) enum Decision {
Direct,
Indirect,
Certified, // This is a commit certified leader so no commit decision was made locally.
}

/// The status of a leader slot from the direct and indirect commit rules.
Expand Down
Loading
Loading