Skip to content

Commit

Permalink
[refactor] allow blocks synced via commit syncer to be accepted when …
Browse files Browse the repository at this point in the history
…above the last committed leader's gc.
  • Loading branch information
akichidis committed Feb 18, 2025
1 parent 79cfdbb commit 537f02b
Show file tree
Hide file tree
Showing 13 changed files with 919 additions and 85 deletions.
9 changes: 8 additions & 1 deletion consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ mod tests {
use crate::{
authority_service::AuthorityService,
block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
commit::CommitRange,
commit::{CertifiedCommit, CommitRange},
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
Expand Down Expand Up @@ -743,6 +743,13 @@ mod tests {
Ok(BTreeSet::new())
}

async fn add_certified_commits(
&self,
_commits: Vec<CertifiedCommit>,
) -> Result<BTreeSet<BlockRef>, CoreError> {
todo!()
}

async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/base_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{collections::HashMap, fmt::Display, sync::Arc};

use consensus_config::{AuthorityIndex, Stake};
use parking_lot::RwLock;
use tracing::warn;

use crate::{
block::{BlockAPI, BlockRef, Round, Slot, VerifiedBlock},
Expand Down
200 changes: 171 additions & 29 deletions consensus/core/src/block_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::{
collections::{BTreeMap, BTreeSet},
iter,
sync::Arc,
time::Instant,
};
Expand Down Expand Up @@ -84,11 +83,46 @@ impl BlockManager {
/// Tries to accept the provided blocks assuming that all their causal history exists. The method
/// returns all the blocks that have been successfully processed in round ascending order, that includes also previously
/// suspended blocks that have now been able to get accepted. Method also returns a set with the missing ancestor blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_blocks(
&mut self,
mut blocks: Vec<VerifiedBlock>,
blocks: Vec<VerifiedBlock>,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let _s = monitored_scope("BlockManager::try_accept_blocks");
self.try_accept_blocks_internal(blocks, false)
}

// Tries to accept blocks that have been committed. Returns all the blocks that have been accepted, both from the ones
// provided and any children blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_committed_blocks(
&mut self,
blocks: Vec<VerifiedBlock>,
) -> Vec<VerifiedBlock> {
// If GC is disabled then should not run any of this logic.
if !self.dag_state.read().gc_enabled() {
return Vec::new();
}

// Just accept the blocks
let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
assert!(
missing_blocks.is_empty(),
"No missing blocks should be returned for committed blocks"
);

accepted_blocks
}

/// Attempts to accept the provided blocks. When `committed = true` then the blocks are considered to be committed via certified commits and
/// are handled differently.
fn try_accept_blocks_internal(
&mut self,
mut blocks: Vec<VerifiedBlock>,
committed: bool,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let _s = monitored_scope("BlockManager::try_accept_blocks_internal");

blocks.sort_by_key(|b| b.round());
debug!(
Expand All @@ -104,46 +138,82 @@ impl BlockManager {

// Try to accept the input block.
let block_ref = block.reference();
let block = match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => block,
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,

let mut to_verify_timestamps_and_accept = vec![];
if committed {
match self.try_accept_one_committed_block(block) {
TryAcceptResult::Accepted(block) => {
// As this is a committed block, then it's already accepted and there is no need to verify its timestamps.
// Just add it to the accepted blocks list.
accepted_blocks.push(block);
}
TryAcceptResult::Processed => continue,
TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => panic!(
"Did not expect to suspend or skip a committed block: {:?}",
block_ref
),
};
} else {
match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => {
to_verify_timestamps_and_accept.push(block);
}
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
};
};

// If the block is accepted, try to unsuspend its children blocks if any.
let unsuspended_blocks = self.try_unsuspend_children_blocks(block.reference());
let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
to_verify_timestamps_and_accept.extend(unsuspended_blocks);

// Verify block timestamps
let blocks_to_accept = self
.verify_block_timestamps_and_accept(iter::once(block).chain(unsuspended_blocks));
let blocks_to_accept =
self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
accepted_blocks.extend(blocks_to_accept);
}

let metrics = &self.context.metrics.node_metrics;
metrics
.missing_blocks_total
.inc_by(missing_blocks.len() as u64);
metrics
.block_manager_suspended_blocks
.set(self.suspended_blocks.len() as i64);
metrics
.block_manager_missing_ancestors
.set(self.missing_ancestors.len() as i64);
metrics
.block_manager_missing_blocks
.set(self.missing_blocks.len() as i64);
self.update_stats(missing_blocks.len() as u64);

// Figure out the new missing blocks
(accepted_blocks, missing_blocks)
}

fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
if self.dag_state.read().contains_block(&block.reference()) {
return TryAcceptResult::Processed;
}

// Remove the block from missing and suspended blocks
self.missing_blocks.remove(&block.reference());

// If the block has been already fetched and parked as suspended block, then remove it. Also find all the references of missing
// ancestors to remove those as well. If we don't do that then it's possible once the missing ancestor is fetched to cause a panic
// when trying to unsuspend this children as it won't be found in the suspended blocks map.
if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
suspended_block
.missing_ancestors
.iter()
.for_each(|ancestor| {
if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
references.remove(&block.reference());
}
});
}

