Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Re-execution of pending certs must happen concurrently with consensus handling, since there may be dependencies in either direction. #21000

Merged
merged 2 commits into the base branch
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,9 @@ impl AuthorityState {
)
.await
.tap_err(|e| info!("process_certificate failed: {e}"))
.tap_ok(|_| debug!("process_certificate succeeded"))
.tap_ok(
|(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
)
}

pub fn read_objects_for_execution(
Expand Down
19 changes: 9 additions & 10 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl CheckpointExecutor {

/// Post processing and plumbing after we executed a checkpoint. This function is guaranteed
/// to be called in the order of checkpoint sequence number.
#[instrument(level = "debug", skip_all)]
#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean to merge this "info" level bump?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, once-per-checkpoint spans are good candidates for info level imo

async fn process_executed_checkpoint(
&self,
epoch_store: &AuthorityPerEpochStore,
Expand All @@ -447,7 +447,7 @@ impl CheckpointExecutor {
) {
// Commit all transaction effects to disk
let cache_commit = self.state.get_cache_commit();
debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
debug!("committing checkpoint transactions to disk");
cache_commit
.commit_transaction_outputs(
epoch_store.epoch(),
Expand Down Expand Up @@ -1040,8 +1040,8 @@ fn extract_end_of_epoch_tx(
let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
(*change_epoch_tx.unwrap_or_else(||
panic!(
"state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
digests.transaction,
"state-sync should have ensured that transaction with digests {:?} exists for checkpoint: {checkpoint:?}",
digests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you like - inline "digests" into the format str?

)
)).clone(),
epoch_store.epoch(),
Expand Down Expand Up @@ -1100,16 +1100,15 @@ fn get_unexecuted_transactions(

// Remove the change epoch transaction so that we can special case its execution.
checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
let change_epoch_tx_digest = execution_digests
let digests = execution_digests
.pop()
.expect("Final checkpoint must have at least one transaction")
.transaction;
.expect("Final checkpoint must have at least one transaction");

let change_epoch_tx = cache_reader
.get_transaction_block(&change_epoch_tx_digest)
.get_transaction_block(&digests.transaction)
.unwrap_or_else(||
panic!(
"state-sync should have ensured that transaction with digest {change_epoch_tx_digest:?} exists for checkpoint: {}",
"state-sync should have ensured that transaction with digests {digests:?} exists for checkpoint: {}",
checkpoint.sequence_number()
)
);
Expand Down Expand Up @@ -1138,7 +1137,7 @@ fn get_unexecuted_transactions(
let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
.unwrap_or_else(||
panic!(
"state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
"state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
checkpoint.sequence_number()
)
);
Expand Down
107 changes: 69 additions & 38 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use anyhow::Result;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use futures::TryFutureExt;
use mysten_common::debug_fatal;
use mysten_network::server::SUI_TLS_SERVER_NAME;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
Expand Down Expand Up @@ -151,7 +153,7 @@ mod handle;
pub mod metrics;

pub struct ValidatorComponents {
validator_server_handle: JoinHandle<Result<()>>,
validator_server_handle: SpawnOnce,
validator_overload_monitor_handle: Option<JoinHandle<()>>,
consensus_manager: ConsensusManager,
consensus_store_pruner: ConsensusStorePruner,
Expand Down Expand Up @@ -836,26 +838,30 @@ impl SuiNode {
let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

let validator_components = if state.is_validator(&epoch_store) {
Self::reexecute_pending_consensus_certs(&epoch_store, &state).await;
let (components, _) = futures::join!(
Self::construct_validator_components(
config.clone(),
state.clone(),
committee,
epoch_store.clone(),
checkpoint_store.clone(),
state_sync_handle.clone(),
randomness_handle.clone(),
Arc::downgrade(&accumulator),
backpressure_manager.clone(),
connection_monitor_status.clone(),
&registry_service,
sui_node_metrics.clone(),
),
Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
);
let mut components = components?;

let components = Self::construct_validator_components(
config.clone(),
state.clone(),
committee,
epoch_store.clone(),
checkpoint_store.clone(),
state_sync_handle.clone(),
randomness_handle.clone(),
Arc::downgrade(&accumulator),
backpressure_manager.clone(),
connection_monitor_status.clone(),
&registry_service,
sui_node_metrics.clone(),
)
.await?;
// This is only needed during cold start.
components.consensus_adapter.submit_recovered(&epoch_store);

// Start the gRPC server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems subtle and not sure I get it, why does this now have to get pulled out and started earlier here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's actually getting started later — we need to wait until all previously-executed txns have been re-executed before accepting user requests, so that we don't appear to have forgotten about a transaction

components.validator_server_handle = components.validator_server_handle.start();

Some(components)
} else {
None
Expand Down Expand Up @@ -1325,7 +1331,7 @@ impl SuiNode {
consensus_store_pruner: ConsensusStorePruner,
accumulator: Weak<StateAccumulator>,
backpressure_manager: Arc<BackpressureManager>,
validator_server_handle: JoinHandle<Result<()>>,
validator_server_handle: SpawnOnce,
validator_overload_monitor_handle: Option<JoinHandle<()>>,
checkpoint_metrics: Arc<CheckpointMetrics>,
sui_node_metrics: Arc<SuiNodeMetrics>,
Expand Down Expand Up @@ -1505,7 +1511,7 @@ impl SuiNode {
state: Arc<AuthorityState>,
consensus_adapter: Arc<ConsensusAdapter>,
prometheus_registry: &Registry,
) -> Result<tokio::task::JoinHandle<Result<()>>> {
) -> Result<SpawnOnce> {
let validator_service = ValidatorService::new(
state.clone(),
consensus_adapter,
Expand Down Expand Up @@ -1533,9 +1539,8 @@ impl SuiNode {
.map_err(|err| anyhow!(err.to_string()))?;
let local_addr = server.local_addr();
info!("Listening to traffic on {local_addr}");
let grpc_server = spawn_monitored_task!(server.serve().map_err(Into::into));

Ok(grpc_server)
Ok(SpawnOnce::new(server.serve().map_err(Into::into)))
}

async fn reexecute_pending_consensus_certs(
Expand Down Expand Up @@ -1898,23 +1903,25 @@ impl SuiNode {
if self.state.is_validator(&new_epoch_store) {
info!("Promoting the node from fullnode to validator, starting grpc server");

Some(
Self::construct_validator_components(
self.config.clone(),
self.state.clone(),
Arc::new(next_epoch_committee.clone()),
new_epoch_store.clone(),
self.checkpoint_store.clone(),
self.state_sync_handle.clone(),
self.randomness_handle.clone(),
weak_accumulator,
self.backpressure_manager.clone(),
self.connection_monitor_status.clone(),
&self.registry_service,
self.metrics.clone(),
)
.await?,
let mut components = Self::construct_validator_components(
self.config.clone(),
self.state.clone(),
Arc::new(next_epoch_committee.clone()),
new_epoch_store.clone(),
self.checkpoint_store.clone(),
self.state_sync_handle.clone(),
self.randomness_handle.clone(),
weak_accumulator,
self.backpressure_manager.clone(),
self.connection_monitor_status.clone(),
&self.registry_service,
self.metrics.clone(),
)
.await?;

components.validator_server_handle = components.validator_server_handle.start();

Some(components)
} else {
None
}
Expand Down Expand Up @@ -2042,6 +2049,30 @@ impl SuiNode {
}
}

/// A task that is constructed eagerly but spawned onto the runtime at most
/// once, at a moment of the caller's choosing. Used to delay serving gRPC
/// traffic until pending-certificate re-execution has finished.
enum SpawnOnce {
    // The Mutex is never contended (the future is moved out wholesale via
    // into_inner); it exists only to satisfy auto-trait bounds so SpawnOnce
    // can be held in shared component structs.
    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
    #[allow(unused)]
    Started(JoinHandle<Result<()>>),
}

impl SpawnOnce {
    /// Wrap `future` without running it; nothing executes until `start`.
    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
        let boxed = Box::pin(future);
        Self::Unstarted(Mutex::new(boxed))
    }

    /// Spawn the stored future if it has not been spawned yet.
    /// Calling `start` on an already-started value is a no-op.
    pub fn start(self) -> Self {
        if let Self::Unstarted(fut) = self {
            Self::Started(tokio::spawn(fut.into_inner()))
        } else {
            self
        }
    }
}

/// Notify state-sync that a new list of trusted peers are now available.
fn send_trusted_peer_change(
config: &NodeConfig,
Expand Down
Loading