Skip to content

Commit

Permalink
Re-execute single-writer pending consensus transactions at startup (#…
Browse files Browse the repository at this point in the history
…20974)

- Re-execute pending single-writer transactions at startup.
- Do not wait forever for checkpoints to be rebuilt (this is thoroughly
tested and should not happen, but I realized that waiting forever here in prod
does not actually buy us anything).
  • Loading branch information
mystenmark authored Jan 25, 2025
1 parent 45094d8 commit 9595c00
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 13 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3037,7 +3037,7 @@ impl AuthorityState {
.enqueue_certificates(certs, epoch_store)
}

pub(crate) fn enqueue_with_expected_effects_digest(
pub fn enqueue_with_expected_effects_digest(
&self,
certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
epoch_store: &AuthorityPerEpochStore,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,7 @@ impl AuthorityPerEpochStore {
tx_digest: &TransactionDigest,
) -> SuiResult {
let tables = self.tables()?;
let mut batch = self.tables()?.effects_signatures.batch();
let mut batch = self.tables()?.executed_in_epoch.batch();

batch.insert_batch(&tables.executed_in_epoch, [(tx_digest, ())])?;

Expand Down
16 changes: 7 additions & 9 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2346,15 +2346,13 @@ impl CheckpointService {
tasks.spawn(monitored_future!(builder.run()));
tasks.spawn(monitored_future!(aggregator.run()));

loop {
if tokio::time::timeout(Duration::from_secs(10), self.wait_for_rebuilt_checkpoints())
.await
.is_ok()
{
break;
} else {
debug_fatal!("Still waiting for checkpoints to be rebuilt");
}
// If this times out, the validator may still start up. The worst that can
// happen is that we will crash later on (due to missing transactions).
if tokio::time::timeout(Duration::from_secs(10), self.wait_for_rebuilt_checkpoints())
.await
.is_err()
{
debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
}

tasks
Expand Down
71 changes: 69 additions & 2 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::TryFutureExt;
use mysten_common::debug_fatal;
use mysten_network::server::SUI_TLS_SERVER_NAME;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
Expand Down Expand Up @@ -44,9 +45,12 @@ use sui_rpc_api::RpcMetrics;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::ChainIdentifier;
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedCertificate;
use tap::tap::TapFallible;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, mpsc, watch, Mutex};
Expand Down Expand Up @@ -832,6 +836,8 @@ impl SuiNode {
let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

let validator_components = if state.is_validator(&epoch_store) {
Self::reexecute_pending_consensus_certs(&epoch_store, &state).await;

let components = Self::construct_validator_components(
config.clone(),
state.clone(),
Expand Down Expand Up @@ -1325,7 +1331,7 @@ impl SuiNode {
sui_node_metrics: Arc<SuiNodeMetrics>,
sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
) -> Result<ValidatorComponents> {
let checkpoint_service = Self::start_checkpoint_service(
let checkpoint_service = Self::build_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
Expand Down Expand Up @@ -1421,7 +1427,7 @@ impl SuiNode {
})
}

fn start_checkpoint_service(
fn build_checkpoint_service(
config: &NodeConfig,
consensus_adapter: Arc<ConsensusAdapter>,
checkpoint_store: Arc<CheckpointStore>,
Expand Down Expand Up @@ -1532,6 +1538,67 @@ impl SuiNode {
Ok(grpc_server)
}

/// Re-enqueues the pending consensus certificates that touch no shared objects and whose
/// effects this node has already signed, then waits (for at most 10 seconds) until their
/// executed effects digests can be read back.
///
/// Certificates that contain shared objects are skipped, as are non-certificate consensus
/// transactions and certificates for which no signed effects digest is stored.
///
/// # Panics
///
/// Panics with "db error" if looking up a signed effects digest fails.
async fn reexecute_pending_consensus_certs(
    epoch_store: &Arc<AuthorityPerEpochStore>,
    state: &Arc<AuthorityState>,
) {
    let mut certs_with_effects = Vec::new();
    let mut digests = Vec::new();

    for pending in epoch_store.get_all_pending_consensus_transactions() {
        // Shared object txns are left alone: they will be re-executed by consensus replay.
        let cert = match pending.kind {
            ConsensusTransactionKind::CertifiedTransaction(cert)
                if !cert.contains_shared_object() =>
            {
                *cert
            }
            _ => continue,
        };

        // Re-execution is only needed when the effects were signed earlier, since a signed
        // effects digest indicates the effects were returned to a client.
        let signed_fx = epoch_store
            .get_signed_effects_digest(cert.digest())
            .expect("db error");
        let fx_digest = match signed_fx {
            Some(fx_digest) => fx_digest,
            None => continue,
        };

        // `new_unchecked` is safe here because a transaction is never submitted to consensus
        // without having been verified first.
        let executable = VerifiedExecutableTransaction::new_from_certificate(
            VerifiedCertificate::new_unchecked(cert),
        );
        digests.push(*executable.digest());
        certs_with_effects.push((executable, fx_digest));
    }

    info!("reexecuting pending consensus certificates: {:?}", digests);

    state.enqueue_with_expected_effects_digest(certs_with_effects, epoch_store);

    // A timeout here is not fatal for startup: the validator will almost certainly come up
    // fine. It may, however, temporarily "forget" transactions it had previously executed,
    // which could confuse clients in some circumstances. Because those transactions remain in
    // pending_consensus_certificates, no finality guarantees can be lost.
    let timed_out = tokio::time::timeout(
        std::time::Duration::from_secs(10),
        state
            .get_transaction_cache_reader()
            .notify_read_executed_effects_digests(&digests),
    )
    .await
    .is_err();

    if timed_out {
        debug_fatal!("Timed out waiting for effects digests to be executed");
    }
}

pub fn state(&self) -> Arc<AuthorityState> {
self.state.clone()
}
Expand Down

0 comments on commit 9595c00

Please sign in to comment.