[Consensus] process sync committed sub dags (#20956)
## Description 

This PR changes the way we process synchronized committed sub dags when GC
is enabled. This is necessary because, with GC, committed sub dags can drop
dependencies that are needed to accept the vote and decision round blocks,
forcing us to fetch the missing blocks via the synchronizer. Empirically,
the number of missing blocks correlates with the gc depth: the lower the gc
depth, the more missing blocks need to be fetched.
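
For intuition on that correlation: the GC round trails the last commit leader round by the configured depth, so a smaller gc depth prunes ancestors closer to the commit front. A minimal sketch of the relationship, assuming the `gc_round = commit_round - gc_depth` arithmetic exercised by the new test (illustrative names, not the actual `DagState` code):

```rust
// Illustrative sketch only: the real computation lives in DagState.
fn gc_round(last_commit_leader_round: u32, gc_depth: u32) -> u32 {
    last_commit_leader_round.saturating_sub(gc_depth)
}

fn main() {
    // Matches the arithmetic in the new test below: commit leader at round 6,
    // gc_depth = 4, so gc_round = 2.
    assert_eq!(gc_round(6, 4), 2);
    // A smaller gc_depth keeps the GC round closer to the commit front, pruning
    // more ancestors and leaving more committed sub dag dependencies to fetch.
    assert_eq!(gc_round(6, 2), 4);
}
```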

With the new approach we attempt to commit a synchronized committed sub dag
directly, without waiting for the commit rule to run (that is, without
waiting for accepted blocks to form the voting & decision rounds). To avoid
edge cases, the synchronized sub dags must be processed first on their own,
and only on a subsequent iteration do we run the commit rule for any other
leaders. This guarantees that blocks accepted via commit sync will not run
into "missing ancestor" issues.

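A minimal sketch of the intended ordering, using stub types: `add_certified_commits` mirrors the method added to the core dispatcher in this PR, while `Core` and `try_commit` are illustrative stand-ins, not the actual signatures:

```rust
// Sketch only: stub types stand in for the real consensus-core structures.
struct CertifiedCommit;

#[derive(Default)]
struct Core {
    certified: Vec<CertifiedCommit>,
}

impl Core {
    // First pass: accept and commit the synchronized (certified) sub dags on
    // their own, without waiting for vote/decision round blocks to form locally.
    fn add_certified_commits(&mut self, commits: Vec<CertifiedCommit>) {
        self.certified.extend(commits);
        // ...accept the commits' blocks and commit the certified leaders here...
    }

    // Second pass, on the next iteration: run the commit rule for any remaining
    // leaders, so blocks accepted via commit sync cannot hit "missing ancestor"
    // issues.
    fn try_commit(&mut self) {
        // ...direct/indirect commit rule over the remaining leaders...
    }
}

fn main() {
    let mut core = Core::default();
    core.add_certified_commits(vec![CertifiedCommit]);
    core.try_commit();
}
```
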
## Test plan 

CI/PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
akichidis authored Feb 21, 2025
1 parent 4f0b3aa commit 5c1fdc3
Showing 14 changed files with 937 additions and 85 deletions.
9 changes: 8 additions & 1 deletion consensus/core/src/authority_service.rs
@@ -686,7 +686,7 @@ mod tests {
use crate::{
authority_service::AuthorityService,
block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
commit::CommitRange,
commit::{CertifiedCommit, CommitRange},
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
@@ -743,6 +743,13 @@ mod tests {
Ok(BTreeSet::new())
}

async fn add_certified_commits(
&self,
_commits: Vec<CertifiedCommit>,
) -> Result<BTreeSet<BlockRef>, CoreError> {
todo!()
}

async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
Ok(())
}
1 change: 1 addition & 0 deletions consensus/core/src/base_committer.rs
@@ -5,6 +5,7 @@ use std::{collections::HashMap, fmt::Display, sync::Arc};

use consensus_config::{AuthorityIndex, Stake};
use parking_lot::RwLock;
use tracing::warn;

use crate::{
block::{BlockAPI, BlockRef, Round, Slot, VerifiedBlock},
213 changes: 184 additions & 29 deletions consensus/core/src/block_manager.rs
@@ -3,7 +3,6 @@

use std::{
collections::{BTreeMap, BTreeSet},
iter,
sync::Arc,
time::Instant,
};
@@ -84,11 +83,46 @@ impl BlockManager {
/// Tries to accept the provided blocks assuming that all their causal history exists. The method
/// returns all the blocks that have been successfully processed in round ascending order, including previously
/// suspended blocks that have now been accepted. The method also returns a set with the missing ancestor blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_blocks(
&mut self,
mut blocks: Vec<VerifiedBlock>,
blocks: Vec<VerifiedBlock>,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let _s = monitored_scope("BlockManager::try_accept_blocks");
self.try_accept_blocks_internal(blocks, false)
}

// Tries to accept blocks that have been committed. Returns all the blocks that have been accepted, both from the ones
// provided and any children blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_committed_blocks(
&mut self,
blocks: Vec<VerifiedBlock>,
) -> Vec<VerifiedBlock> {
// If GC is disabled then none of this logic should run.
if !self.dag_state.read().gc_enabled() {
return Vec::new();
}

// Just accept the blocks
let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
assert!(
missing_blocks.is_empty(),
"No missing blocks should be returned for committed blocks"
);

accepted_blocks
}

/// Attempts to accept the provided blocks. When `committed = true` then the blocks are considered to be committed via certified commits and
/// are handled differently.
fn try_accept_blocks_internal(
&mut self,
mut blocks: Vec<VerifiedBlock>,
committed: bool,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let _s = monitored_scope("BlockManager::try_accept_blocks_internal");

blocks.sort_by_key(|b| b.round());
debug!(
@@ -104,46 +138,82 @@

// Try to accept the input block.
let block_ref = block.reference();
let block = match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => block,
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,

let mut to_verify_timestamps_and_accept = vec![];
if committed {
match self.try_accept_one_committed_block(block) {
TryAcceptResult::Accepted(block) => {
// As this is a committed block, it has already been accepted and there is no need to verify its timestamps.
// Just add it to the accepted blocks list.
accepted_blocks.push(block);
}
TryAcceptResult::Processed => continue,
TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => panic!(
"Did not expect to suspend or skip a committed block: {:?}",
block_ref
),
};
} else {
match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => {
to_verify_timestamps_and_accept.push(block);
}
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
};
};

// If the block is accepted, try to unsuspend its children blocks if any.
let unsuspended_blocks = self.try_unsuspend_children_blocks(block.reference());
let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
to_verify_timestamps_and_accept.extend(unsuspended_blocks);

// Verify block timestamps
let blocks_to_accept = self
.verify_block_timestamps_and_accept(iter::once(block).chain(unsuspended_blocks));
let blocks_to_accept =
self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
accepted_blocks.extend(blocks_to_accept);
}

let metrics = &self.context.metrics.node_metrics;
metrics
.missing_blocks_total
.inc_by(missing_blocks.len() as u64);
metrics
.block_manager_suspended_blocks
.set(self.suspended_blocks.len() as i64);
metrics
.block_manager_missing_ancestors
.set(self.missing_ancestors.len() as i64);
metrics
.block_manager_missing_blocks
.set(self.missing_blocks.len() as i64);
self.update_stats(missing_blocks.len() as u64);

// Figure out the new missing blocks
(accepted_blocks, missing_blocks)
}

fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
if self.dag_state.read().contains_block(&block.reference()) {
return TryAcceptResult::Processed;
}

// Remove the block from missing and suspended blocks
self.missing_blocks.remove(&block.reference());

// If the block has already been fetched and parked as a suspended block, then remove it. Also find all the references of missing
// ancestors and remove those as well. If we don't do that, then once the missing ancestor is fetched it's possible to hit a panic
// when trying to unsuspend this child block, as it won't be found in the suspended blocks map.
if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
suspended_block
.missing_ancestors
.iter()
.for_each(|ancestor| {
if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
references.remove(&block.reference());
}
});
}

// Accept this block before any unsuspended children blocks
self.dag_state.write().accept_blocks(vec![block.clone()]);

TryAcceptResult::Accepted(block)
}

/// Tries to find the provided block_refs in DagState and BlockManager,
/// and returns missing block refs.
pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
@@ -519,6 +589,19 @@ impl BlockManager {

blocks_gc_ed += 1;

let hostname = self
.context
.committee
.authority(block_ref.author)
.hostname
.as_str();
self.context
.metrics
.node_metrics
.block_manager_gced_blocks
.with_label_values(&[hostname])
.inc();

assert!(!self.suspended_blocks.contains_key(block_ref), "Block should not be suspended, as we are causally GC'ing and no suspended block should exist for a missing ancestor.");

// Also remove it from the missing list - we don't want to keep looking for it.
@@ -563,6 +646,20 @@
self.missing_blocks.clone()
}

fn update_stats(&mut self, missing_blocks: u64) {
let metrics = &self.context.metrics.node_metrics;
metrics.missing_blocks_total.inc_by(missing_blocks);
metrics
.block_manager_suspended_blocks
.set(self.suspended_blocks.len() as i64);
metrics
.block_manager_missing_ancestors
.set(self.missing_ancestors.len() as i64);
metrics
.block_manager_missing_blocks
.set(self.missing_blocks.len() as i64);
}

fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
let (min_round, max_round) =
if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
@@ -1047,6 +1144,64 @@ mod tests {
}
}

#[rstest]
#[tokio::test]
async fn try_accept_committed_blocks() {
// GIVEN
let (mut context, _key_pairs) = Context::new_for_test(4);
// We set the gc depth to 4
context
.protocol_config
.set_consensus_gc_depth_for_testing(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

// We "fake" the commit for round 6, so GC round moves to (commit_round - gc_depth = 6 - 4 = 2)
let last_commit = TrustedCommit::new_for_test(
10,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
vec![],
);
dag_state.write().set_last_commit(last_commit);
assert_eq!(
dag_state.read().gc_round(),
2,
"GC round should have moved to round 2"
);

let mut block_manager =
BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

// create a DAG of 12 rounds
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=12).build();

// Now try to accept the blocks of rounds 7 ~ 12 via the normal block acceptance path. None of them should be accepted.
let blocks = dag_builder.blocks(7..=12);
let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
assert!(accepted_blocks.is_empty());
assert_eq!(missing.len(), 4);

// Now try to accept the blocks of rounds 3 ~ 6 via the committed blocks path. All of them should be accepted, and the blocks
// of rounds 7 ~ 12 should be unsuspended and accepted as well.
let blocks = dag_builder.blocks(3..=6);

// WHEN
let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);

// THEN
accepted_blocks.sort_by_key(|b| b.reference());

let mut all_blocks = dag_builder.blocks(3..=12);
all_blocks.sort_by_key(|b| b.reference());

assert_eq!(accepted_blocks, all_blocks);
assert!(block_manager.is_empty());
}

struct TestBlockVerifier {
fail: BTreeSet<BlockRef>,
}
28 changes: 28 additions & 0 deletions consensus/core/src/commit.rs
@@ -207,6 +207,33 @@ impl Deref for TrustedCommit {
}
}

#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommit {
commit: Arc<TrustedCommit>,
blocks: Vec<VerifiedBlock>,
}

impl CertifiedCommit {
pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
Self {
commit: Arc::new(commit),
blocks,
}
}

pub fn blocks(&self) -> &[VerifiedBlock] {
&self.blocks
}
}

impl Deref for CertifiedCommit {
type Target = TrustedCommit;

fn deref(&self) -> &Self::Target {
&self.commit
}
}

/// Digest of a consensus commit.
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
@@ -411,6 +438,7 @@ pub fn load_committed_subdag_from_store(
pub(crate) enum Decision {
Direct,
Indirect,
Certified, // This leader comes from a certified commit, so no commit decision was made locally.
}

/// The status of a leader slot from the direct and indirect commit rules.