From 288eafa87a303ce8895c5f5562617ff94aa42df4 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Tue, 23 Aug 2022 22:53:25 -0700 Subject: [PATCH 1/2] [reconfig] Allow sync last checkpoint tx when validator is halted --- crates/sui-core/src/authority.rs | 39 +++- .../authority_active/checkpoint_driver/mod.rs | 82 ++++--- crates/sui-core/src/checkpoints/mod.rs | 4 +- crates/sui-core/src/node_sync/node_state.rs | 49 +++- .../src/unit_tests/authority_tests.rs | 2 +- crates/sui/tests/reconfiguration_tests.rs | 220 +++++++++++++----- crates/test-utils/src/authority.rs | 17 +- 7 files changed, 301 insertions(+), 112 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 2fa798e537ba0..bd49c2b1ebe47 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -513,10 +513,13 @@ impl AuthorityState { } } - /// We cannot use handle_certificate in fullnode to execute a certificate because there is no - /// consensus engine to assign locks for shared objects. Hence we need special handling here. + /// Execute a certificate that's know to have already finalized (i.e. executed by a quorum of + /// validators). For such certificate, we don't have to wait for consensus to set shared object + /// locks because we already know the shared object versions based on the effects. + /// This function can be called either by a fullnode after seeing a quorum of signed effects, + /// or by a validator after seeing the certificate included by a certified checkpoint. #[instrument(level = "trace", skip_all)] - pub async fn handle_node_sync_certificate( + pub async fn handle_finalized_certificate( &self, certificate: CertifiedTransaction, // Signed effects is signed by only one validator, it is not a @@ -528,7 +531,7 @@ impl AuthorityState { ) -> SuiResult { let _metrics_guard = start_timer(self.metrics.handle_node_sync_certificate_latency.clone()); let digest = *certificate.digest(); - debug!(?digest, "handle_node_sync_transaction"); + debug!(?digest, "handle_finalized_certificate"); fp_ensure!( signed_effects.effects.transaction_digest == digest, SuiError::ErrorWhileProcessingConfirmationTransaction { @@ -547,7 +550,7 @@ impl AuthorityState { } let resp = self - .process_certificate(tx_guard, &certificate) + .process_certificate(tx_guard, &certificate, true) .await .tap_err(|e| debug!(?digest, "process_certificate failed: {}", e))?; @@ -571,7 +574,23 @@ impl AuthorityState { certificate: CertifiedTransaction, ) -> SuiResult { let _metrics_guard = start_timer(self.metrics.handle_certificate_latency.clone()); + self.handle_certificate_impl(certificate, false).await + } + + #[instrument(level = "trace", skip_all)] + pub async fn handle_certificate_bypass_validator_halt( + &self, + certificate: CertifiedTransaction, + ) -> SuiResult { + self.handle_certificate_impl(certificate, true).await + } + #[instrument(level = "trace", skip_all)] + async fn handle_certificate_impl( + &self, + certificate: CertifiedTransaction, + bypass_validator_halt: bool, + ) -> SuiResult { self.metrics.total_cert_attempts.inc(); if self.is_fullnode() { return Err(SuiError::GenericStorageError( @@ -603,7 +622,7 @@ impl AuthorityState { .instrument(span) .await?; - self.process_certificate(tx_guard, &certificate) + self.process_certificate(tx_guard, &certificate, bypass_validator_halt) .await .tap_err(|e| debug!(?tx_digest, "process_certificate failed: {}", e)) } @@ -670,6 +689,7 @@ impl AuthorityState { &self, tx_guard: CertTxGuard<'_>, certificate: &CertifiedTransaction, + bypass_validator_halt: bool, ) -> SuiResult { let digest = *certificate.digest(); // The cert could have been processed by a concurrent attempt of the same cert, so check if @@ -679,7 +699,10 @@ impl AuthorityState { return Ok(info); } - if self.is_halted() && !certificate.signed_data.data.kind.is_system_tx() { + if self.is_halted() + && !bypass_validator_halt + && !certificate.signed_data.data.kind.is_system_tx() + { tx_guard.release(); // TODO: Do we want to include the new validator set? return Err(SuiError::ValidatorHaltedAtEpochEnd); @@ -1386,7 +1409,7 @@ impl AuthorityState { continue; } - if let Err(e) = self.process_certificate(tx_guard, &cert).await { + if let Err(e) = self.process_certificate(tx_guard, &cert, false).await { warn!(?digest, "Failed to process in-progress certificate: {}", e); } } else { diff --git a/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs b/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs index 9b39a3c933534..9ca94c26d61b9 100644 --- a/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs +++ b/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs @@ -639,14 +639,14 @@ where &available_authorities, ) .await?; - state_checkpoints - .lock() - .process_new_checkpoint_certificate( - checkpoint, - &contents, - committee, - active_authority.state.database.clone(), - )?; + process_new_checkpoint_certificate( + active_authority, + state_checkpoints, + committee, + checkpoint, + &contents, + ) + .await?; info!( cp_seq=?checkpoint.summary.sequence_number(), "Stored new checkpoint certificate", @@ -700,37 +700,55 @@ where let (past, contents) = get_one_checkpoint_with_contents(net.clone(), seq, &available_authorities).await?; - let errors = active_authority - .node_sync_handle() - .sync_checkpoint_cert_transactions(&contents) - .await? - .zip(futures::stream::iter(contents.iter())) - .filter_map(|(r, digests)| async move { - r.map_err(|e| { - info!(?digests, "failed to execute digest from checkpoint: {}", e); - e - }) - .err() - }) - .collect::>() - .await; - - if !errors.is_empty() { - let error = "Failed to sync transactions in checkpoint".to_string(); - error!(?seq, "{}", error); - return Err(SuiError::CheckpointingError { error }); - } - - checkpoint_db.lock().process_synced_checkpoint_certificate( + process_new_checkpoint_certificate( + active_authority, + &checkpoint_db, + &net.committee, &past, &contents, - &net.committee, - )?; + ) + .await?; } Ok(()) } +async fn process_new_checkpoint_certificate( + active_authority: &ActiveAuthority, + checkpoint_db: &Arc>, + committee: &Committee, + checkpoint_cert: &CertifiedCheckpointSummary, + contents: &CheckpointContents, +) -> SuiResult +where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + let errors = active_authority + .node_sync_handle() + .sync_checkpoint_cert_transactions(contents) + .await? + .zip(futures::stream::iter(contents.iter())) + .filter_map(|(r, digests)| async move { + r.map_err(|e| { + info!(?digests, "failed to execute digest from checkpoint: {}", e); + e + }) + .err() + }) + .collect::>() + .await; + + if !errors.is_empty() { + let error = "Failed to sync transactions in checkpoint".to_string(); + error!(cp_seq=?checkpoint_cert.summary.sequence_number, "{}", error); + return Err(SuiError::CheckpointingError { error }); + } + + checkpoint_db + .lock() + .process_synced_checkpoint_certificate(checkpoint_cert, contents, committee) +} + pub async fn get_one_checkpoint_with_contents( net: Arc>, sequence_number: CheckpointSequenceNumber, diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs index 93c89fd5901ae..4832d80b20f67 100644 --- a/crates/sui-core/src/checkpoints/mod.rs +++ b/crates/sui-core/src/checkpoints/mod.rs @@ -506,8 +506,10 @@ impl CheckpointStore { // Send to consensus for sequencing. if let Some(sender) = &self.sender { - debug!("Send fragment: {} -- {}", self.name, other_name); + let seq = fragment.proposer.summary.sequence_number; + debug!(cp_seq=?seq, "Sending fragment: {} -- {}", self.name, other_name); sender.send_to_consensus(fragment.clone())?; + debug!(cp_seq=?seq, "Fragment successfully sent: {} -- {}", self.name, other_name); } else { return Err(SuiError::from("No consensus sender configured")); } diff --git a/crates/sui-core/src/node_sync/node_state.rs b/crates/sui-core/src/node_sync/node_state.rs index 02c4cefbb54e1..ab329c27f15d7 100644 --- a/crates/sui-core/src/node_sync/node_state.rs +++ b/crates/sui-core/src/node_sync/node_state.rs @@ -102,6 +102,13 @@ impl DigestsMessage { } } + fn new_for_pending_ckpt(digest: &TransactionDigest, tx: oneshot::Sender) -> Self { + Self { + sync_arg: SyncArg::PendingCheckpoint(*digest), + tx: Some(tx), + } + } + fn new_for_exec_driver(digest: &TransactionDigest, tx: oneshot::Sender) -> Self { Self { sync_arg: SyncArg::ExecDriver(*digest), @@ -140,6 +147,13 @@ pub enum SyncArg { /// In checkpoint mode, all txes are known to be final. Checkpoint(ExecutionDigests), + /// Transactions in the current checkpoint to be signed/stored. + /// We don't have the effect digest since we may not have it when constructing the checkpoint. + /// The primary difference between PendingCheckpoint and ExecDriver is that PendingCheckpoint + /// sync can by-pass validator halting. This is to ensure that the last checkpoint of the epoch + /// can always be formed when there are missing transactions. + PendingCheckpoint(TransactionDigest), + /// Used by the execution driver to execute pending certs. No effects digest is provided, /// because this mode is used on validators only, who must compute the effects digest /// themselves - they cannot trust some other validator's version of the effects because that @@ -165,7 +179,9 @@ impl SyncArg { effects, }, ) => (transaction, Some(effects)), - SyncArg::Parent(digest) | SyncArg::ExecDriver(digest) => (digest, None), + SyncArg::Parent(digest) + | SyncArg::ExecDriver(digest) + | SyncArg::PendingCheckpoint(digest) => (digest, None), } } } @@ -417,7 +433,7 @@ where match self .state - .handle_node_sync_certificate(cert.clone(), effects.clone()) + .handle_finalized_certificate(cert.clone(), effects.clone()) .await { Ok(_) => Ok(SyncStatus::CertExecuted), @@ -429,11 +445,19 @@ where &self, permit: OwnedSemaphorePermit, digest: &TransactionDigest, + bypass_validator_halt: bool, ) -> SyncResult { trace!(?digest, "validator pending execution requested"); let cert = self.get_cert(digest).await?; - match self.state.handle_certificate(cert.clone()).await { + let result = if bypass_validator_halt { + self.state + .handle_certificate_bypass_validator_halt(cert.clone()) + .await + } else { + self.state.handle_certificate(cert.clone()).await + }; + match result { Ok(_) => Ok(SyncStatus::CertExecuted), Err(SuiError::ObjectNotFound { .. }) | Err(SuiError::ObjectErrors { .. }) @@ -451,7 +475,13 @@ where // Parents have been executed, so this should now succeed. debug!(?digest, "parents executed, re-attempting cert"); - self.state.handle_certificate(cert.clone()).await?; + if bypass_validator_halt { + self.state + .handle_certificate_bypass_validator_halt(cert.clone()) + .await + } else { + self.state.handle_certificate(cert.clone()).await + }?; Ok(SyncStatus::CertExecuted) } Err(e) => Err(e), @@ -490,7 +520,12 @@ where // down. let (digests, authorities_with_cert) = match arg { SyncArg::ExecDriver(digest) => { - return self.process_exec_driver_digest(permit, &digest).await; + return self + .process_exec_driver_digest(permit, &digest, false) + .await; + } + SyncArg::PendingCheckpoint(digest) => { + return self.process_exec_driver_digest(permit, &digest, true).await; } SyncArg::Parent(digest) => { // digest is known to be final because it appeared in the dependencies list of a @@ -566,7 +601,7 @@ where .await?; self.state - .handle_node_sync_certificate(cert, effects.clone()) + .handle_finalized_certificate(cert, effects.clone()) .await?; Ok(SyncStatus::CertExecuted) @@ -869,7 +904,7 @@ impl NodeSyncHandle { let mut futures = FuturesOrdered::new(); for digests in transactions { let (tx, rx) = oneshot::channel(); - let msg = DigestsMessage::new_for_exec_driver(&digests.transaction, tx); + let msg = DigestsMessage::new_for_pending_ckpt(&digests.transaction, tx); Self::send_msg_with_tx(self.sender.clone(), msg).await?; futures.push_back(Self::map_rx(rx)); } diff --git a/crates/sui-core/src/unit_tests/authority_tests.rs b/crates/sui-core/src/unit_tests/authority_tests.rs index 4fc16756683e9..e95d521d88870 100644 --- a/crates/sui-core/src/unit_tests/authority_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_tests.rs @@ -2425,7 +2425,7 @@ async fn test_consensus_message_processed() { handle_cert(&authority2, &certificate).await } else { authority2 - .handle_node_sync_certificate(certificate.clone(), effects1.clone()) + .handle_finalized_certificate(certificate.clone(), effects1.clone()) .await .unwrap(); authority2 diff --git a/crates/sui/tests/reconfiguration_tests.rs b/crates/sui/tests/reconfiguration_tests.rs index 06d286b1c9b1b..36ccd5fb0927b 100644 --- a/crates/sui/tests/reconfiguration_tests.rs +++ b/crates/sui/tests/reconfiguration_tests.rs @@ -4,28 +4,26 @@ use futures::future::join_all; use multiaddr::Multiaddr; use prometheus::Registry; +use std::time::Duration; use sui_config::ValidatorInfo; use sui_core::authority_active::checkpoint_driver::{ checkpoint_process_step, CheckpointProcessControl, }; -use sui_core::authority_client::AuthorityAPI; -use sui_core::safe_client::SafeClient; use sui_node::SuiNode; -use sui_types::base_types::{ObjectID, ObjectRef}; +use sui_types::base_types::{ObjectRef, SequenceNumber, SuiAddress}; use sui_types::crypto::{ generate_proof_of_possession, get_key_pair, AuthorityKeyPair, AuthoritySignature, KeypairTraits, NetworkKeyPair, }; use sui_types::error::SuiResult; -use sui_types::messages::ObjectInfoResponse; -use sui_types::messages::{CallArg, ObjectArg, ObjectInfoRequest, TransactionEffects}; +use sui_types::messages::{CallArg, ObjectArg, TransactionEffects}; use sui_types::object::Object; use sui_types::SUI_SYSTEM_STATE_OBJECT_ID; -use test_utils::authority::test_authority_configs; -use test_utils::messages::move_transaction; +use test_utils::authority::{get_object, test_authority_configs}; +use test_utils::messages::{make_transfer_sui_transaction, move_transaction}; use test_utils::objects::{generate_gas_object_with_balance, test_gas_objects}; use test_utils::test_account_keys; -use test_utils::transaction::submit_shared_object_transaction; +use test_utils::transaction::{submit_shared_object_transaction, submit_single_owner_transaction}; #[tokio::test(flavor = "current_thread")] async fn reconfig_end_to_end_tests() { @@ -86,8 +84,99 @@ async fn reconfig_end_to_end_tests() { let new_committee_size = sui_system_state.validators.next_epoch_validators.len(); assert_eq!(old_committee_size + 1, new_committee_size); - let mut checkpoint_processes = vec![]; + fast_forward_to_ready_for_reconfig_start(&nodes).await; + + // Start epoch change and halt all validators. + for node in &nodes { + node.active().start_epoch_change().await.unwrap(); + } + + fast_forward_to_ready_for_reconfig_finish(&nodes).await; + + let results: Vec<_> = nodes + .iter() + .map(|node| async { + node.active().finish_epoch_change().await.unwrap(); + }) + .collect(); + + futures::future::join_all(results).await; + + // refresh the system state and network addresses + let sui_system_state = states[0].get_sui_system_state_object().await.unwrap(); + assert_eq!(sui_system_state.epoch, 1); + // We should now have one more active validator. + assert_eq!(sui_system_state.validators.active_validators.len(), 5); +} + +#[tokio::test(flavor = "current_thread")] +async fn reconfig_last_checkpoint_sync_missing_tx() { + telemetry_subscribers::init_for_testing(); + + let mut configs = test_authority_configs(); + for c in configs.validator_configs.iter_mut() { + // Turn off checkpoint process so that we can have fine control over it in the test. + c.enable_checkpoint = false; + } + let validator_info = configs.validator_set(); + let mut gas_objects = test_gas_objects(); + let mut states = Vec::new(); + let mut nodes = Vec::new(); + for validator in configs.validator_configs() { + let node = SuiNode::start(validator, Registry::new()).await.unwrap(); + let state = node.state(); + + for gas in gas_objects.clone() { + state.insert_genesis_object(gas).await; + } + states.push(state); + nodes.push(node); + } + + fast_forward_to_ready_for_reconfig_start(&nodes).await; + + let (sender, key_pair) = test_account_keys().pop().unwrap(); + let object_ref = gas_objects.pop().unwrap().compute_object_reference(); + let transaction = make_transfer_sui_transaction( + object_ref, + SuiAddress::random_for_testing_only(), + None, + sender, + &key_pair, + ); + // Only send the transaction to validator 0, but not other validators. + // Since gossip is disabled by default, validator 1-3 will not see it. + submit_single_owner_transaction(transaction, &validator_info[0..1]).await; + tokio::time::sleep(Duration::from_secs(10)).await; + for (idx, validator) in validator_info.iter().enumerate() { + // Check that the object is mutated on validator 0 only. + assert_eq!( + get_object(validator, object_ref.0).await.version(), + SequenceNumber::from(if idx == 0 { 1 } else { 0 }) + ); + } + + // Start epoch change and halt all validators. for node in &nodes { + node.active().start_epoch_change().await.unwrap(); + } + + // Create a proposal on validator 0, which ensures that the transaction above will be included + // in the checkpoint. + nodes[0] + .state() + .checkpoints + .as_ref() + .unwrap() + .lock() + .set_proposal(0) + .unwrap(); + let mut checkpoint_processes = vec![]; + // Only validator 1 and 2 will participate the checkpoint progress, which will use fragments + // involving validator 0, 1, 2. Since validator 1 and 2 don't have the above transaction + // executed, they will actively sync and execute it. This exercises the code path where we can + // execute a transaction from a pending checkpoint even when validator is halted. + for node in &nodes[1..3] { let active = node.active().clone(); let handle = tokio::spawn(async move { while !active @@ -96,7 +185,7 @@ async fn reconfig_end_to_end_tests() { .as_ref() .unwrap() .lock() - .is_ready_to_start_epoch_change() + .is_ready_to_finish_epoch_change() { let _ = checkpoint_process_step(&active, &CheckpointProcessControl::default()).await; @@ -107,44 +196,24 @@ async fn reconfig_end_to_end_tests() { // Wait for all validators to be ready for epoch change. join_all(checkpoint_processes).await; - let results: Vec<_> = nodes - .iter() - .map(|node| async { - let active = node.active().clone(); - active.start_epoch_change().await.unwrap(); - while !active - .state - .checkpoints - .as_ref() - .unwrap() - .lock() - .is_ready_to_finish_epoch_change() - { - let _ = - checkpoint_process_step(&active, &CheckpointProcessControl::default()).await; - } - }) - .collect(); - - join_all(results).await; - - let results: Vec<_> = nodes - .iter() - .map(|node| async { - node.active().finish_epoch_change().await.unwrap(); - }) - .collect(); - - futures::future::join_all(results).await; - - // refresh the system state and network addresses - let sui_system_state = states[0].get_sui_system_state_object().await.unwrap(); - assert_eq!(sui_system_state.epoch, 1); - // We should now have one more active validator. - assert_eq!(sui_system_state.validators.active_validators.len(), 5); + // Now that we have a new checkpoint cert formed for the last checkpoint, check that + // validator 3 is able to also sync and execute the above transaction and finish epoch change. + // This exercises the code path where a validator can execute transactions from a checkpoint + // cert even when the validator is halted. + while !nodes[3] + .state() + .checkpoints + .as_ref() + .unwrap() + .lock() + .is_ready_to_finish_epoch_change() + { + let _ = + checkpoint_process_step(nodes[3].active(), &CheckpointProcessControl::default()).await; + } } -pub async fn create_and_register_new_validator( +async fn create_and_register_new_validator( framework_pkg: ObjectRef, gas_objects: &mut Vec, validator_stake: ObjectRef, @@ -199,21 +268,48 @@ pub fn get_new_validator() -> (ValidatorInfo, AuthoritySignature) { ) } -#[allow(dead_code)] -pub async fn get_latest_ref(authority: &SafeClient, object_id: ObjectID) -> ObjectRef -where - A: AuthorityAPI + Send + Sync + Clone + 'static, -{ - if let Ok(ObjectInfoResponse { - requested_object_reference: Some(object_ref), - .. - }) = authority - .handle_object_info_request(ObjectInfoRequest::latest_object_info_request( - object_id, None, - )) - .await - { - return object_ref; +async fn fast_forward_to_ready_for_reconfig_start(nodes: &[SuiNode]) { + let mut checkpoint_processes = vec![]; + for node in nodes { + let active = node.active().clone(); + let handle = tokio::spawn(async move { + while !active + .state + .checkpoints + .as_ref() + .unwrap() + .lock() + .is_ready_to_start_epoch_change() + { + let _ = + checkpoint_process_step(&active, &CheckpointProcessControl::default()).await; + } + }); + checkpoint_processes.push(handle); + } + // Wait for all validators to be ready for epoch change. + join_all(checkpoint_processes).await; +} + +async fn fast_forward_to_ready_for_reconfig_finish(nodes: &[SuiNode]) { + let mut checkpoint_processes = vec![]; + for node in nodes { + let active = node.active().clone(); + let handle = tokio::spawn(async move { + while !active + .state + .checkpoints + .as_ref() + .unwrap() + .lock() + .is_ready_to_finish_epoch_change() + { + let _ = + checkpoint_process_step(&active, &CheckpointProcessControl::default()).await; + } + }); + checkpoint_processes.push(handle); } - panic!("Object not found!"); + // Wait for all validators to be ready for epoch change. + join_all(checkpoint_processes).await; } diff --git a/crates/test-utils/src/authority.rs b/crates/test-utils/src/authority.rs index b91085747f0f4..e521aec51c4ac 100644 --- a/crates/test-utils/src/authority.rs +++ b/crates/test-utils/src/authority.rs @@ -7,7 +7,7 @@ use std::collections::BTreeMap; use std::sync::Arc; use std::time::Duration; use sui_config::{NetworkConfig, NodeConfig, ValidatorInfo}; -use sui_core::authority_client::NetworkAuthorityClientMetrics; +use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClientMetrics}; use sui_core::epoch::epoch_store::EpochStore; use sui_core::{ authority_active::{ @@ -21,6 +21,8 @@ use sui_core::{ use sui_types::{committee::Committee, object::Object}; pub use sui_node::SuiNode; +use sui_types::base_types::ObjectID; +use sui_types::messages::{ObjectInfoRequest, ObjectInfoRequestKind}; /// The default network buffer size of a test authority. pub const NETWORK_BUFFER_SIZE: usize = 65_000; @@ -140,3 +142,16 @@ pub fn get_client(config: &ValidatorInfo) -> NetworkAuthorityClient { ) .unwrap() } + +pub async fn get_object(config: &ValidatorInfo, object_id: ObjectID) -> Object { + get_client(config) + .handle_object_info_request(ObjectInfoRequest { + object_id, + request_kind: ObjectInfoRequestKind::LatestObjectInfo(None), + }) + .await + .unwrap() + .object() + .unwrap() + .clone() +} From 9654a7d4a83acd3a4954936e12c8a75bdfd587df Mon Sep 17 00:00:00 2001 From: Xun Li Date: Mon, 26 Sep 2022 14:29:41 -0700 Subject: [PATCH 2/2] Rename func --- crates/sui-core/src/authority.rs | 10 ++++++---- crates/sui-core/src/node_sync/node_state.rs | 4 ++-- crates/sui-core/src/unit_tests/authority_tests.rs | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index bd49c2b1ebe47..10a3067e5d767 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -513,13 +513,15 @@ impl AuthorityState { } } - /// Execute a certificate that's know to have already finalized (i.e. executed by a quorum of - /// validators). For such certificate, we don't have to wait for consensus to set shared object + /// Execute a certificate that's known to have correct effects. + /// For such certificate, we don't have to wait for consensus to set shared object /// locks because we already know the shared object versions based on the effects. /// This function can be called either by a fullnode after seeing a quorum of signed effects, /// or by a validator after seeing the certificate included by a certified checkpoint. + /// TODO: down the road, we may want to execute a shared object tx on a validator when f+1 + /// validators have executed it. #[instrument(level = "trace", skip_all)] - pub async fn handle_finalized_certificate( + pub async fn handle_certificate_with_effects( &self, certificate: CertifiedTransaction, // Signed effects is signed by only one validator, it is not a @@ -531,7 +533,7 @@ impl AuthorityState { ) -> SuiResult { let _metrics_guard = start_timer(self.metrics.handle_node_sync_certificate_latency.clone()); let digest = *certificate.digest(); - debug!(?digest, "handle_finalized_certificate"); + debug!(?digest, "handle_certificate_with_effects"); fp_ensure!( signed_effects.effects.transaction_digest == digest, SuiError::ErrorWhileProcessingConfirmationTransaction { diff --git a/crates/sui-core/src/node_sync/node_state.rs b/crates/sui-core/src/node_sync/node_state.rs index ab329c27f15d7..0d999dbb30b22 100644 --- a/crates/sui-core/src/node_sync/node_state.rs +++ b/crates/sui-core/src/node_sync/node_state.rs @@ -433,7 +433,7 @@ where match self .state - .handle_finalized_certificate(cert.clone(), effects.clone()) + .handle_certificate_with_effects(cert.clone(), effects.clone()) .await { Ok(_) => Ok(SyncStatus::CertExecuted), @@ -601,7 +601,7 @@ where .await?; self.state - .handle_finalized_certificate(cert, effects.clone()) + .handle_certificate_with_effects(cert, effects.clone()) .await?; Ok(SyncStatus::CertExecuted) diff --git a/crates/sui-core/src/unit_tests/authority_tests.rs b/crates/sui-core/src/unit_tests/authority_tests.rs index e95d521d88870..2f9996f8992b6 100644 --- a/crates/sui-core/src/unit_tests/authority_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_tests.rs @@ -2425,7 +2425,7 @@ async fn test_consensus_message_processed() { handle_cert(&authority2, &certificate).await } else { authority2 - .handle_finalized_certificate(certificate.clone(), effects1.clone()) + .handle_certificate_with_effects(certificate.clone(), effects1.clone()) .await .unwrap(); authority2