Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ExecDriver] Launch execution driver with AuthorityState #6690

Merged
merged 3 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use move_core_types::identifier::Identifier;
use move_core_types::parser::parse_struct_tag;
use move_core_types::{language_storage::ModuleId, resolver::ModuleResolver};
use move_vm_runtime::{move_vm::MoveVM, native_functions::NativeFunctionTable};
use mysten_metrics::monitored_scope;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use prometheus::{
exponential_buckets, register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
Expand All @@ -29,7 +29,7 @@ use std::{
};
use tap::TapFallible;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::mpsc::unbounded_channel;
use tracing::Instrument;
use tracing::{debug, error, instrument, warn};
use typed_store::Map;
Expand Down Expand Up @@ -86,6 +86,7 @@ use crate::consensus_handler::{
};
use crate::epoch::committee_store::CommitteeStore;
use crate::epoch::reconfiguration::ReconfigState;
use crate::execution_driver::execution_process;
use crate::module_cache_gauge::ModuleCacheGauge;
use crate::scoped_counter;
use crate::{
Expand Down Expand Up @@ -165,8 +166,12 @@ pub struct AuthorityMetrics {

pub(crate) transaction_manager_num_missing_objects: IntGauge,
pub(crate) transaction_manager_num_pending_certificates: IntGauge,
pub(crate) transaction_manager_num_executing_certificates: IntGauge,
pub(crate) transaction_manager_num_ready: IntGauge,

pub(crate) execution_driver_executed_transactions: IntCounter,
pub(crate) execution_driver_execution_failures: IntCounter,

skipped_consensus_txns: IntCounter,

pub follower_items_streamed: IntCounter,
Expand Down Expand Up @@ -324,13 +329,31 @@ impl AuthorityMetrics {
.unwrap(),
transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
"transaction_manager_num_pending_certificates",
"Current number of pending certificates in TransactionManager",
"Number of certificates pending in TransactionManager, with at least 1 missing input object",
registry,
)
.unwrap(),
transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
"transaction_manager_num_executing_certificates",
"Nnumber of executing certificates, including queued and actually running certificates",
registry,
)
.unwrap(),
transaction_manager_num_ready: register_int_gauge_with_registry!(
"transaction_manager_num_ready",
"Current number of ready transactions in TransactionManager",
"Number of ready transactions in TransactionManager",
registry,
)
.unwrap(),
execution_driver_executed_transactions: register_int_counter_with_registry!(
"execution_driver_executed_transactions",
"Cumulative number of transaction executed by execution driver",
registry,
)
.unwrap(),
execution_driver_execution_failures: register_int_counter_with_registry!(
"execution_driver_execution_failures",
"Cumulative number of transactions failed to be executed by execution driver",
registry,
)
.unwrap(),
Expand Down Expand Up @@ -529,13 +552,6 @@ pub struct AuthorityState {
/// Manages pending certificates and their missing input objects.
pub(crate) transaction_manager: Arc<TransactionManager>,

/// The contained receiver will stream out certificates that have all inputs available locally,
/// and are ready to be executed.
/// This member temporarily holds the receiver beginning from AuthorityState initialization,
/// until the receiver is extracted by execution driver. This a bit awkward because
/// AuthorityState is created before execution driver.
rx_ready_certificates: tokio::sync::Mutex<Option<UnboundedReceiver<VerifiedCertificate>>>,

// Structures needed for handling batching and notifications.
/// The sender to notify of new transactions
/// and create batches for this authority.
Expand Down Expand Up @@ -1429,7 +1445,7 @@ impl AuthorityState {
event_store: Option<Arc<EventStoreType>>,
transaction_streamer: Option<Arc<TransactionStreamer>>,
prometheus_registry: &Registry,
) -> Self {
) -> Arc<Self> {
let (tx, _rx) = tokio::sync::broadcast::channel(BROADCAST_CAPACITY);
let native_functions =
sui_framework::natives::all_natives(MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS);
Expand All @@ -1449,7 +1465,7 @@ impl AuthorityState {
TransactionManager::new(store.clone(), tx_ready_certificates, metrics.clone()).await,
);

let mut state = AuthorityState {
let state = Arc::new(AuthorityState {
name,
secret,
_native_functions: native_functions,
Expand All @@ -1464,14 +1480,13 @@ impl AuthorityState {
transaction_streamer,
committee_store,
transaction_manager: transaction_manager.clone(),
rx_ready_certificates: tokio::sync::Mutex::new(Some(rx_ready_certificates)),
batch_channels: tx,
batch_notifier: Arc::new(
authority_notifier::TransactionNotifier::new(store.clone(), prometheus_registry)
.expect("Notifier cannot start."),
),
metrics,
};
});

prometheus_registry
.register(Box::new(ModuleCacheGauge::new(&state.module_cache)))
Expand All @@ -1488,6 +1503,10 @@ impl AuthorityState {
.init_batches_from_database()
.expect("Init batches failed!");

// Start a task to execute ready certificates.
let authority_state = Arc::downgrade(&state);
spawn_monitored_task!(execution_process(authority_state, rx_ready_certificates));

state
}

Expand All @@ -1497,7 +1516,7 @@ impl AuthorityState {
key: &AuthorityKeyPair,
store_base_path: Option<PathBuf>,
genesis: Option<&Genesis>,
) -> Self {
) -> Arc<Self> {
let secret = Arc::pin(key.copy());
let path = match store_base_path {
Some(path) => path,
Expand Down Expand Up @@ -1636,15 +1655,6 @@ impl AuthorityState {
self.database.get_object(object_id)
}

/// Extracts the stream of ready to execute certificates, published by the transaction manager.
/// Must only be called once, from execution driver only.
pub(crate) async fn ready_certificates_stream(
&self,
) -> Option<UnboundedReceiver<VerifiedCertificate>> {
let mut rx_ready_certificates = self.rx_ready_certificates.lock().await;
rx_ready_certificates.take()
}

pub async fn get_framework_object_ref(&self) -> SuiResult<ObjectRef> {
Ok(self
.get_object(&SUI_FRAMEWORK_ADDRESS.into())
Expand Down Expand Up @@ -2202,6 +2212,26 @@ impl AuthorityState {
transaction: VerifiedSequencedConsensusTransaction,
checkpoint_service: &Arc<C>,
) -> SuiResult {
if let Some(certificate) = self
.process_consensus_transaction(transaction, checkpoint_service)
.await?
{
// The certificate has already been inserted into the pending_certificates table by
// process_consensus_transaction() above.
self.transaction_manager.enqueue(vec![certificate]).await?;
}
Ok(())
}

/// Depending on the type of the VerifiedSequencedConsensusTransaction wrapper,
/// - Verify and initialize the state to execute the certificate.
/// Returns a VerifiedCertificate only if this succeeds.
/// - Or update the state for checkpoint or epoch change protocol. Returns None.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment that this method is only needed for tests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nm this is clear enough i suppose

pub(crate) async fn process_consensus_transaction<C: CheckpointServiceNotify>(
&self,
transaction: VerifiedSequencedConsensusTransaction,
checkpoint_service: &Arc<C>,
) -> SuiResult<Option<VerifiedCertificate>> {
let _scope = monitored_scope("HandleConsensusTransaction");
let VerifiedSequencedConsensusTransaction(SequencedConsensusTransaction {
certificate: consensus_output,
Expand All @@ -2218,7 +2248,7 @@ impl AuthorityState {
// However this certificate will be filtered out before this line by `consensus_message_processed` call in `verify_consensus_transaction`
// If we see some new certificate here it means authority is byzantine and sent certificate after EndOfPublish (or we have some bug in ConsensusAdapter)
warn!("[Byzantine authority] Authority {:?} sent a new, previously unseen certificate {:?} after it sent EndOfPublish message to consensus", authority.concise(), certificate.digest());
return Ok(());
return Ok(None);
}
// Safe because signatures are verified when VerifiedSequencedConsensusTransaction
// is constructed.
Expand All @@ -2237,7 +2267,7 @@ impl AuthorityState {
{
debug!("Ignoring consensus certificate for transaction {:?} because of end of epoch",
certificate.digest());
return Ok(());
return Ok(None);
}

if certificate.contains_shared_object() {
Expand All @@ -2258,21 +2288,21 @@ impl AuthorityState {
.await?;
}

// The certificate was already inserted into pending_certificates by
// finish_consensus_message_process.
self.transaction_manager.enqueue(vec![certificate]).await
Ok(Some(certificate))
}
ConsensusTransactionKind::CheckpointSignature(info) => {
checkpoint_service.notify_checkpoint_signature(info)?;
self.database
.record_consensus_transaction_processed(&transaction, consensus_index)
.await
.await?;
Ok(None)
}
ConsensusTransactionKind::EndOfPublish(authority) => {
debug!("Received EndOfPublish from {:?}", authority.concise());
self.database
.record_end_of_publish(*authority, &transaction, consensus_index)
.await
.await?;
Ok(None)
}
}
}
Expand Down
13 changes: 0 additions & 13 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ use gossip::GossipMetrics;

use crate::authority_client::NetworkAuthorityClientMetrics;

pub mod execution_driver;

use self::execution_driver::{execution_process, ExecutionDriverMetrics};

// TODO: Make these into a proper config
const MAX_RETRIES_RECORDED: u32 = 10;
const DELAY_FOR_1_RETRY_MS: u64 = 2_000;
Expand Down Expand Up @@ -132,8 +128,6 @@ pub struct ActiveAuthority<A> {
// This is only meaningful if A is of type NetworkAuthorityClient,
// and stored here for reconfiguration purposes.
pub network_metrics: Arc<NetworkAuthorityClientMetrics>,

pub execution_driver_metrics: ExecutionDriverMetrics,
}

impl<A> ActiveAuthority<A> {
Expand All @@ -160,7 +154,6 @@ impl<A> ActiveAuthority<A> {
net: ArcSwap::from(net),
gossip_metrics: GossipMetrics::new(prometheus_registry),
network_metrics,
execution_driver_metrics: ExecutionDriverMetrics::new(prometheus_registry),
})
}

Expand Down Expand Up @@ -238,7 +231,6 @@ impl<A> Clone for ActiveAuthority<A> {
health: self.health.clone(),
gossip_metrics: self.gossip_metrics.clone(),
network_metrics: self.network_metrics.clone(),
execution_driver_metrics: self.execution_driver_metrics.clone(),
}
}
}
Expand Down Expand Up @@ -329,9 +321,4 @@ where
let mut lock_guard = self.node_sync_process.lock().await;
Self::cancel_node_sync_process_impl(&mut lock_guard).await;
}

/// Spawn pending certificate execution process
pub async fn spawn_execute_process(self: Arc<Self>) -> JoinHandle<()> {
spawn_monitored_task!(execution_process(self))
}
}
Loading