Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reconfig] Allow sync last checkpoint tx when validator is halted #4493

Merged
merged 2 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,15 @@ impl AuthorityState {
}
}

/// We cannot use handle_certificate in fullnode to execute a certificate because there is no
/// consensus engine to assign locks for shared objects. Hence we need special handling here.
/// Execute a certificate that's known to have correct effects.
/// For such certificate, we don't have to wait for consensus to set shared object
/// locks because we already know the shared object versions based on the effects.
/// This function can be called either by a fullnode after seeing a quorum of signed effects,
/// or by a validator after seeing the certificate included by a certified checkpoint.
/// TODO: down the road, we may want to execute a shared object tx on a validator when f+1
/// validators have executed it.
#[instrument(level = "trace", skip_all)]
pub async fn handle_node_sync_certificate(
pub async fn handle_certificate_with_effects(
&self,
certificate: CertifiedTransaction,
// Signed effects is signed by only one validator, it is not a
Expand All @@ -528,7 +533,7 @@ impl AuthorityState {
) -> SuiResult {
let _metrics_guard = start_timer(self.metrics.handle_node_sync_certificate_latency.clone());
let digest = *certificate.digest();
debug!(?digest, "handle_node_sync_transaction");
debug!(?digest, "handle_certificate_with_effects");
fp_ensure!(
signed_effects.effects.transaction_digest == digest,
SuiError::ErrorWhileProcessingConfirmationTransaction {
Expand All @@ -547,7 +552,7 @@ impl AuthorityState {
}

let resp = self
.process_certificate(tx_guard, &certificate)
.process_certificate(tx_guard, &certificate, true)
.await
.tap_err(|e| debug!(?digest, "process_certificate failed: {}", e))?;

Expand All @@ -571,7 +576,23 @@ impl AuthorityState {
certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
let _metrics_guard = start_timer(self.metrics.handle_certificate_latency.clone());
self.handle_certificate_impl(certificate, false).await
}

/// Handles `certificate` through `handle_certificate_impl` with the
/// `bypass_validator_halt` flag set to `true`.
///
/// `handle_certificate` takes the same path but passes `false`. The flag is
/// forwarded to `process_certificate`, where a `true` value skips the
/// `SuiError::ValidatorHaltedAtEpochEnd` rejection that a halted validator
/// otherwise returns for non-system transactions.
#[instrument(level = "trace", skip_all)]
pub async fn handle_certificate_bypass_validator_halt(
    &self,
    certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
    // Named so the call site documents which behaviour is being requested.
    let bypass_validator_halt = true;
    self.handle_certificate_impl(certificate, bypass_validator_halt)
        .await
}

#[instrument(level = "trace", skip_all)]
async fn handle_certificate_impl(
&self,
certificate: CertifiedTransaction,
bypass_validator_halt: bool,
) -> SuiResult<TransactionInfoResponse> {
self.metrics.total_cert_attempts.inc();
if self.is_fullnode() {
return Err(SuiError::GenericStorageError(
Expand Down Expand Up @@ -603,7 +624,7 @@ impl AuthorityState {
.instrument(span)
.await?;

self.process_certificate(tx_guard, &certificate)
self.process_certificate(tx_guard, &certificate, bypass_validator_halt)
.await
.tap_err(|e| debug!(?tx_digest, "process_certificate failed: {}", e))
}
Expand Down Expand Up @@ -670,6 +691,7 @@ impl AuthorityState {
&self,
tx_guard: CertTxGuard<'_>,
certificate: &CertifiedTransaction,
bypass_validator_halt: bool,
) -> SuiResult<TransactionInfoResponse> {
let digest = *certificate.digest();
// The cert could have been processed by a concurrent attempt of the same cert, so check if
Expand All @@ -679,7 +701,10 @@ impl AuthorityState {
return Ok(info);
}

if self.is_halted() && !certificate.signed_data.data.kind.is_system_tx() {
if self.is_halted()
&& !bypass_validator_halt
&& !certificate.signed_data.data.kind.is_system_tx()
{
tx_guard.release();
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
Expand Down Expand Up @@ -1386,7 +1411,7 @@ impl AuthorityState {
continue;
}

if let Err(e) = self.process_certificate(tx_guard, &cert).await {
if let Err(e) = self.process_certificate(tx_guard, &cert, false).await {
warn!(?digest, "Failed to process in-progress certificate: {}", e);
}
} else {
Expand Down
82 changes: 50 additions & 32 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,14 +639,14 @@ where
&available_authorities,
)
.await?;
state_checkpoints
.lock()
.process_new_checkpoint_certificate(
checkpoint,
&contents,
committee,
active_authority.state.database.clone(),
)?;
process_new_checkpoint_certificate(
active_authority,
state_checkpoints,
committee,
checkpoint,
&contents,
)
.await?;
info!(
cp_seq=?checkpoint.summary.sequence_number(),
"Stored new checkpoint certificate",
Expand Down Expand Up @@ -700,37 +700,55 @@ where
let (past, contents) =
get_one_checkpoint_with_contents(net.clone(), seq, &available_authorities).await?;

let errors = active_authority
.node_sync_handle()
.sync_checkpoint_cert_transactions(&contents)
.await?
.zip(futures::stream::iter(contents.iter()))
.filter_map(|(r, digests)| async move {
r.map_err(|e| {
info!(?digests, "failed to execute digest from checkpoint: {}", e);
e
})
.err()
})
.collect::<Vec<SuiError>>()
.await;

if !errors.is_empty() {
let error = "Failed to sync transactions in checkpoint".to_string();
error!(?seq, "{}", error);
return Err(SuiError::CheckpointingError { error });
}

checkpoint_db.lock().process_synced_checkpoint_certificate(
process_new_checkpoint_certificate(
active_authority,
&checkpoint_db,
&net.committee,
&past,
&contents,
&net.committee,
)?;
)
.await?;
}

Ok(())
}

async fn process_new_checkpoint_certificate<A>(
active_authority: &ActiveAuthority<A>,
checkpoint_db: &Arc<Mutex<CheckpointStore>>,
committee: &Committee,
checkpoint_cert: &CertifiedCheckpointSummary,
contents: &CheckpointContents,
) -> SuiResult
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let errors = active_authority
.node_sync_handle()
.sync_checkpoint_cert_transactions(contents)
.await?
.zip(futures::stream::iter(contents.iter()))
.filter_map(|(r, digests)| async move {
r.map_err(|e| {
info!(?digests, "failed to execute digest from checkpoint: {}", e);
e
})
.err()
})
.collect::<Vec<SuiError>>()
.await;

if !errors.is_empty() {
let error = "Failed to sync transactions in checkpoint".to_string();
error!(cp_seq=?checkpoint_cert.summary.sequence_number, "{}", error);
return Err(SuiError::CheckpointingError { error });
}

checkpoint_db
.lock()
.process_synced_checkpoint_certificate(checkpoint_cert, contents, committee)
}

pub async fn get_one_checkpoint_with_contents<A>(
net: Arc<AuthorityAggregator<A>>,
sequence_number: CheckpointSequenceNumber,
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,10 @@ impl CheckpointStore {

// Send to consensus for sequencing.
if let Some(sender) = &self.sender {
debug!("Send fragment: {} -- {}", self.name, other_name);
let seq = fragment.proposer.summary.sequence_number;
debug!(cp_seq=?seq, "Sending fragment: {} -- {}", self.name, other_name);
sender.send_to_consensus(fragment.clone())?;
debug!(cp_seq=?seq, "Fragment successfully sent: {} -- {}", self.name, other_name);
} else {
return Err(SuiError::from("No consensus sender configured"));
}
Expand Down
49 changes: 42 additions & 7 deletions crates/sui-core/src/node_sync/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ impl DigestsMessage {
}
}

/// Builds a message whose sync argument is `SyncArg::PendingCheckpoint` for
/// the given transaction `digest`, storing `tx` (wrapped in `Some`) as the
/// message's oneshot sender.
fn new_for_pending_ckpt(digest: &TransactionDigest, tx: oneshot::Sender<SyncResult>) -> Self {
    let sync_arg = SyncArg::PendingCheckpoint(*digest);
    let tx = Some(tx);
    Self { sync_arg, tx }
}

fn new_for_exec_driver(digest: &TransactionDigest, tx: oneshot::Sender<SyncResult>) -> Self {
Self {
sync_arg: SyncArg::ExecDriver(*digest),
Expand Down Expand Up @@ -140,6 +147,13 @@ pub enum SyncArg {
/// In checkpoint mode, all txes are known to be final.
Checkpoint(ExecutionDigests),

/// Transactions in the current checkpoint to be signed/stored.
/// Only the transaction digest is carried, because the effects digest may not be known yet
/// when the checkpoint is being constructed.
/// The primary difference between PendingCheckpoint and ExecDriver is that a PendingCheckpoint
/// sync can bypass validator halting. This ensures that the last checkpoint of the epoch
/// can always be formed, even when some of its transactions are still missing locally.
PendingCheckpoint(TransactionDigest),

/// Used by the execution driver to execute pending certs. No effects digest is provided,
/// because this mode is used on validators only, who must compute the effects digest
/// themselves - they cannot trust some other validator's version of the effects because that
Expand All @@ -165,7 +179,9 @@ impl SyncArg {
effects,
},
) => (transaction, Some(effects)),
SyncArg::Parent(digest) | SyncArg::ExecDriver(digest) => (digest, None),
SyncArg::Parent(digest)
| SyncArg::ExecDriver(digest)
| SyncArg::PendingCheckpoint(digest) => (digest, None),
}
}
}
Expand Down Expand Up @@ -417,7 +433,7 @@ where

match self
.state
.handle_node_sync_certificate(cert.clone(), effects.clone())
.handle_certificate_with_effects(cert.clone(), effects.clone())
.await
{
Ok(_) => Ok(SyncStatus::CertExecuted),
Expand All @@ -429,11 +445,19 @@ where
&self,
permit: OwnedSemaphorePermit,
digest: &TransactionDigest,
bypass_validator_halt: bool,
) -> SyncResult {
trace!(?digest, "validator pending execution requested");
let cert = self.get_cert(digest).await?;

match self.state.handle_certificate(cert.clone()).await {
let result = if bypass_validator_halt {
self.state
.handle_certificate_bypass_validator_halt(cert.clone())
.await
} else {
self.state.handle_certificate(cert.clone()).await
};
match result {
Ok(_) => Ok(SyncStatus::CertExecuted),
Err(SuiError::ObjectNotFound { .. })
| Err(SuiError::ObjectErrors { .. })
Expand All @@ -451,7 +475,13 @@ where

// Parents have been executed, so this should now succeed.
debug!(?digest, "parents executed, re-attempting cert");
self.state.handle_certificate(cert.clone()).await?;
if bypass_validator_halt {
self.state
.handle_certificate_bypass_validator_halt(cert.clone())
.await
} else {
self.state.handle_certificate(cert.clone()).await
}?;
Ok(SyncStatus::CertExecuted)
}
Err(e) => Err(e),
Expand Down Expand Up @@ -490,7 +520,12 @@ where
// down.
let (digests, authorities_with_cert) = match arg {
SyncArg::ExecDriver(digest) => {
return self.process_exec_driver_digest(permit, &digest).await;
return self
.process_exec_driver_digest(permit, &digest, false)
.await;
}
SyncArg::PendingCheckpoint(digest) => {
return self.process_exec_driver_digest(permit, &digest, true).await;
}
SyncArg::Parent(digest) => {
// digest is known to be final because it appeared in the dependencies list of a
Expand Down Expand Up @@ -566,7 +601,7 @@ where
.await?;

self.state
.handle_node_sync_certificate(cert, effects.clone())
.handle_certificate_with_effects(cert, effects.clone())
.await?;

Ok(SyncStatus::CertExecuted)
Expand Down Expand Up @@ -869,7 +904,7 @@ impl NodeSyncHandle {
let mut futures = FuturesOrdered::new();
for digests in transactions {
let (tx, rx) = oneshot::channel();
let msg = DigestsMessage::new_for_exec_driver(&digests.transaction, tx);
let msg = DigestsMessage::new_for_pending_ckpt(&digests.transaction, tx);
Self::send_msg_with_tx(self.sender.clone(), msg).await?;
futures.push_back(Self::map_rx(rx));
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2425,7 +2425,7 @@ async fn test_consensus_message_processed() {
handle_cert(&authority2, &certificate).await
} else {
authority2
.handle_node_sync_certificate(certificate.clone(), effects1.clone())
.handle_certificate_with_effects(certificate.clone(), effects1.clone())
.await
.unwrap();
authority2
Expand Down
Loading