Skip to content

Commit

Permalink
[review] move the try_decide_certified method to Core.
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed Feb 12, 2025
1 parent 64eaa9c commit e0d776a
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 148 deletions.
158 changes: 152 additions & 6 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{
collections::{BTreeMap, BTreeSet},
iter,
iter, mem,
sync::Arc,
time::Duration,
vec,
Expand Down Expand Up @@ -31,7 +31,7 @@ use crate::{
Slot, VerifiedBlock, GENESIS_ROUND,
},
block_manager::BlockManager,
commit::{CertifiedCommit, CommitAPI, CommittedSubDag, DecidedLeader},
commit::{CertifiedCommit, CommitAPI, CommittedSubDag, DecidedLeader, Decision},
commit_observer::CommitObserver,
context::Context,
dag_state::DagState,
Expand Down Expand Up @@ -737,7 +737,6 @@ impl Core {
Vec<DecidedLeader>,
Vec<CertifiedCommit>,
) = self
.committer
.try_decide_certified(&mut certified_commits, commits_until_update)
.into_iter()
.unzip();
Expand Down Expand Up @@ -953,6 +952,79 @@ impl Core {
true
}

// Try to decide which of the certified commits will have to be committed next respecting the `limit`. If provided `limit` is zero, it will panic.
// The function returns the list of decided leaders and updates in place the remaining certified commits. If empty vector is returned, it means that
// there are no certified commits to be committed as `certified_commits` is either empty or all of the certified commits are already committed.
#[tracing::instrument(skip_all)]
fn try_decide_certified(
&mut self,
certified_commits: &mut Vec<CertifiedCommit>,
limit: usize,
) -> Vec<(DecidedLeader, CertifiedCommit)> {
// If GC is disabled then should not run any of this logic.
if !self.dag_state.read().gc_enabled() {
return Vec::new();
}

assert!(limit > 0, "limit should be greater than 0");

let last_commit_index = self.dag_state.read().last_commit_index();

// If there are certified committed leaders, check that the first certified committed leader which is higher than the last decided one has no gaps.
while !certified_commits.is_empty() {
let certified_commit = certified_commits
.first()
.expect("Synced commits should not be empty");
if certified_commit.index() <= last_commit_index {
tracing::debug!(
"Skip commit for index {} as it is already committed with last commit index {}",
certified_commit.index(),
last_commit_index
);
certified_commits.remove(0);
} else {
// Make sure that the first commit we find is the next one in line and there is no gap.
if certified_commit.index() != last_commit_index + 1 {
panic!("Gap found between the certified commits and the last committed index. Expected next commit index to be {}, but found {}", last_commit_index + 1, certified_commit.index());
}

// now break as we want to process the rest of the committed leaders
break;
}
}

if certified_commits.is_empty() {
return Vec::new();
}

let to_commit = if certified_commits.len() >= limit {
// We keep only the number of leaders as dictated by the `limit`
certified_commits.drain(..limit).collect::<Vec<_>>()
} else {
// Otherwise just take all of them and leave the `synced_commits` empty.
mem::take(certified_commits)
};

tracing::info!(
"Decided {} certified leaders: {}",
to_commit.len(),
to_commit.iter().map(|c| c.leader().to_string()).join(",")
);

let sequenced_leaders = to_commit
.into_iter()
.map(|commit| {
let leader = commit.blocks().last().expect("Certified commit should have at least one block");
assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
let leader = DecidedLeader::Commit(leader.clone());
UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
(leader, commit)
})
.collect::<Vec<_>>();

sequenced_leaders
}

/// Retrieves the next ancestors to propose to form a block at `clock_round` round.
fn ancestors_to_propose(&mut self, clock_round: Round) -> Vec<VerifiedBlock> {
// Now take the ancestors before the clock_round (excluded) for each authority.
Expand Down Expand Up @@ -1310,7 +1382,7 @@ pub(crate) fn create_cores(context: Context, authorities: Vec<Stake>) -> Vec<Cor

for index in 0..authorities.len() {
let own_index = AuthorityIndex::new_for_test(index as u32);
let core = CoreTextFixture::new(context.clone(), authorities.clone(), own_index);
let core = CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false);
cores.push(core);
}
cores
Expand All @@ -1328,7 +1400,12 @@ pub(crate) struct CoreTextFixture {

#[cfg(test)]
impl CoreTextFixture {
fn new(context: Context, authorities: Vec<Stake>, own_index: AuthorityIndex) -> Self {
fn new(
context: Context,
authorities: Vec<Stake>,
own_index: AuthorityIndex,
sync_last_known_own_block: bool,
) -> Self {
let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
let mut context = context.clone();
context = context
Expand Down Expand Up @@ -1378,7 +1455,7 @@ impl CoreTextFixture {
signals,
block_signer,
dag_state,
false,
sync_last_known_own_block,
);

Self {
Expand Down Expand Up @@ -3897,6 +3974,75 @@ mod test {
assert_eq!(all_stored_commits.len(), 6);
}

#[tokio::test]
async fn try_decide_certified() {
// GIVEN
telemetry_subscribers::init_for_testing();

let (context, _) = Context::new_for_test(4);

let authority_index = AuthorityIndex::new_for_test(0);
let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
let mut core = core.core;

let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
dag_builder.layers(1..=12).build();

let limit = 2;

let blocks = dag_builder.blocks(1..=12);

for block in blocks {
core.dag_state.write().accept_block(block);
}

// WHEN
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
let mut certified_commits = sub_dags_and_commits
.into_iter()
.map(|(_, commit)| commit)
.collect::<Vec<_>>();

let leaders = core.try_decide_certified(&mut certified_commits, limit);

// THEN
assert_eq!(leaders.len(), 2);
assert_eq!(certified_commits.len(), 2);
}

#[tokio::test]
#[should_panic(
expected = "Gap found between the certified commits and the last committed index. Expected next commit index to be 1, but found 4"
)]
async fn try_decide_certified_gap_in_commits() {
// GIVEN
let (context, _) = Context::new_for_test(4);
let authority_index = AuthorityIndex::new_for_test(0);
let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
let mut core = core.core;

let mut dag_builder = DagBuilder::new(core.context.clone());
dag_builder.layers(1..=12).build();

let limit = 2;

let blocks = dag_builder.blocks(1..=12);

for block in blocks {
core.dag_state.write().accept_block(block);
}

// WHEN
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(4..=5);
let mut certified_commits = sub_dags_and_commits
.into_iter()
.map(|(_, commit)| commit)
.collect::<Vec<_>>();

// This should panic as the last committed index is 1 and the first provided index is 4.
let _leaders = core.try_decide_certified(&mut certified_commits, limit);
}

pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
tokio::time::timeout(timeout, receiver.changed())
.await
Expand Down
61 changes: 0 additions & 61 deletions consensus/core/src/tests/pipelined_committer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::{
leader_schedule::{LeaderSchedule, LeaderSwapTable},
storage::mem_store::MemStore,
test_dag::{build_dag, build_dag_layer},
test_dag_builder::DagBuilder,
universal_committer::universal_committer_builder::UniversalCommitterBuilder,
};

Expand Down Expand Up @@ -766,66 +765,6 @@ async fn test_byzantine_validator() {
};
}

#[tokio::test]
async fn try_decide_certified() {
// GIVEN
let (context, dag_state, mut committer) = basic_test_setup();

let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=12).build();

let limit = 2;

let blocks = dag_builder.blocks(1..=12);

for block in blocks {
dag_state.write().accept_block(block);
}

// WHEN
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
let mut certified_commits = sub_dags_and_commits
.into_iter()
.map(|(_, commit)| commit)
.collect::<Vec<_>>();

let leaders = committer.try_decide_certified(&mut certified_commits, limit);

// THEN
assert_eq!(leaders.len(), 2);
assert_eq!(certified_commits.len(), 2);
}

#[tokio::test]
#[should_panic(
expected = "Gap found between the certified commits and the last committed index. Expected next commit index to be 1, but found 4"
)]
async fn try_decide_certified_gap_in_commits() {
// GIVEN
let (context, dag_state, mut committer) = basic_test_setup();

let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=12).build();

let limit = 2;

let blocks = dag_builder.blocks(1..=12);

for block in blocks {
dag_state.write().accept_block(block);
}

// WHEN
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(4..=5);
let mut certified_commits = sub_dags_and_commits
.into_iter()
.map(|(_, commit)| commit)
.collect::<Vec<_>>();

// This should panic as the last committed index is 1 and the first provided index is 4.
let _leaders = committer.try_decide_certified(&mut certified_commits, limit);
}

fn basic_test_setup() -> (
Arc<Context>,
Arc<RwLock<DagState>>,
Expand Down
Loading

0 comments on commit e0d776a

Please sign in to comment.