Skip to content

Commit

Permalink
[test] adding more tests for the block manager and the core commit rule.
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed Jan 28, 2025
1 parent 78f5022 commit fd30e73
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 8 deletions.
63 changes: 63 additions & 0 deletions consensus/core/src/block_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,11 @@ impl BlockManager {
&mut self,
mut blocks: Vec<VerifiedBlock>,
) -> Vec<VerifiedBlock> {
assert!(
self.dag_state.read().gc_enabled(),
"GC should be enabled when accepting committed blocks"
);

// Just accept the blocks
let _s = monitored_scope("BlockManager::try_accept_committed_blocks");
let mut accepted_blocks = vec![];
Expand Down Expand Up @@ -1094,6 +1099,64 @@ mod tests {
}
}

#[rstest]
#[tokio::test]
async fn try_accept_committed_blocks() {
// GIVEN
let (mut context, _key_pairs) = Context::new_for_test(4);
// We set the gc depth to 4
context
.protocol_config
.set_consensus_gc_depth_for_testing(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

// We "fake" the commit for round 6, so GC round moves to (commit_round - gc_depth = 6 - 4 = 2)
let last_commit = TrustedCommit::new_for_test(
10,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
vec![],
);
dag_state.write().set_last_commit(last_commit);
assert_eq!(
dag_state.read().gc_round(),
2,
"GC round should have moved to round 2"
);

let mut block_manager =
BlockManager::new(context.clone(), dag_state, Arc::new(NoopBlockVerifier));

// create a DAG of 12 rounds
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=12).build();

// Now try to accept via the normal acceptance block path the blocks of rounds 7 ~ 12. None of them should be accepted
let blocks = dag_builder.blocks(7..=12);
let (accepted_blocks, missing) = block_manager.try_accept_blocks(blocks.clone());
assert!(accepted_blocks.is_empty());
assert_eq!(missing.len(), 4);

// Now try to accept via the committed blocks path the blocks of rounds 3 ~ 6. All of them should be accepted and also the blocks
// of rounds 7 ~ 12 should be unsuspended and accepted as well.
let blocks = dag_builder.blocks(3..=6);

// WHEN
let mut accepted_blocks = block_manager.try_accept_committed_blocks(blocks);

// THEN
accepted_blocks.sort_by_key(|b| b.reference());

let mut all_blocks = dag_builder.blocks(3..=12);
all_blocks.sort_by_key(|b| b.reference());

assert_eq!(accepted_blocks, all_blocks);
assert!(block_manager.is_empty());
}

struct TestBlockVerifier {
fail: BTreeSet<BlockRef>,
}
Expand Down
178 changes: 177 additions & 1 deletion consensus/core/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, iter, sync::Arc, time::Duration, vec};
use std::{
collections::{BTreeMap, BTreeSet},
iter,
sync::Arc,
time::Duration,
vec,
};

#[cfg(test)]
use consensus_config::{local_committee_and_keys, Stake};
Expand Down Expand Up @@ -669,6 +675,11 @@ impl Core {
.with_label_values(&["Core::try_commit"])
.start_timer();

let mut synced_commits_map = BTreeMap::new();
for commit in &synced_commits {
synced_commits_map.insert(commit.index(), commit.reference());
}

if !synced_commits.is_empty() {
info!(
"Will try to commit synced commits first : {:?}",
Expand Down Expand Up @@ -797,6 +808,16 @@ impl Core {
committed_sub_dags.extend(subdags);
}

// Sanity check: for commits that have been linearized using the synced commits, ensure that the same sub dag has been committed.
for sub_dag in &committed_sub_dags {
if let Some(commit_ref) = synced_commits_map.remove(&sub_dag.commit_ref.index) {
assert_eq!(
commit_ref, sub_dag.commit_ref,
"Synced commit has different reference than the committed sub dag"
);
}
}

// Notify about our own committed blocks
let committed_block_refs = committed_sub_dags
.iter()
Expand Down Expand Up @@ -1367,6 +1388,7 @@ mod test {
use crate::{
block::{genesis_blocks, TestBlock},
block_verifier::NoopBlockVerifier,
commit::CommitAPI,
leader_scoring::ReputationScores,
storage::{mem_store::MemStore, Store, WriteBatch},
test_dag_builder::DagBuilder,
Expand Down Expand Up @@ -2926,6 +2948,160 @@ mod test {
}
}

// This will test the following scenarios:
// * We do have synced commits
#[tokio::test]
async fn try_commit_with_synced_commits() {
telemetry_subscribers::init_for_testing();

let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
}));

let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

let block_manager = BlockManager::new(
context.clone(),
dag_state.clone(),
Arc::new(NoopBlockVerifier),
);
let leader_schedule = Arc::new(
LeaderSchedule::from_store(context.clone(), dag_state.clone())
.with_num_commits_per_schedule(10),
);

let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let _block_receiver = signal_receivers.block_broadcast_receiver();

let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
store.clone(),
leader_schedule.clone(),
);

let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
true,
);

// No new block should have been produced
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);

// create a DAG of 12 rounds
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=12).build();

