Skip to content

Commit

Permalink
delay notify_randomness_in_checkpoint until checkpoint is committed (#…
Browse files Browse the repository at this point in the history
…21031)

This ensures that we do not stop random partial sig transmission until
the randomness update is durably committed to disk
  • Loading branch information
mystenmark authored Jan 31, 2025
1 parent 8f1ef98 commit 2de66e1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 21 deletions.
62 changes: 42 additions & 20 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type CheckpointExecutionBuffer = FuturesOrdered<
Option<Accumulator>,
Option<CheckpointData>,
Vec<TransactionDigest>,
Vec<RandomnessRound>,
)>,
>;

Expand Down Expand Up @@ -309,10 +310,10 @@ impl CheckpointExecutor {
// watermark accordingly. Note that given that checkpoints are guaranteed to
// be processed (added to FuturesOrdered) in seq_number order, using FuturesOrdered
// guarantees that we will also ratchet the watermarks in order.
Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests))) = pending.next() => {
Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests, randomness_rounds))) = pending.next() => {
let _process_scope = mysten_metrics::monitored_scope("ProcessExecutedCheckpoint");

self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests).await;
self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests, randomness_rounds).await;
self.backpressure_manager.update_highest_executed_checkpoint(*checkpoint.sequence_number());
highest_executed = Some(checkpoint.clone());

Expand Down Expand Up @@ -444,6 +445,7 @@ impl CheckpointExecutor {
checkpoint_acc: Option<Accumulator>,
checkpoint_data: Option<CheckpointData>,
all_tx_digests: &[TransactionDigest],
randomness_rounds: Vec<RandomnessRound>,
) {
// Commit all transaction effects to disk
let cache_commit = self.state.get_cache_commit();
Expand All @@ -463,6 +465,21 @@ impl CheckpointExecutor {
.handle_committed_transactions(all_tx_digests)
.expect("cannot fail");

// Once the checkpoint is finalized, we know that any randomness contained in this checkpoint has
// been successfully included in a checkpoint certified by quorum of validators.
// (RandomnessManager/RandomnessReporter is only present on validators.)
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
for round in randomness_rounds {
debug!(
?round,
"notifying RandomnessReporter that randomness update was executed in checkpoint"
);
randomness_reporter
.notify_randomness_in_checkpoint(round)
.expect("epoch cannot have ended");
}
}

if let Some(checkpoint_data) = checkpoint_data {
self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
.await;
Expand Down Expand Up @@ -579,7 +596,7 @@ impl CheckpointExecutor {

pending.push_back(spawn_monitored_task!(async move {
let epoch_store = epoch_store.clone();
let (tx_digests, checkpoint_acc, checkpoint_data) = loop {
let (tx_digests, checkpoint_acc, checkpoint_data, randomness_rounds) = loop {
match execute_checkpoint(
checkpoint.clone(),
&state,
Expand All @@ -603,12 +620,23 @@ impl CheckpointExecutor {
tokio::time::sleep(Duration::from_secs(1)).await;
metrics.checkpoint_exec_errors.inc();
}
Ok((tx_digests, checkpoint_acc, checkpoint_data)) => {
break (tx_digests, checkpoint_acc, checkpoint_data)
Ok((tx_digests, checkpoint_acc, checkpoint_data, randomness_rounds)) => {
break (
tx_digests,
checkpoint_acc,
checkpoint_data,
randomness_rounds,
)
}
}
};
(checkpoint, checkpoint_acc, checkpoint_data, tx_digests)
(
checkpoint,
checkpoint_acc,
checkpoint_data,
tx_digests,
randomness_rounds,
)
}));
}

Expand Down Expand Up @@ -769,6 +797,7 @@ impl CheckpointExecutor {

// Logs within the function are annotated with the checkpoint sequence number and epoch,
// from schedule_checkpoint().
#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn execute_checkpoint(
checkpoint: VerifiedCheckpoint,
Expand All @@ -786,6 +815,7 @@ async fn execute_checkpoint(
Vec<TransactionDigest>,
Option<Accumulator>,
Option<CheckpointData>,
Vec<RandomnessRound>,
)> {
debug!("Preparing checkpoint for execution",);
let prepare_start = Instant::now();
Expand Down Expand Up @@ -829,20 +859,12 @@ async fn execute_checkpoint(
)
.await?;

// Once execution is complete, we know that any randomness contained in this checkpoint has
// been successfully included in a checkpoint certified by quorum of validators.
// (RandomnessManager/RandomnessReporter is only present on validators.)
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
for round in randomness_rounds {
debug!(
?round,
"notifying RandomnessReporter that randomness update was executed in checkpoint"
);
randomness_reporter.notify_randomness_in_checkpoint(round)?;
}
}

Ok((all_tx_digests, checkpoint_acc, checkpoint_data))
Ok((
all_tx_digests,
checkpoint_acc,
checkpoint_data,
randomness_rounds,
))
}

#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use arc_swap::ArcSwap;
use consensus_config::Committee as ConsensusCommittee;
use consensus_core::{CommitConsumerMonitor, TransactionIndex, VerifiedBlock};
use lru::LruCache;
use mysten_common::debug_fatal;
use mysten_metrics::{
monitored_future,
monitored_mpsc::{self, UnboundedReceiver},
Expand Down Expand Up @@ -353,7 +354,9 @@ impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
&parsed.transaction.kind
{
// These are deprecated and we should never see them. Log an error and eat the tx if one appears.
error!("BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}")
debug_fatal!(
"BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}"
);
} else {
let transaction =
SequencedConsensusTransactionKind::External(parsed.transaction);
Expand Down

0 comments on commit 2de66e1

Please sign in to comment.