Skip to content

Commit

Permalink
[review] address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed Feb 14, 2025
1 parent 1822ed4 commit 6209b58
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 132 deletions.
5 changes: 4 additions & 1 deletion consensus/core/src/commit_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ impl<C: NetworkClient> CommitSyncer<C> {
let blocks = commit
.blocks()
.iter()
.map(|block_ref| blocks_map.remove(block_ref).expect("Block should exist"))
.map(|block_ref| {
// It's impossible for two different commits to sequence the same block so that operation should be safe.
blocks_map.remove(block_ref).expect("Block should exist")
})
.collect::<Vec<_>>();
CertifiedCommit::new_certified(commit, blocks)
})
Expand Down
281 changes: 150 additions & 131 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ impl Core {

// We want to enable the commit process logic when GC is enabled.
if self.dag_state.read().gc_enabled() {
let commits = self
.validate_certified_commits(commits)
.expect("Certified commits validation failed");

// Try to commit the new blocks. Take into account the trusted commit that has been provided.
self.try_commit(commits)?;

Expand Down Expand Up @@ -420,6 +424,43 @@ impl Core {
Ok(None)
}

/// Keeps only the certified commits that have a commit index > last commit index. It also ensures that the first commit in the list is the next one in line, otherwise it panics.
fn validate_certified_commits(
&mut self,
commits: Vec<CertifiedCommit>,
) -> ConsensusResult<Vec<CertifiedCommit>> {
// Filter out the commits that have been already locally committed and keep only anything that is above the last committed index.
let last_commit_index = self.dag_state.read().last_commit_index();
let commits = commits
.iter()
.filter(|commit| {
if commit.index() > last_commit_index {
true
} else {
tracing::debug!(
"Skip commit for index {} as it is already committed with last commit index {}",
commit.index(),
last_commit_index
);
false
}
})
.cloned()
.collect::<Vec<_>>();

// Make sure that the first commit we find is the next one in line and there is no gap.
if let Some(commit) = commits.first() {
if commit.index() != last_commit_index + 1 {
return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
expected_commit_index: last_commit_index + 1,
commit_index: commit.index(),
});
}
}

Ok(commits)
}

// Attempts to create a new block, persist and propose it to all peers.
// When force is true, ignore if leader from the last round exists among ancestors and if
// the minimum round delay has passed.
Expand Down Expand Up @@ -968,35 +1009,6 @@ impl Core {

assert!(limit > 0, "limit should be greater than 0");

let last_commit_index = self.dag_state.read().last_commit_index();

// If there are certified committed leaders, check that the first certified committed leader which is higher than the last decided one has no gaps.
while !certified_commits.is_empty() {
let certified_commit = certified_commits
.first()
.expect("Synced commits should not be empty");
if certified_commit.index() <= last_commit_index {
tracing::debug!(
"Skip commit for index {} as it is already committed with last commit index {}",
certified_commit.index(),
last_commit_index
);
certified_commits.remove(0);
} else {
// Make sure that the first commit we find is the next one in line and there is no gap.
if certified_commit.index() != last_commit_index + 1 {
panic!("Gap found between the certified commits and the last committed index. Expected next commit index to be {}, but found {}", last_commit_index + 1, certified_commit.index());
}

// now break as we want to process the rest of the committed leaders
break;
}
}

if certified_commits.is_empty() {
return Vec::new();
}

let to_commit = if certified_commits.len() >= limit {
// We keep only the number of leaders as dictated by the `limit`
certified_commits.drain(..limit).collect::<Vec<_>>()
Expand Down Expand Up @@ -3257,55 +3269,18 @@ mod test {
}

#[tokio::test]
async fn try_commit_with_certified_commits() {
async fn test_validate_certified_commits() {
telemetry_subscribers::init_for_testing();

let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context.with_parameters(Parameters {
let (context, _key_pairs) = Context::new_for_test(4);
let context = context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
}));

let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

let block_manager = BlockManager::new(
context.clone(),
dag_state.clone(),
Arc::new(NoopBlockVerifier),
);
let leader_schedule = Arc::new(
LeaderSchedule::from_store(context.clone(), dag_state.clone())
.with_num_commits_per_schedule(10),
);

let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let _block_receiver = signal_receivers.block_broadcast_receiver();

let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
store.clone(),
leader_schedule.clone(),
);
});

let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
true,
);
let authority_index = AuthorityIndex::new_for_test(0);
let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
let mut core = core.core;

