Re-execution of pending certs must happen concurrently with consensus handling, since there may be dependencies in either direction. #21000
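In rough terms, the change: the old startup path finished re-executing pending consensus certificates before constructing the validator components (which bring up consensus handling), but the two can depend on each other, so they now run concurrently. A minimal sketch of the futures::join! pattern the change relies on, using hypothetical stand-in functions rather than the real SuiNode methods:

use anyhow::Result;

// Hypothetical stand-ins for the two pieces of startup work.
async fn construct_components() -> Result<&'static str> {
    Ok("validator components")
}

async fn reexecute_pending_certs() {
    // Replays transactions that were executed before the restart.
}

#[tokio::main]
async fn main() -> Result<()> {
    // join! interleaves both futures on the same task, so component construction
    // does not wait for re-execution to finish, and vice versa.
    let (components, _) = futures::join!(construct_components(), reexecute_pending_certs());
    println!("built {} while re-execution ran concurrently", components?);
    Ok(())
}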
@@ -436,7 +436,7 @@ impl CheckpointExecutor {
 
     /// Post processing and plumbing after we executed a checkpoint. This function is guaranteed
     /// to be called in the order of checkpoint sequence number.
-    #[instrument(level = "debug", skip_all)]
+    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
     async fn process_executed_checkpoint(
         &self,
         epoch_store: &AuthorityPerEpochStore,
@@ -447,7 +447,7 @@ impl CheckpointExecutor {
     ) {
         // Commit all transaction effects to disk
         let cache_commit = self.state.get_cache_commit();
-        debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
+        debug!("committing checkpoint transactions to disk");
         cache_commit
             .commit_transaction_outputs(
                 epoch_store.epoch(),
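A note on how the two logging changes above fit together: the #[instrument] span now records the checkpoint sequence number as a span field, so events emitted inside the span no longer need to repeat it, which is why the debug! call drops its seq argument. A rough, self-contained illustration (hypothetical function, not the node's code):

use tracing::{debug, instrument};

#[instrument(level = "info", skip_all, fields(seq = seq))]
fn process_checkpoint(seq: u64) {
    // With a subscriber that prints span fields (e.g. tracing_subscriber's fmt
    // layer), this event is rendered inside the process_checkpoint{seq=...} span,
    // so the sequence number does not have to be repeated on the event itself.
    debug!("committing checkpoint transactions to disk");
}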
@@ -1040,8 +1040,8 @@ fn extract_end_of_epoch_tx(
     let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
         (*change_epoch_tx.unwrap_or_else(||
             panic!(
-                "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
-                digests.transaction,
+                "state-sync should have ensured that transaction with digests {:?} exists for checkpoint: {checkpoint:?}",
+                digests
             )
         )).clone(),
         epoch_store.epoch(),

Review comment: if you like, inline "digests" into the format string?
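The comment above refers to Rust's inline format-argument capture: a plain identifier can be written directly inside the braces, which removes the need for a trailing positional argument. A tiny hypothetical example of the two spellings:

fn missing_tx_panic(digests: &str) -> ! {
    // Positional form, as in the diff:
    //     panic!("transaction with digests {:?} is missing", digests);
    // Inline capture, as the reviewer suggests:
    panic!("transaction with digests {digests:?} is missing");
}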
@@ -1100,16 +1100,15 @@ fn get_unexecuted_transactions(
 
     // Remove the change epoch transaction so that we can special case its execution.
     checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
-        let change_epoch_tx_digest = execution_digests
+        let digests = execution_digests
             .pop()
-            .expect("Final checkpoint must have at least one transaction")
-            .transaction;
+            .expect("Final checkpoint must have at least one transaction");
 
         let change_epoch_tx = cache_reader
-            .get_transaction_block(&change_epoch_tx_digest)
+            .get_transaction_block(&digests.transaction)
             .unwrap_or_else(||
                 panic!(
-                    "state-sync should have ensured that transaction with digest {change_epoch_tx_digest:?} exists for checkpoint: {}",
+                    "state-sync should have ensured that transaction with digests {digests:?} exists for checkpoint: {}",
                     checkpoint.sequence_number()
                 )
             );
@@ -1138,7 +1137,7 @@ fn get_unexecuted_transactions(
     let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
         .unwrap_or_else(||
             panic!(
-                "state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
+                "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                 checkpoint.sequence_number()
             )
         );
@@ -12,12 +12,14 @@ use anyhow::Result;
 use arc_swap::ArcSwap;
 use fastcrypto_zkp::bn254::zk_login::JwkId;
 use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
+use futures::future::BoxFuture;
 use futures::TryFutureExt;
 use mysten_common::debug_fatal;
 use mysten_network::server::SUI_TLS_SERVER_NAME;
 use prometheus::Registry;
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::fmt;
+use std::future::Future;
 use std::path::PathBuf;
 use std::str::FromStr;
 #[cfg(msim)]
@@ -151,7 +153,7 @@ mod handle;
 pub mod metrics;
 
 pub struct ValidatorComponents {
-    validator_server_handle: JoinHandle<Result<()>>,
+    validator_server_handle: SpawnOnce,
     validator_overload_monitor_handle: Option<JoinHandle<()>>,
     consensus_manager: ConsensusManager,
     consensus_store_pruner: ConsensusStorePruner,
@@ -836,26 +838,30 @@ impl SuiNode {
         let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
 
         let validator_components = if state.is_validator(&epoch_store) {
-            Self::reexecute_pending_consensus_certs(&epoch_store, &state).await;
+            let (components, _) = futures::join!(
+                Self::construct_validator_components(
+                    config.clone(),
+                    state.clone(),
+                    committee,
+                    epoch_store.clone(),
+                    checkpoint_store.clone(),
+                    state_sync_handle.clone(),
+                    randomness_handle.clone(),
+                    Arc::downgrade(&accumulator),
+                    backpressure_manager.clone(),
+                    connection_monitor_status.clone(),
+                    &registry_service,
+                    sui_node_metrics.clone(),
+                ),
+                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
+            );
+            let mut components = components?;
 
-            let components = Self::construct_validator_components(
-                config.clone(),
-                state.clone(),
-                committee,
-                epoch_store.clone(),
-                checkpoint_store.clone(),
-                state_sync_handle.clone(),
-                randomness_handle.clone(),
-                Arc::downgrade(&accumulator),
-                backpressure_manager.clone(),
-                connection_monitor_status.clone(),
-                &registry_service,
-                sui_node_metrics.clone(),
-            )
-            .await?;
             // This is only needed during cold start.
             components.consensus_adapter.submit_recovered(&epoch_store);
 
             // Start the gRPC server
+            components.validator_server_handle = components.validator_server_handle.start();
 
             Some(components)
         } else {
             None

Review comment: this seems subtle and I'm not sure I get it - why does this now have to get pulled out and started earlier here?
Reply: it's actually getting started later - we need to wait until all previously-executed txns have been re-executed before accepting user requests, so that we don't appear to have forgotten about a transaction.
@@ -1325,7 +1331,7 @@ impl SuiNode {
         consensus_store_pruner: ConsensusStorePruner,
         accumulator: Weak<StateAccumulator>,
         backpressure_manager: Arc<BackpressureManager>,
-        validator_server_handle: JoinHandle<Result<()>>,
+        validator_server_handle: SpawnOnce,
         validator_overload_monitor_handle: Option<JoinHandle<()>>,
         checkpoint_metrics: Arc<CheckpointMetrics>,
         sui_node_metrics: Arc<SuiNodeMetrics>,
@@ -1505,7 +1511,7 @@ impl SuiNode {
         state: Arc<AuthorityState>,
         consensus_adapter: Arc<ConsensusAdapter>,
         prometheus_registry: &Registry,
-    ) -> Result<tokio::task::JoinHandle<Result<()>>> {
+    ) -> Result<SpawnOnce> {
         let validator_service = ValidatorService::new(
             state.clone(),
             consensus_adapter,
@@ -1533,9 +1539,8 @@ impl SuiNode {
             .map_err(|err| anyhow!(err.to_string()))?;
         let local_addr = server.local_addr();
         info!("Listening to traffic on {local_addr}");
-        let grpc_server = spawn_monitored_task!(server.serve().map_err(Into::into));
-
-        Ok(grpc_server)
+        Ok(SpawnOnce::new(server.serve().map_err(Into::into)))
     }
 
     async fn reexecute_pending_consensus_certs(
@@ -1898,23 +1903,25 @@ impl SuiNode {
         if self.state.is_validator(&new_epoch_store) {
             info!("Promoting the node from fullnode to validator, starting grpc server");
 
-            Some(
-                Self::construct_validator_components(
-                    self.config.clone(),
-                    self.state.clone(),
-                    Arc::new(next_epoch_committee.clone()),
-                    new_epoch_store.clone(),
-                    self.checkpoint_store.clone(),
-                    self.state_sync_handle.clone(),
-                    self.randomness_handle.clone(),
-                    weak_accumulator,
-                    self.backpressure_manager.clone(),
-                    self.connection_monitor_status.clone(),
-                    &self.registry_service,
-                    self.metrics.clone(),
-                )
-                .await?,
-            )
+            let mut components = Self::construct_validator_components(
+                self.config.clone(),
+                self.state.clone(),
+                Arc::new(next_epoch_committee.clone()),
+                new_epoch_store.clone(),
+                self.checkpoint_store.clone(),
+                self.state_sync_handle.clone(),
+                self.randomness_handle.clone(),
+                weak_accumulator,
+                self.backpressure_manager.clone(),
+                self.connection_monitor_status.clone(),
+                &self.registry_service,
+                self.metrics.clone(),
+            )
+            .await?;
+
+            components.validator_server_handle = components.validator_server_handle.start();
+
+            Some(components)
         } else {
             None
         }
@@ -2042,6 +2049,30 @@ impl SuiNode {
     }
 }
 
+enum SpawnOnce {
+    // Mutex is only needed to make SpawnOnce Send
+    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
+    #[allow(unused)]
+    Started(JoinHandle<Result<()>>),
+}
+
+impl SpawnOnce {
+    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
+        Self::Unstarted(Mutex::new(Box::pin(future)))
+    }
+
+    pub fn start(self) -> Self {
+        match self {
+            Self::Unstarted(future) => {
+                let future = future.into_inner();
+                let handle = tokio::spawn(future);
+                Self::Started(handle)
+            }
+            Self::Started(_) => self,
+        }
+    }
+}
+
 /// Notify state-sync that a new list of trusted peers are now available.
 fn send_trusted_peer_change(
     config: &NodeConfig,
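To make the SpawnOnce behavior concrete, here is a stripped-down, self-contained sketch of the same deferred-spawn idea. The names, the std::sync::Mutex, and the boxed-future alias are illustrative substitutes, not the PR's actual imports:

use std::{future::Future, pin::Pin, sync::Mutex};
use tokio::task::JoinHandle;

type BoxedResultFuture = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>;

enum DeferredTask {
    // The future is built eagerly but not polled until start() is called.
    Unstarted(Mutex<BoxedResultFuture>),
    Started(JoinHandle<anyhow::Result<()>>),
}

impl DeferredTask {
    fn new(fut: impl Future<Output = anyhow::Result<()>> + Send + 'static) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(fut)))
    }

    fn start(self) -> Self {
        match self {
            Self::Unstarted(fut) => {
                let fut = fut.into_inner().expect("mutex is never poisoned");
                Self::Started(tokio::spawn(fut))
            }
            // Starting twice is a no-op.
            Self::Started(_) => self,
        }
    }
}

#[tokio::main]
async fn main() {
    // The server future exists from construction time, but it only runs once
    // start() spawns it, i.e. after any recovery work has completed.
    let server = DeferredTask::new(async { anyhow::Ok(()) });
    // ... re-execute previously executed transactions here ...
    let _server = server.start();
}

Because start() on an already-started task returns it unchanged, calling it from both the cold-start path and the reconfiguration path is safe.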
Review comment: did you mean to merge this "info" level bump?
Reply: yes, once-per-checkpoint spans are good candidates for info level imo.