// Accept this block before any unsuspended children blocks
self.dag_state.write().accept_blocks(vec![block.clone()]);

TryAcceptResult::Accepted(block)
}

/// Tries to find the provided block_refs in DagState and BlockManager,
/// and returns missing block refs.
pub(crate) fn try_find_blocks(&mut self, block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
Expand Down Expand Up @@ -563,6 +633,20 @@ impl BlockManager {
self.missing_blocks.clone()
}

fn update_stats(&mut self, missing_blocks: u64) {
let metrics = &self.context.metrics.node_metrics;
metrics.missing_blocks_total.inc_by(missing_blocks);
metrics
.block_manager_suspended_blocks
.set(self.suspended_blocks.len() as i64);
metrics
.block_manager_missing_ancestors
.set(self.missing_ancestors.len() as i64);
metrics
.block_manager_missing_blocks
.set(self.missing_blocks.len() as i64);
}

fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
let (min_round, max_round) =
if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
Expand Down Expand Up @@ -1047,6 +1131,64 @@ mod tests {
}
}

#[rstest]
#[tokio::test]
async fn try_accept_committed_blocks() {
// GIVEN
let (mut context, _key_pairs) = Context::new_for_test(4);
// We set the gc depth to 4
context
.protocol_config
.set_consensus_gc_depth_for_testing(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

// We "fake" the commit for round 6, so GC round moves to (commit_round - gc_depth = 6 - 4 = 2)
let last_commit = TrustedCommit::new_for_test(
10,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
vec![],
);
dag_state.write().set_last_commit(last_commit);
assert_eq!(
dag_state.read().gc_round(),
2,
"GC round should have moved to round 2"
);

let mut block_manager =
BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

// create a DAG of 12 rounds
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=12).build();

// Now try to accept via the normal acceptance block path the blocks of rounds 7 ~ 12. None of them should be accepted
let blocks = dag_builder.blocks(7..=12);
let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
assert!(accepted_blocks.is_empty());
assert_eq!(missing.len(), 4);

// Now try to accept via the committed blocks path the blocks of rounds 3 ~ 6. All of them should be accepted and also the blocks
// of rounds 7 ~ 12 should be unsuspended and accepted as well.
let blocks = dag_builder.blocks(3..=6);

// WHEN
let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);

// THEN
accepted_blocks.sort_by_key(|b| b.reference());

let mut all_blocks = dag_builder.blocks(3..=12);
all_blocks.sort_by_key(|b| b.reference());

assert_eq!(accepted_blocks, all_blocks);
assert!(block_manager.is_empty());
}

struct TestBlockVerifier {
fail: BTreeSet<BlockRef>,
}
Expand Down
28 changes: 28 additions & 0 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,33 @@ impl Deref for TrustedCommit {
}
}

#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommit {
commit: Arc<TrustedCommit>,
blocks: Vec<VerifiedBlock>,
}

impl CertifiedCommit {
pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
Self {
commit: Arc::new(commit),
blocks,
}
}

pub fn blocks(&self) -> &[VerifiedBlock] {
&self.blocks
}
}

impl Deref for CertifiedCommit {
type Target = TrustedCommit;

fn deref(&self) -> &Self::Target {
&self.commit
}
}

/// Digest of a consensus commit.
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
Expand Down Expand Up @@ -411,6 +438,7 @@ pub fn load_committed_subdag_from_store(
pub(crate) enum Decision {
Direct,
Indirect,
Certified, // This is a commit certified leader so no commit decision was made locally.
}

/// The status of a leader slot from the direct and indirect commit rules.
Expand Down
Loading

0 comments on commit 537f02b

Please sign in to comment.