Skip to content

Commit

Permalink
[review] address review comments. Refactor block_manager to merge the…
Browse files Browse the repository at this point in the history
… processing of the committed blocks with the normal accept blocks path.
  • Loading branch information
akichidis committed Feb 12, 2025
1 parent ab44e74 commit 64eaa9c
Showing 1 changed file with 98 additions and 82 deletions.
180 changes: 98 additions & 82 deletions consensus/core/src/block_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::{
collections::{BTreeMap, BTreeSet},
iter,
sync::Arc,
time::Instant,
};
Expand Down Expand Up @@ -84,11 +83,46 @@ impl BlockManager {
/// Tries to accept the provided blocks assuming that all their causal history exists. The method
/// returns all the blocks that have been successfully processed in round ascending order, that includes also previously
/// suspended blocks that have now been able to get accepted. Method also returns a set with the missing ancestor blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_blocks(
&mut self,
mut blocks: Vec<VerifiedBlock>,
blocks: Vec<VerifiedBlock>,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let _s = monitored_scope("BlockManager::try_accept_blocks");
self.try_accept_blocks_internal(blocks, false)
}

// Tries to accept blocks that have been committed. Returns all the blocks that have been accepted, both from the ones
// provided and any children blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_committed_blocks(
&mut self,
blocks: Vec<VerifiedBlock>,
) -> Vec<VerifiedBlock> {
// If GC is disabled then should not run any of this logic.
if !self.dag_state.read().gc_enabled() {
return Vec::new();
}

// Just accept the blocks
let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
let (accepted_blocks, missing_blocks) = self.try_accept_blocks_internal(blocks, true);
assert!(
missing_blocks.is_empty(),
"No missing blocks should be returned for committed blocks"
);

accepted_blocks
}

/// Attempts to accept the provided blocks. When `committed = true` then the blocks are considered to be committed via certified commits and
/// are handled differently.
fn try_accept_blocks_internal(
&mut self,
mut blocks: Vec<VerifiedBlock>,
committed: bool,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let _s = monitored_scope("BlockManager::try_accept_blocks_internal");

blocks.sort_by_key(|b| b.round());
debug!(
Expand All @@ -104,25 +138,45 @@ impl BlockManager {

// Try to accept the input block.
let block_ref = block.reference();
let block = match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => block,
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,

let mut to_verify_timestamps_and_accept = vec![];
if committed {
match self.try_accept_one_committed_block(block) {
TryAcceptResult::Accepted(block) => {
// As this is a committed block, then it's already accepted and there is no need to verify its timestamps.
// Just add it to the accepted blocks list.
accepted_blocks.push(block);
}
TryAcceptResult::Processed => continue,
TryAcceptResult::Suspended(_) | TryAcceptResult::Skipped => panic!(
"Did not expect to suspend or skip a committed block: {:?}",
block_ref
),
};
} else {
match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => {
to_verify_timestamps_and_accept.push(block);
}
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors to fetch for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
TryAcceptResult::Processed | TryAcceptResult::Skipped => continue,
};
};

// If the block is accepted, try to unsuspend its children blocks if any.
let unsuspended_blocks = self.try_unsuspend_children_blocks(block.reference());
let unsuspended_blocks = self.try_unsuspend_children_blocks(block_ref);
to_verify_timestamps_and_accept.extend(unsuspended_blocks);

// Verify block timestamps
let blocks_to_accept = self
.verify_block_timestamps_and_accept(iter::once(block).chain(unsuspended_blocks));
let blocks_to_accept =
self.verify_block_timestamps_and_accept(to_verify_timestamps_and_accept);
accepted_blocks.extend(blocks_to_accept);
}

Expand All @@ -132,6 +186,34 @@ impl BlockManager {
(accepted_blocks, missing_blocks)
}

fn try_accept_one_committed_block(&mut self, block: VerifiedBlock) -> TryAcceptResult {
if self.dag_state.read().contains_block(&block.reference()) {
return TryAcceptResult::Processed;
}

// Remove the block from missing and suspended blocks
self.missing_blocks.remove(&block.reference());

// If the block has been already fetched and parked as suspended block, then remove it. Also find all the references of missing
// ancestors to remove those as well. If we don't do that then it's possible once the missing ancestor is fetched to cause a panic
// when trying to unsuspend this children as it won't be found in the suspended blocks map.
if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
suspended_block
.missing_ancestors
.iter()
.for_each(|ancestor| {
if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
references.remove(&block.reference());
}
});
}

// Accept this block before any unsuspended children blocks
self.dag_state.write().accept_blocks(vec![block.clone()]);

TryAcceptResult::Accepted(block)
}

/// Tries to find the provided block_refs in DagState and BlockManager,
/// and returns missing block refs.
pub(crate) fn try_find_blocks(&mut self, mut block_refs: Vec<BlockRef>) -> BTreeSet<BlockRef> {
Expand Down Expand Up @@ -532,72 +614,6 @@ impl BlockManager {
);
}

// Tries to accept blocks that have been committed. Returns all the blocks that have been accepted, both from the ones
// provided and any children blocks.
#[tracing::instrument(skip_all)]
pub(crate) fn try_accept_committed_blocks(
&mut self,
mut blocks: Vec<VerifiedBlock>,
) -> Vec<VerifiedBlock> {
// If GC is disabled then should not run any of this logic.
if !self.dag_state.read().gc_enabled() {
return Vec::new();
}

// Just accept the blocks
let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
let mut accepted_blocks = vec![];

// Keep only the blocks that have not been already accepted into the DAG.
blocks = self
.dag_state
.read()
.contains_blocks(blocks.iter().map(|b| b.reference()).collect::<Vec<_>>())
.into_iter()
.zip(blocks)
.filter_map(|(found, block)| (!found).then_some(block))
.collect::<Vec<_>>();

blocks.sort_by_key(|b| b.round());

for block in blocks {
self.update_block_received_metrics(&block);

// Remove the block from missing and suspended blocks
self.missing_blocks.remove(&block.reference());

// If the block has been already fetched and parked as suspended block, then remove it. Also find all the references of missing
// ancestors to remove those as well. If we don't do that then it's possible once the missing ancestor is fetched to cause a panic
// when trying to unsuspend this children as it won't be found in the suspended blocks map.
if let Some(suspended_block) = self.suspended_blocks.remove(&block.reference()) {
suspended_block
.missing_ancestors
.iter()
.for_each(|ancestor| {
if let Some(references) = self.missing_ancestors.get_mut(ancestor) {
references.remove(&block.reference());
}
});
}

// Accept this block before any unsuspended children blocks
self.dag_state.write().accept_blocks(vec![block.clone()]);

// Try to unsuspend its children blocks if any and accept them
let unsuspended_blocks = self.try_unsuspend_children_blocks(block.reference());

// Verify block timestamps
let blocks_to_accept = self.verify_block_timestamps_and_accept(unsuspended_blocks);

accepted_blocks.push(block);
accepted_blocks.extend(blocks_to_accept);
}

self.update_stats(0);

accepted_blocks
}

/// Returns all the blocks that are currently missing and needed in order to accept suspended
/// blocks.
pub(crate) fn missing_blocks(&self) -> BTreeSet<BlockRef> {
Expand Down

0 comments on commit 64eaa9c

Please sign in to comment.