Skip to content

Commit

Permalink
[consensus] advance threshold clock from dag state (#20906)
Browse files Browse the repository at this point in the history
## Description 

Updates threshold clock when blocks are accepted into dag state, so
consistency between threshold clock and dag state are easily maintained.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
mwtian authored Jan 17, 2025
1 parent a16c942 commit 5af9664
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 83 deletions.
89 changes: 52 additions & 37 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::{
leader_schedule::LeaderSchedule,
round_prober::QuorumRound,
stake_aggregator::{QuorumThreshold, StakeAggregator},
threshold_clock::ThresholdClock,
transaction::TransactionConsumer,
universal_committer::{
universal_committer_builder::UniversalCommitterBuilder, UniversalCommitter,
Expand All @@ -51,8 +50,6 @@ const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;

pub(crate) struct Core {
context: Arc<Context>,
/// The threshold clock that is used to keep track of the current round
threshold_clock: ThresholdClock,
/// The consumer to use in order to pull transactions to be included for the next proposals
transaction_consumer: TransactionConsumer,
/// The block manager which is responsible for keeping track of the DAG dependencies when processing new blocks
Expand All @@ -72,6 +69,8 @@ pub(crate) struct Core {

/// Used to make commit decisions for leader blocks in the dag.
committer: UniversalCommitter,
/// The last new round for which core has sent out a signal.
last_signaled_round: Round,
/// The blocks of the last included ancestors per authority. This vector is basically used as a
/// watermark in order to include in the next block proposal only ancestors of higher rounds.
/// By default, is initialised with `None` values.
Expand Down Expand Up @@ -133,6 +132,8 @@ impl Core {

let last_proposed_block = dag_state.read().get_last_proposed_block();

let last_signaled_round = last_proposed_block.round();

// Recover the last included ancestor rounds based on the last proposed block. That will allow
// to perform the next block proposal by using ancestor blocks of higher rounds and avoid
// re-including blocks that have been already included in the last (or earlier) block proposal.
Expand Down Expand Up @@ -162,8 +163,8 @@ impl Core {
ancestor_state_manager.set_propagation_scores(propagation_scores);

Self {
context: context.clone(),
threshold_clock: ThresholdClock::new(0, context.clone()),
context,
last_signaled_round,
last_included_ancestors,
last_decided_leader,
leader_schedule,
Expand Down Expand Up @@ -206,9 +207,7 @@ impl Core {
);
std::thread::sleep(Duration::from_millis(wait_ms));
}
// Recover the last available quorum to correctly advance the threshold clock.
let last_quorum = self.dag_state.read().last_quorum();
self.add_accepted_blocks(last_quorum);

// Try to commit and propose, since they may not have run after the last storage write.
self.try_commit().unwrap();

Expand All @@ -227,6 +226,11 @@ impl Core {
last_proposed_block
};

// Try to set up leader timeout if needed.
// This needs to be called after try_commit() and try_propose(), which may
// have advanced the threshold clock round.
self.try_signal_new_round();

info!(
"Core recovery completed with last proposed block {:?}",
last_proposed_block
Expand Down Expand Up @@ -269,13 +273,16 @@ impl Core {
.join(",")
);

// Now add accepted blocks to the threshold clock and pending ancestors list.
self.add_accepted_blocks(accepted_blocks);

// Try to commit the new blocks if possible.
self.try_commit()?;

// Try to propose now since there are new blocks accepted.
self.try_propose(false)?;

// Now set up leader timeout if needed.
// This needs to be called after try_commit() and try_propose(), which may
// have advanced the threshold clock round.
self.try_signal_new_round();
};

if !missing_block_refs.is_empty() {
Expand All @@ -288,24 +295,26 @@ impl Core {
Ok(missing_block_refs)
}

/// Adds/processed all the newly `accepted_blocks`. We basically try to move the threshold clock and add them to the
/// pending ancestors list.
fn add_accepted_blocks(&mut self, accepted_blocks: Vec<VerifiedBlock>) {
// Advance the threshold clock. If advanced to a new round then send a signal that a new quorum has been received.
if let Some(new_round) = self
.threshold_clock
.add_blocks(accepted_blocks.iter().map(|b| b.reference()).collect())
{
// notify that threshold clock advanced to new round
self.signals.new_round(new_round);
/// If needed, signals a new clock round and sets up leader timeout.
fn try_signal_new_round(&mut self) {
// Signal only when the threshold clock round is more advanced than the last signaled round.
//
// NOTE: a signal is still sent even when a block has been proposed at the new round.
// We can consider changing this in the future.
let new_clock_round = self.dag_state.read().threshold_clock_round();
if new_clock_round <= self.last_signaled_round {
return;
}
// Then send a signal to set up leader timeout.
self.signals.new_round(new_clock_round);
self.last_signaled_round = new_clock_round;

// Report the threshold clock round
self.context
.metrics
.node_metrics
.threshold_clock_round
.set(self.threshold_clock.get_round() as i64);
.set(new_clock_round as i64);
}

/// Creating a new block for the dictated round. This is used when a leader timeout occurs, either
Expand All @@ -324,7 +333,10 @@ impl Core {
.leader_timeout_total
.with_label_values(&[&format!("{force}")])
.inc();
return self.try_propose(force);
let result = self.try_propose(force);
// The threshold clock round may have advanced, so a signal needs to be sent.
self.try_signal_new_round();
return result;
}
Ok(None)
}
Expand Down Expand Up @@ -359,13 +371,18 @@ impl Core {
.with_label_values(&["Core::try_new_block"])
.start_timer();

let clock_round = self.threshold_clock.get_round();
if clock_round <= self.last_proposed_round() {
return None;
}
// Ensure the new block has a higher round than the last proposed block.
let clock_round = {
let dag_state = self.dag_state.read();
let clock_round = dag_state.threshold_clock_round();
if clock_round <= dag_state.get_last_proposed_block().round() {
return None;
}
clock_round
};

// There must be a quorum of blocks from the previous round.
let quorum_round = self.threshold_clock.get_round().saturating_sub(1);
let quorum_round = clock_round.saturating_sub(1);

// Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
// or because we are actually ready to produce the block (leader exists and min delay has passed).
Expand Down Expand Up @@ -428,7 +445,7 @@ impl Core {
.with_label_values(&[leader_authority])
.inc_by(
Instant::now()
.saturating_duration_since(self.threshold_clock.get_quorum_ts())
.saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
.as_millis() as u64,
);
self.context
Expand Down Expand Up @@ -527,9 +544,6 @@ impl Core {
assert_eq!(accepted_blocks.len(), 1);
assert!(missing.is_empty());

// Internally accept the block to move the threshold clock etc
self.add_accepted_blocks(vec![verified_block.clone()]);

// Ensure the new block and its ancestors are persisted, before broadcasting it.
self.dag_state.write().flush();

Expand Down Expand Up @@ -736,7 +750,7 @@ impl Core {

/// Whether the core should propose new blocks.
pub(crate) fn should_propose(&self) -> bool {
let clock_round = self.threshold_clock.get_round();
let clock_round = self.dag_state.read().threshold_clock_round();
let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;

if !self.subscriber_exists {
Expand Down Expand Up @@ -1388,7 +1402,7 @@ mod test {
for round in 1..=4 {
let mut this_round_blocks = Vec::new();

// For round 4 only produce f+1 blocks only skip our validator and that of position 1 from creating blocks.
// For round 4 only produce f+1 blocks. Skip our validator 0 and that of position 1 from creating blocks.
let authorities_to_skip = if round == 4 {
context.committee.validity_threshold() as usize
} else {
Expand Down Expand Up @@ -1454,11 +1468,12 @@ mod test {
false,
);

// New round should be 4
// Clock round should have advanced to 5 during recovery because
// a quorum has formed in round 4.
let mut new_round = signal_receivers.new_round_receiver();
assert_eq!(*new_round.borrow_and_update(), 4);
assert_eq!(*new_round.borrow_and_update(), 5);

// When trying to propose now we should propose block for round 4
// During recovery, round 4 block should have been proposed.
let proposed_block = block_receiver
.recv()
.await
Expand Down
89 changes: 54 additions & 35 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{

use consensus_config::AuthorityIndex;
use itertools::Itertools as _;
use tokio::time::Instant;
use tracing::{debug, error, info};

use crate::{
Expand All @@ -25,8 +26,8 @@ use crate::{
},
context::Context,
leader_scoring::{ReputationScores, ScoringSubdag},
stake_aggregator::{QuorumThreshold, StakeAggregator},
storage::{Store, WriteBatch},
threshold_clock::ThresholdClock,
CommittedSubDag,
};

Expand Down Expand Up @@ -56,6 +57,9 @@ pub(crate) struct DagState {
// Vec position corresponds to the authority index.
recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

// Keeps track of the threshold clock for proposing blocks.
threshold_clock: ThresholdClock,

// Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round
// should be considered evicted, and if any exist we should not consider the causauly complete in the order they appear.
// The `evicted_rounds` size should be the same as the committee size.
Expand Down Expand Up @@ -114,6 +118,8 @@ impl DagState {
.map(|block| (block.reference(), block))
.collect();

let threshold_clock = ThresholdClock::new(1, context.clone());

let last_commit = store
.read_last_commit()
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
Expand Down Expand Up @@ -168,6 +174,7 @@ impl DagState {
genesis,
recent_blocks: BTreeMap::new(),
recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
threshold_clock,
highest_accepted_round: 0,
last_commit: last_commit.clone(),
last_commit_round_advancement_time: None,
Expand Down Expand Up @@ -334,6 +341,7 @@ impl DagState {
self.recent_blocks
.insert(block_ref, BlockInfo::new(block.clone()));
self.recent_refs_by_authority[block_ref.author].insert(block_ref);
self.threshold_clock.add_block(block_ref);
self.highest_accepted_round = max(self.highest_accepted_round, block.round());
self.context
.metrics
Expand Down Expand Up @@ -518,11 +526,6 @@ impl DagState {
.collect()
}

pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
let blocks = self.contains_blocks(vec![*block_ref]);
blocks.first().cloned().unwrap()
}

/// Gets the last proposed block from this authority.
/// If no block is proposed yet, returns the genesis block.
pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
Expand Down Expand Up @@ -728,6 +731,19 @@ impl DagState {
exist
}

pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
let blocks = self.contains_blocks(vec![*block_ref]);
blocks.first().cloned().unwrap()
}

pub(crate) fn threshold_clock_round(&self) -> Round {
self.threshold_clock.get_round()
}

pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
self.threshold_clock.get_quorum_ts()
}

pub(crate) fn highest_accepted_round(&self) -> Round {
self.highest_accepted_round
}
Expand Down Expand Up @@ -959,31 +975,6 @@ impl DagState {
);
}

/// Detects and returns the blocks of the round that forms the last quorum. The method will return
/// the quorum even if that's genesis.
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
// the quorum should exist either on the highest accepted round or the one before. If we fail to detect
// a quorum then it means that our DAG has advanced with missing causal history.
for round in
(self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
{
if round == GENESIS_ROUND {
return self.genesis_blocks();
}
let mut quorum = StakeAggregator::<QuorumThreshold>::new();

// Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
let blocks = self.get_uncommitted_blocks_at_round(round);
for block in &blocks {
if quorum.add(block.author(), &self.context.committee) {
return blocks;
}
}
}

panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}

pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
self.store
.read_last_commit_info()
Expand Down Expand Up @@ -1039,10 +1030,6 @@ impl DagState {
.end()
}

pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
self.genesis.values().cloned().collect()
}

/// The last round that should get evicted after a cache clean up operation. After this round we are
/// guaranteed to have all the produced blocks from that authority. For any round that is
/// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
Expand Down Expand Up @@ -1072,6 +1059,38 @@ impl DagState {
gc_round.min(last_round.saturating_sub(cached_rounds))
}

/// Detects and returns the blocks of the round that forms the last quorum. The method will return
/// the quorum even if that's genesis.
#[cfg(test)]
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
// the quorum should exist either on the highest accepted round or the one before. If we fail to detect
// a quorum then it means that our DAG has advanced with missing causal history.
for round in
(self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
{
if round == GENESIS_ROUND {
return self.genesis_blocks();
}
use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
let mut quorum = StakeAggregator::<QuorumThreshold>::new();

// Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
let blocks = self.get_uncommitted_blocks_at_round(round);
for block in &blocks {
if quorum.add(block.author(), &self.context.committee) {
return blocks;
}
}
}

panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}

#[cfg(test)]
pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
self.genesis.values().cloned().collect()
}

#[cfg(test)]
pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
self.last_commit = Some(commit);
Expand Down
Loading

0 comments on commit 5af9664

Please sign in to comment.