Skip to content

Commit

Permalink
delay notify_randomness_in_checkpoint until checkpoint is committed
Browse files: browse the repository at this point in the history
  • Loading branch information
mystenmark committed Jan 31, 2025
1 parent eaf1f6a commit 5116b25
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 21 deletions.
61 changes: 41 additions & 20 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type CheckpointExecutionBuffer = FuturesOrdered<
Option<Accumulator>,
Option<CheckpointData>,
Vec<TransactionDigest>,
Vec<RandomnessRound>,
)>,
>;

Expand Down Expand Up @@ -309,10 +310,10 @@ impl CheckpointExecutor {
// watermark accordingly. Note that given that checkpoints are guaranteed to
// be processed (added to FuturesOrdered) in seq_number order, using FuturesOrdered
// guarantees that we will also ratchet the watermarks in order.
Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests))) = pending.next() => {
Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests, randomness_rounds))) = pending.next() => {
let _process_scope = mysten_metrics::monitored_scope("ProcessExecutedCheckpoint");

self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests).await;
self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests, randomness_rounds).await;
self.backpressure_manager.update_highest_executed_checkpoint(*checkpoint.sequence_number());
highest_executed = Some(checkpoint.clone());

Expand Down Expand Up @@ -444,6 +445,7 @@ impl CheckpointExecutor {
checkpoint_acc: Option<Accumulator>,
checkpoint_data: Option<CheckpointData>,
all_tx_digests: &[TransactionDigest],
randomness_rounds: Vec<RandomnessRound>,
) {
// Commit all transaction effects to disk
let cache_commit = self.state.get_cache_commit();
Expand All @@ -463,6 +465,21 @@ impl CheckpointExecutor {
.handle_committed_transactions(all_tx_digests)
.expect("cannot fail");

// Once the checkpoint is finalized, we know that any randomness contained in this checkpoint has
// been successfully included in a checkpoint certified by a quorum of validators.
// (RandomnessManager/RandomnessReporter is only present on validators.)
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
for round in randomness_rounds {
debug!(
?round,
"notifying RandomnessReporter that randomness update was executed in checkpoint"
);
randomness_reporter
.notify_randomness_in_checkpoint(round)
.expect("epoch cannot have ended");
}
}

if let Some(checkpoint_data) = checkpoint_data {
self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
.await;
Expand Down Expand Up @@ -579,7 +596,7 @@ impl CheckpointExecutor {

pending.push_back(spawn_monitored_task!(async move {
let epoch_store = epoch_store.clone();
let (tx_digests, checkpoint_acc, checkpoint_data) = loop {
let (tx_digests, checkpoint_acc, checkpoint_data, randomness_rounds) = loop {
match execute_checkpoint(
checkpoint.clone(),
&state,
Expand All @@ -603,12 +620,23 @@ impl CheckpointExecutor {
tokio::time::sleep(Duration::from_secs(1)).await;
metrics.checkpoint_exec_errors.inc();
}
Ok((tx_digests, checkpoint_acc, checkpoint_data)) => {
break (tx_digests, checkpoint_acc, checkpoint_data)
Ok((tx_digests, checkpoint_acc, checkpoint_data, randomness_rounds)) => {
break (
tx_digests,
checkpoint_acc,
checkpoint_data,
randomness_rounds,
)
}
}
};
(checkpoint, checkpoint_acc, checkpoint_data, tx_digests)
(
checkpoint,
checkpoint_acc,
checkpoint_data,
tx_digests,
randomness_rounds,
)
}));
}

Expand Down Expand Up @@ -786,6 +814,7 @@ async fn execute_checkpoint(
Vec<TransactionDigest>,
Option<Accumulator>,
Option<CheckpointData>,
Vec<RandomnessRound>,
)> {
debug!("Preparing checkpoint for execution",);
let prepare_start = Instant::now();
Expand Down Expand Up @@ -829,20 +858,12 @@ async fn execute_checkpoint(
)
.await?;

// Once execution is complete, we know that any randomness contained in this checkpoint has
// been successfully included in a checkpoint certified by quorum of validators.
// (RandomnessManager/RandomnessReporter is only present on validators.)
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
for round in randomness_rounds {
debug!(
?round,
"notifying RandomnessReporter that randomness update was executed in checkpoint"
);
randomness_reporter.notify_randomness_in_checkpoint(round)?;
}
}

Ok((all_tx_digests, checkpoint_acc, checkpoint_data))
Ok((
all_tx_digests,
checkpoint_acc,
checkpoint_data,
randomness_rounds,
))
}

#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use arc_swap::ArcSwap;
use consensus_config::Committee as ConsensusCommittee;
use consensus_core::{CommitConsumerMonitor, TransactionIndex, VerifiedBlock};
use lru::LruCache;
use mysten_common::debug_fatal;
use mysten_metrics::{
monitored_future,
monitored_mpsc::{self, UnboundedReceiver},
Expand Down Expand Up @@ -353,7 +354,9 @@ impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
&parsed.transaction.kind
{
// These are deprecated and we should never see them. If one appears, report it via `debug_fatal!` and eat the tx.
error!("BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}")
debug_fatal!(
"BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}"
);
} else {
let transaction =
SequencedConsensusTransactionKind::External(parsed.transaction);
Expand Down

0 comments on commit 5116b25

Please sign in to comment.