// No new block should have been produced
assert_eq!(
Expand All @@ -3315,15 +3290,15 @@ mod test {
);

// create a DAG of 12 rounds
let mut dag_builder = DagBuilder::new(context.clone());
let mut dag_builder = DagBuilder::new(core.context.clone());
dag_builder.layers(1..=12).build();

// Store all blocks up to round 6 which should be enough to decide up to leader 4
dag_builder.print();
let blocks = dag_builder.blocks(1..=6);

for block in blocks {
dag_state.write().accept_block(block);
core.dag_state.write().accept_block(block);
}

// Get all the committed sub dags up to round 10
Expand All @@ -3335,7 +3310,7 @@ mod test {
// We should have committed up to round 4
assert_eq!(committed_sub_dags.len(), 4);

// Now try to commit providing the certified commits. We'll try 3 different scenarios:
// Now validate the certified commits. We'll try 3 different scenarios:
println!("Case 1. Provide certified commits that are all before the last committed round.");

// Highest certified commit should be for leader of round 4.
Expand All @@ -3351,11 +3326,10 @@ mod test {
"Highest certified commit should older than the highest committed index."
);

// Try commit using the certified commits
let committed_sub_dags = core.try_commit(certified_commits).expect("Should not fail");
let certified_commits = core.validate_certified_commits(certified_commits).unwrap();

// Nothing should be committed
assert!(committed_sub_dags.is_empty());
// No commits should be processed
assert!(certified_commits.is_empty());

println!("Case 2. Provide certified commits that are all after the last committed round.");

Expand All @@ -3366,43 +3340,121 @@ mod test {
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();

// Try commit using the certified commits
let committed_sub_dags = core
.try_commit(certified_commits.clone())
.expect("Should not fail");
let certified_commits = core
.validate_certified_commits(certified_commits.clone())
.unwrap();

// The sub dag of index 5 should be committed.
assert_eq!(committed_sub_dags.len(), 1);
let committed_sub_dag = committed_sub_dags.first().unwrap();
assert_eq!(committed_sub_dag.commit_ref.index, 5);
// The certified commit of index 5 should be processed.
assert_eq!(certified_commits.len(), 1);
assert_eq!(certified_commits.first().unwrap().reference().index, 5);

println!("Case 3. Provide certified commits that are all after the last committed round and also there are additional blocks so can run the direct decide rule as well.");
println!("Case 3. Provide certified commits where the first certified commit index is not the last_commited_index + 1.");

// The commits of leader rounds 5-8 should be committed via the certified commits.
// Highest certified commit should be for leader of round 4.
let certified_commits = sub_dags_and_commits
.iter()
.skip(5)
.take(3)
.take(1)
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();

let err = core
.validate_certified_commits(certified_commits.clone())
.unwrap_err();
match err {
ConsensusError::UnexpectedCertifiedCommitIndex {
expected_commit_index: 5,
commit_index: 6,
} => (),
_ => panic!("Unexpected error: {:?}", err),
}
}

#[tokio::test]
async fn test_add_certified_commits() {
telemetry_subscribers::init_for_testing();

let (context, _key_pairs) = Context::new_for_test(4);
let context = context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
});

let authority_index = AuthorityIndex::new_for_test(0);
let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true);
let store = core.store.clone();
let mut core = core.core;

// No new block should have been produced
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);

// create a DAG of 12 rounds
let mut dag_builder = DagBuilder::new(core.context.clone());
dag_builder.layers(1..=12).build();

// Store all blocks up to round 6 which should be enough to decide up to leader 4
dag_builder.print();
let blocks = dag_builder.blocks(1..=6);

for block in blocks {
core.dag_state.write().accept_block(block);
}

// Get all the committed sub dags up to round 10
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);

// Now try to commit up to the latest leader (round = 4). Do not provide any certified commits.
let committed_sub_dags = core.try_commit(vec![]).unwrap();

// We should have committed up to round 4
assert_eq!(committed_sub_dags.len(), 4);

let last_commit = store
.read_last_commit()
.unwrap()
.expect("Last commit should be set");
assert_eq!(last_commit.reference().index, 4);

println!("Case 1. Provide no certified commits. No commit should happen.");

let last_commit = store
.read_last_commit()
.unwrap()
.expect("Last commit should be set");
assert_eq!(last_commit.reference().index, 4);

println!("Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well.");

// The commits of leader rounds 5-8 should be committed via the certified commits.
let certified_commits = sub_dags_and_commits
.iter()
.skip(3)
.take(5)
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();

// Now only add the blocks of rounds 8..=12. The blocks up to round 7 should be accepted via the certified commits processing.
let blocks = dag_builder.blocks(8..=12);
for block in blocks {
dag_state.write().accept_block(block);
core.dag_state.write().accept_block(block);
}

// Try commit using the certified commits. The corresponding blocks of the certified commits should be accepted and stored before linearizing and committing the DAG.
let committed_sub_dags = core
.try_commit(certified_commits.clone())
// The corresponding blocks of the certified commits should be accepted and stored before linearizing and committing the DAG.
core.add_certified_commits(certified_commits.clone())
.expect("Should not fail");

let commits = store.scan_commits((6..=10).into()).unwrap();

// We expect all the sub dags up to leader round 10 to be committed.
assert_eq!(committed_sub_dags.len(), 5);
assert_eq!(commits.len(), 5);

for i in 6..=10 {
let committed_sub_dag = &committed_sub_dags[i - 6];
assert_eq!(committed_sub_dag.commit_ref.index, i as u32);
let commit = &commits[i - 6];
assert_eq!(commit.reference().index, i as u32);
}
}

Expand Down Expand Up @@ -4040,39 +4092,6 @@ mod test {
assert_eq!(certified_commits.len(), 2);
}

#[tokio::test]
#[should_panic(
expected = "Gap found between the certified commits and the last committed index. Expected next commit index to be 1, but found 4"
)]
async fn try_decide_certified_gap_in_commits() {
// GIVEN
let (context, _) = Context::new_for_test(4);
let authority_index = AuthorityIndex::new_for_test(0);
let core = CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true);
let mut core = core.core;

let mut dag_builder = DagBuilder::new(core.context.clone());
dag_builder.layers(1..=12).build();

let limit = 2;

let blocks = dag_builder.blocks(1..=12);

for block in blocks {
core.dag_state.write().accept_block(block);
}

// WHEN
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(4..=5);
let mut certified_commits = sub_dags_and_commits
.into_iter()
.map(|(_, commit)| commit)
.collect::<Vec<_>>();

// This should panic as the last committed index is 1 and the first provided index is 4.
let _leaders = core.try_decide_certified(&mut certified_commits, limit);
}

pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
tokio::time::timeout(timeout, receiver.changed())
.await
Expand Down
Loading

0 comments on commit 6209b58

Please sign in to comment.