// Store all blocks up to round 6 which should be enough to decide up to leader 4
dag_builder.print();
let blocks = dag_builder.blocks(1..=6);

for block in blocks {
dag_state.write().accept_block(block);
}

// Get all the committed sub dags up to round 10
let sub_dags_and_commits = dag_builder.get_sub_dag_and_commits(1..=10);

// Now try to commit up to the latest leader (round = 4). Do not provide any synced commits.
let committed_sub_dags = core.try_commit(vec![]).unwrap();

// We should have committed up to round 4
assert_eq!(committed_sub_dags.len(), 4);

// Now try to commit providing the synced commits. We'll try 3 different scenarios
// 1. Provide sync commits that are all before the last committed round
// 2. Provide sync commits that are before and after the last committed round
// 3. Provide sync commits that are all after the last committed round and also there are additional blocks so can run the direct decide rule as well

println!("Case 1. Provide sync commits that are all before the last committed round.");

// Highest synced commit should be for leader of round 4.
let synced_commits = sub_dags_and_commits
.iter()
.take(4)
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();
assert!(
synced_commits.last().unwrap().index()
<= committed_sub_dags.last().unwrap().commit_ref.index,
"Highest synced commit should older than the highest committed index."
);

// Try commit using the synced commits
let committed_sub_dags = core.try_commit(synced_commits).expect("Should not fail");

// Nothing should be committed
assert!(committed_sub_dags.is_empty());

println!("Case 2. Provide sync commits that are all after the last committed round");

// Highest synced commit should be for leader of round 4.
let synced_commits = sub_dags_and_commits
.iter()
.take(5)
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();

// Try commit using the synced commits
let committed_sub_dags = core
.try_commit(synced_commits.clone())
.expect("Should not fail");

// The sub dag of index 5 should be committed.
assert_eq!(committed_sub_dags.len(), 1);
let committed_sub_dag = committed_sub_dags.first().unwrap();
assert_eq!(committed_sub_dag.commit_ref.index, 5);

println!("Case 3. Provide sync commits that are all after the last committed round and also there are additional blocks so can run the direct decide rule as well");

let synced_commits = sub_dags_and_commits
.iter()
.skip(5)
.take(1)
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();

// Now also add the blocks up to round 12
let blocks = dag_builder.blocks(7..=12);
for block in blocks {
dag_state.write().accept_block(block);
}

// Try commit using the synced commits
let committed_sub_dags = core
.try_commit(synced_commits.clone())
.expect("Should not fail");

// We expect all the sub dags up to leader round 10 to be committed.
assert_eq!(committed_sub_dags.len(), 5);

for i in 6..=10 {
let committed_sub_dag = &committed_sub_dags[i - 6];
assert_eq!(committed_sub_dag.commit_ref.index, i as u32);
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
Expand Down
15 changes: 8 additions & 7 deletions consensus/core/src/test_dag_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
dag_state::DagState,
leader_schedule::{LeaderSchedule, LeaderSwapTable},
linearizer::{BlockStoreAPI, Linearizer},
CommittedSubDag,
CommitRef, CommittedSubDag,
};

/// DagBuilder API
Expand Down Expand Up @@ -141,15 +141,15 @@ impl DagBuilder {
&mut self,
leader_rounds: RangeInclusive<Round>,
) -> Vec<(CommittedSubDag, TrustedCommit)> {
let (last_leader_round, mut last_commit_index, mut last_timestamp_ms) =
let (last_leader_round, mut last_commit_ref, mut last_timestamp_ms) =
if let Some((sub_dag, _)) = self.committed_sub_dags.last() {
(
sub_dag.leader.round,
sub_dag.commit_ref.index,
sub_dag.commit_ref,
sub_dag.timestamp_ms,
)
} else {
(0, 0, 0)
(0, CommitRef::new(0, CommitDigest::MIN), 0)
};

struct BlockStorage {
Expand Down Expand Up @@ -218,7 +218,6 @@ impl DagBuilder {
.saturating_sub(self.context.protocol_config.gc_depth());

let leader_block_ref = leader_block.reference();
last_commit_index += 1;
last_timestamp_ms = leader_block.timestamp_ms().max(last_timestamp_ms);

let (to_commit, rejected_transactions) = Linearizer::linearize_sub_dag(
Expand All @@ -235,8 +234,8 @@ impl DagBuilder {
}

let commit = TrustedCommit::new_for_test(
last_commit_index,
CommitDigest::MIN,
last_commit_ref.index + 1,
last_commit_ref.digest,
last_timestamp_ms,
leader_block_ref,
to_commit
Expand All @@ -245,6 +244,8 @@ impl DagBuilder {
.collect::<Vec<_>>(),
);

last_commit_ref = commit.reference();

let sub_dag = CommittedSubDag::new(
leader_block_ref,
to_commit,
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/universal_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl UniversalCommitter {
for committer in self.committers.iter().rev() {
// Skip committers that don't have a leader for this round.
let Some(slot) = committer.elect_leader(round) else {
tracing::debug!("No leader for round {round}, skipping");
continue;
};

Expand Down

0 comments on commit fd30e73

Please sign in to comment.