Put upgrade-specific code behind an epoch flag. Restore checkpoint re-execution at startup when upgrading.
mystenmark committed Feb 5, 2025
1 parent 7d2a59d commit 66566c7
Showing 5 changed files with 165 additions and 23 deletions.
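
The common thread across all five files is gating the upgrade-only work on the new EpochFlag::DataQuarantineFromBeginningOfEpoch flag via EpochStartConfigTrait::is_data_quarantine_active_from_beginning_of_epoch(). A minimal sketch of that pattern, using names introduced in this commit (the standalone helper below is hypothetical and shown only for illustration; the real checks are inlined at each call site in the diffs that follow):

// Hypothetical helper illustrating the gating pattern used throughout this commit.
fn run_upgrade_only_work(epoch_store: &AuthorityPerEpochStore) {
    if !epoch_store
        .epoch_start_config()
        .is_data_quarantine_active_from_beginning_of_epoch()
    {
        // The epoch began on a pre-quarantine binary: read the legacy per-epoch
        // tables and re-execute locally built checkpoints for crash recovery.
    } else {
        // The epoch began with data quarantining enabled: the legacy tables can
        // be skipped entirely.
    }
}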
25 changes: 15 additions & 10 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
@@ -4042,16 +4042,21 @@ impl AuthorityPerEpochStore {
&self,
last: Option<CheckpointHeight>,
) -> SuiResult<Vec<(CheckpointHeight, PendingCheckpointV2)>> {
// TODO: Delete the db reads.
// Reading from the db table is only needed when upgrading to data quarantining
// for the first time.
let tables = self.tables()?;
let mut db_iter = tables.pending_checkpoints_v2.unbounded_iter();
if let Some(last_processed_height) = last {
db_iter = db_iter.skip_to(&(last_processed_height + 1))?;
}

let db_results: Vec<_> = db_iter.collect();
let db_results = if !self
.epoch_start_config()
.is_data_quarantine_active_from_beginning_of_epoch()
{
// Reading from the db table is only needed when upgrading to data quarantining
// for the first time.
let tables = self.tables()?;
let mut db_iter = tables.pending_checkpoints_v2.unbounded_iter();
if let Some(last_processed_height) = last {
db_iter = db_iter.skip_to(&(last_processed_height + 1))?;
}
db_iter.collect()
} else {
vec![]
};

let mut quarantine_results = self
.consensus_quarantine
31 changes: 20 additions & 11 deletions crates/sui-core/src/authority/consensus_quarantine.rs
@@ -362,22 +362,31 @@ impl ConsensusOutputCache {
tables: &AuthorityEpochTables,
metrics: Arc<EpochMetrics>,
) -> Self {
let shared_version_assignments =
Self::get_all_shared_version_assignments(epoch_start_configuration, tables);

let deferred_transactions = tables
.get_all_deferred_transactions()
.expect("load deferred transactions cannot fail");

let user_signatures_for_checkpoints = tables
.get_all_user_signatures_for_checkpoints()
.expect("load user signatures for checkpoints cannot fail");
if !epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch() {
let shared_version_assignments =
Self::get_all_shared_version_assignments(epoch_start_configuration, tables);

Self {
shared_version_assignments: shared_version_assignments.into_iter().collect(),
deferred_transactions: Mutex::new(deferred_transactions),
user_signatures_for_checkpoints: Mutex::new(user_signatures_for_checkpoints),
metrics,
let user_signatures_for_checkpoints = tables
.get_all_user_signatures_for_checkpoints()
.expect("load user signatures for checkpoints cannot fail");

Self {
shared_version_assignments: shared_version_assignments.into_iter().collect(),
deferred_transactions: Mutex::new(deferred_transactions),
user_signatures_for_checkpoints: Mutex::new(user_signatures_for_checkpoints),
metrics,
}
} else {
Self {
shared_version_assignments: Default::default(),
deferred_transactions: Mutex::new(deferred_transactions),
user_signatures_for_checkpoints: Default::default(),
metrics,
}
}
}

17 changes: 16 additions & 1 deletion crates/sui-core/src/authority/epoch_start_configuration.rs
@@ -34,6 +34,11 @@ pub trait EpochStartConfigTrait {
self.flags()
.contains(&EpochFlag::UseVersionAssignmentTablesV3)
}

fn is_data_quarantine_active_from_beginning_of_epoch(&self) -> bool {
self.flags()
.contains(&EpochFlag::DataQuarantineFromBeginningOfEpoch)
}
}

// IMPORTANT: Assign explicit values to each variant to ensure that the values are stable.
@@ -59,6 +64,10 @@ pub enum EpochFlag {
_ExecutedInEpochTableDeprecated = 7,

UseVersionAssignmentTablesV3 = 8,

// This flag indicates whether data quarantining has been enabled from the
// beginning of the epoch.
DataQuarantineFromBeginningOfEpoch = 9,
}

impl EpochFlag {
@@ -74,7 +83,10 @@ impl EpochFlag {
}

fn default_flags_impl() -> Vec<Self> {
vec![EpochFlag::UseVersionAssignmentTablesV3]
vec![
EpochFlag::UseVersionAssignmentTablesV3,
EpochFlag::DataQuarantineFromBeginningOfEpoch,
]
}
}
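
Because the new flag is part of the default flag set, every epoch that starts on the upgraded binary carries it, whereas an epoch whose start configuration was written by an older binary does not; that absence is precisely the signal the upgrade paths key off. A hedged illustration (the helper name is hypothetical):

// The flag set is fixed in the epoch start configuration, so this answer does
// not change mid-epoch.
fn epoch_started_before_data_quarantine(config: &impl EpochStartConfigTrait) -> bool {
    !config.is_data_quarantine_active_from_beginning_of_epoch()
}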

@@ -109,6 +121,9 @@ impl fmt::Display for EpochFlag {
EpochFlag::UseVersionAssignmentTablesV3 => {
write!(f, "UseVersionAssignmentTablesV3")
}
EpochFlag::DataQuarantineFromBeginningOfEpoch => {
write!(f, "DataQuarantineFromBeginningOfEpoch")
}
}
}
}
104 changes: 104 additions & 0 deletions crates/sui-core/src/checkpoints/mod.rs
@@ -27,6 +27,7 @@ use serde::{Deserialize, Serialize};
use sui_macros::fail_point;
use sui_network::default_mysten_network_config;
use sui_types::base_types::ConciseableName;
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_checkpoint::CheckpointCommitment;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use tokio::sync::watch;
@@ -746,6 +747,109 @@ impl CheckpointStore {
self.watermarks.rocksdb.flush()?;
Ok(())
}

/// TODO: this is only needed while upgrading from non-data-quarantine to
/// data-quarantine. After that it can be deleted.
///
/// Re-executes all transactions from all local, uncertified checkpoints for crash recovery.
/// All transactions thus re-executed are guaranteed to not have any missing dependencies,
/// because we start from the highest executed checkpoint, and proceed through checkpoints in
/// order.
#[instrument(level = "debug", skip_all)]
pub async fn reexecute_local_checkpoints(
&self,
state: &AuthorityState,
epoch_store: &AuthorityPerEpochStore,
) {
info!("rexecuting locally computed checkpoints for crash recovery");
let epoch = epoch_store.epoch();
let highest_executed = self
.get_highest_executed_checkpoint_seq_number()
.expect("get_highest_executed_checkpoint_seq_number should not fail")
.unwrap_or(0);

let Some(highest_built) = self.get_latest_locally_computed_checkpoint() else {
info!("no locally built checkpoints to verify");
return;
};

for seq in highest_executed + 1..=*highest_built.sequence_number() {
info!(?seq, "Re-executing locally computed checkpoint");
let Some(checkpoint) = self
.get_locally_computed_checkpoint(seq)
.expect("get_locally_computed_checkpoint should not fail")
else {
panic!("locally computed checkpoint {:?} not found", seq);
};

let Some(contents) = self
.get_checkpoint_contents(&checkpoint.content_digest)
.expect("get_checkpoint_contents should not fail")
else {
panic!("checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})", seq, checkpoint.content_digest);
};

let cache = state.get_transaction_cache_reader();

let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
let txns = cache.multi_get_transaction_blocks(&tx_digests);
for (tx, digest) in txns.iter().zip(tx_digests.iter()) {
if tx.is_none() {
panic!("transaction {:?} not found", digest);
}
}

let txns: Vec<_> = txns
.into_iter()
.map(|tx| tx.unwrap())
.zip(fx_digests.into_iter())
// end of epoch transaction can only be executed by CheckpointExecutor
.filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
.map(|(tx, fx)| {
(
VerifiedExecutableTransaction::new_from_checkpoint(
(*tx).clone(),
epoch,
seq,
),
fx,
)
})
.collect();

let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();

info!(
?seq,
?tx_digests,
"Re-executing transactions for locally built checkpoint"
);
// this will panic if any re-execution diverges from the previously recorded effects digest
state.enqueue_with_expected_effects_digest(txns, epoch_store);

// Spawn a task that logs periodically until it is cancelled.
// Re-execution should normally finish very quickly, so seeing this log more
// than once or twice is likely a sign of a problem.
let waiting_logger = tokio::task::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
warn!(?seq, "Still waiting for re-execution to complete");
}
});

cache
.notify_read_executed_effects_digests(&tx_digests)
.await;

waiting_logger.abort();
waiting_logger.await.ok();
info!(?seq, "Re-execution completed for locally built checkpoint");
}

info!("Re-execution of locally built checkpoints completed");
}
}
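
The recovery loop above relies on enqueue_with_expected_effects_digest to re-execute each transaction against the effects digest recorded in the checkpoint contents and to panic on any divergence. A conceptual sketch of that contract (the execute call is a hypothetical stand-in, shown only to illustrate the invariant; real execution goes through the AuthorityState executor):

// For every (transaction, expected_effects_digest) pair that was enqueued:
for (tx, expected_fx_digest) in txns {
    let effects = execute(tx); // hypothetical stand-in for the real executor
    // Re-execution must reproduce the previously recorded digest; a mismatch
    // means local state has diverged, and the node panics rather than proceed.
    assert_eq!(effects.digest(), expected_fx_digest);
}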

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
11 changes: 10 additions & 1 deletion crates/sui-node/src/lib.rs
@@ -1339,7 +1339,7 @@ impl SuiNode {
let checkpoint_service = Self::build_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
checkpoint_store.clone(),
epoch_store.clone(),
state.clone(),
state_sync_handle,
@@ -1416,6 +1416,15 @@
)
.await;

if !epoch_store
.epoch_start_config()
.is_data_quarantine_active_from_beginning_of_epoch()
{
checkpoint_store
.reexecute_local_checkpoints(&state, &epoch_store)
.await;
}

info!("Spawning checkpoint service");
let checkpoint_service_tasks = checkpoint_service.spawn().await;

