diff --git a/crates/sui-benchmark/src/lib.rs b/crates/sui-benchmark/src/lib.rs
index 14637e238aeb6..834329c23e109 100644
--- a/crates/sui-benchmark/src/lib.rs
+++ b/crates/sui-benchmark/src/lib.rs
@@ -173,11 +173,8 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
err
),
Ok(auth_agg) => {
- if let Err(err) = self.qd.update_validators(Arc::new(auth_agg)).await {
- error!("Reconfiguration - Error when updating authority aggregator in quorum driver: {}", err);
- } else {
- info!("Reconfiguration - Reconfiguration to epoch {new_epoch} is done");
- }
+ self.qd.update_validators(Arc::new(auth_agg)).await;
+ info!("Reconfiguration - Reconfiguration to epoch {new_epoch} is done");
}
}
}
diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs
index c44cdabc66ab1..886e35ef74781 100644
--- a/crates/sui-core/src/authority_aggregator.rs
+++ b/crates/sui-core/src/authority_aggregator.rs
@@ -298,6 +298,36 @@ impl AuthorityAggregator {
}
}
+ pub fn new_with_metrics(
+ committee: Committee,
+ committee_store: Arc,
+ authority_clients: BTreeMap,
+ safe_client_metrics_base: SafeClientMetricsBase,
+ auth_agg_metrics: AuthAggMetrics,
+ ) -> Self {
+ Self {
+ committee,
+ authority_clients: authority_clients
+ .into_iter()
+ .map(|(name, api)| {
+ (
+ name,
+ SafeClient::new(
+ api,
+ committee_store.clone(),
+ name,
+ SafeClientMetrics::new(&safe_client_metrics_base, name),
+ ),
+ )
+ })
+ .collect(),
+ metrics: auth_agg_metrics,
+ safe_client_metrics_base: Arc::new(safe_client_metrics_base),
+ timeouts: Default::default(),
+ committee_store,
+ }
+ }
+
/// This function recreates AuthorityAggregator with the given committee.
/// It also updates committee store which impacts other of its references.
/// If it is called on a Validator/Fullnode, it **may** interleave with the the authority active's
@@ -388,19 +418,24 @@ impl AuthorityAggregator {
pub fn new_from_system_state(
store: &Arc,
committee_store: &Arc,
- prometheus_registry: &Registry,
+ safe_client_metrics_base: SafeClientMetricsBase,
+ auth_agg_metrics: AuthAggMetrics,
) -> anyhow::Result {
let net_config = default_mysten_network_config();
let sui_system_state = store.get_sui_system_state_object()?;
-
+ // TODO: the function returns on URL parsing errors. In this case we should
+ // tolerate it as long as we have 2f+1 good validators.
+ // GH issue: https://github.com/MystenLabs/sui/issues/7019
let authority_clients =
make_network_authority_client_sets_from_system_state(&sui_system_state, &net_config)?;
- Ok(Self::new(
+ let res = Self::new_with_metrics(
sui_system_state.get_current_epoch_committee().committee,
committee_store.clone(),
authority_clients,
- prometheus_registry,
- ))
+ safe_client_metrics_base,
+ auth_agg_metrics,
+ );
+ Ok(res)
}
}
@@ -449,10 +484,12 @@ impl TransactionCertifier for NetworkTransactionCertifier {
self_state: &AuthorityState,
timeout: Duration,
) -> anyhow::Result {
+ let registry = Registry::new();
let net = AuthorityAggregator::new_from_system_state(
&self_state.db(),
self_state.committee_store(),
- &Registry::new(),
+ SafeClientMetricsBase::new(®istry),
+ AuthAggMetrics::new(®istry),
)?;
net.authorty_ask_for_cert_with_retry_and_timeout(transaction, self_state, timeout)
diff --git a/crates/sui-core/src/authority_client.rs b/crates/sui-core/src/authority_client.rs
index fc9cc1a7a2948..8336c3ee0d3f1 100644
--- a/crates/sui-core/src/authority_client.rs
+++ b/crates/sui-core/src/authority_client.rs
@@ -182,6 +182,8 @@ impl AuthorityAPI for NetworkAuthorityClient {
}
}
+// This function errs on URL parsing error. This may happen
+// when a validator provides a bad URL.
pub fn make_network_authority_client_sets_from_system_state(
sui_system_state: &SuiSystemState,
network_config: &Config,
diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
index 98f94a01cde84..9d7f14b6a0dc3 100644
--- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
+++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
@@ -51,7 +51,7 @@ mod metrics;
pub(crate) mod tests;
const TASKS_PER_CORE: usize = 1;
-const END_OF_EPOCH_BROADCAST_CHANNEL_CAPACITY: usize = 2;
+const END_OF_EPOCH_BROADCAST_CHANNEL_CAPACITY: usize = 100;
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
type CheckpointExecutionBuffer = FuturesOrdered<
diff --git a/crates/sui-core/src/quorum_driver/mod.rs b/crates/sui-core/src/quorum_driver/mod.rs
index e9350753e15a8..57bd13db421b5 100644
--- a/crates/sui-core/src/quorum_driver/mod.rs
+++ b/crates/sui-core/src/quorum_driver/mod.rs
@@ -4,6 +4,8 @@
mod metrics;
pub use metrics::*;
+pub mod reconfig_observer;
+
use arc_swap::ArcSwap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
@@ -300,12 +302,12 @@ where
Ok(response)
}
- pub async fn update_validators(
- &self,
- new_validators: Arc>,
- ) -> SuiResult {
+ pub async fn update_validators(&self, new_validators: Arc>) {
+ info!(
+ "Quorum Drvier updating AuthorityAggregator with committee {:?}",
+ new_validators.committee
+ );
self.validators.store(new_validators);
- Ok(())
}
// TODO currently this function is not epoch-boundary-safe. We need to make it so.
@@ -609,6 +611,10 @@ where
self.quorum_driver.authority_aggregator()
}
+ pub fn current_epoch(&self) -> EpochId {
+ self.quorum_driver.current_epoch()
+ }
+
/// Process a QuorumDriverTask.
/// The function has no return value - the corresponding actions of task result
/// are performed in this call.
diff --git a/crates/sui-core/src/quorum_driver/reconfig_observer.rs b/crates/sui-core/src/quorum_driver/reconfig_observer.rs
new file mode 100644
index 0000000000000..ae317f4ea2fb6
--- /dev/null
+++ b/crates/sui-core/src/quorum_driver/reconfig_observer.rs
@@ -0,0 +1,105 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use async_trait::async_trait;
+use std::sync::Arc;
+use sui_types::committee::Committee;
+use tracing::{info, warn};
+
+use crate::{
+ authority::AuthorityStore,
+ authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
+ authority_client::NetworkAuthorityClient,
+ epoch::committee_store::CommitteeStore,
+ safe_client::SafeClientMetricsBase,
+};
+
+use super::QuorumDriver;
+
+#[async_trait]
+pub trait ReconfigObserver {
+ async fn run(&mut self, quorum_driver: Arc>);
+}
+
+pub struct OnsiteReconfigObserver {
+ reconfig_rx: tokio::sync::broadcast::Receiver,
+ authority_store: Arc,
+ committee_store: Arc,
+ safe_client_metrics_base: SafeClientMetricsBase,
+ auth_agg_metrics: AuthAggMetrics,
+}
+
+impl OnsiteReconfigObserver {
+ pub fn new(
+ reconfig_rx: tokio::sync::broadcast::Receiver,
+ authority_store: Arc,
+ committee_store: Arc,
+ safe_client_metrics_base: SafeClientMetricsBase,
+ auth_agg_metrics: AuthAggMetrics,
+ ) -> Self {
+ Self {
+ reconfig_rx,
+ authority_store,
+ committee_store,
+ safe_client_metrics_base,
+ auth_agg_metrics,
+ }
+ }
+
+ async fn create_authority_aggregator_from_system_state(
+ &self,
+ ) -> AuthorityAggregator {
+ AuthorityAggregator::new_from_system_state(
+ &self.authority_store,
+ &self.committee_store,
+ self.safe_client_metrics_base.clone(),
+ self.auth_agg_metrics.clone(),
+ )
+ // TODO: we should tolerate when <= f validators give invalid addresses
+ // GH issue: https://github.com/MystenLabs/sui/issues/7019
+ .unwrap_or_else(|e| {
+ warn!("panicked? {:?}", e);
+ panic!(
+ "Failed to create AuthorityAggregator from System State: {:?}",
+ e
+ )
+ })
+ }
+}
+
+#[async_trait]
+impl ReconfigObserver for OnsiteReconfigObserver {
+ async fn run(&mut self, quorum_driver: Arc>) {
+ // A tiny optimization: when the a very stale node just starts, the
+ // channel may fill up committees quickly. Here we skip directly to
+ // the last known committee by looking at SuiSystemState.
+ let authority_agg = self.create_authority_aggregator_from_system_state().await;
+ if authority_agg.committee.epoch > quorum_driver.current_epoch() {
+ quorum_driver
+ .update_validators(Arc::new(authority_agg))
+ .await;
+ }
+ loop {
+ match self.reconfig_rx.recv().await {
+ Ok(committee) => {
+ info!("Got reconfig message: {}", committee);
+ if committee.epoch > quorum_driver.current_epoch() {
+ let authority_agg =
+ self.create_authority_aggregator_from_system_state().await;
+ quorum_driver
+ .update_validators(Arc::new(authority_agg))
+ .await;
+ } else {
+ // This should only happen when the node just starts
+ warn!("Ignored non-newer from reconfig channel: {}", committee);
+ }
+ }
+ // Neither closed channel nor lagged shall happen
+ Err(other_err) => panic!(
+ "Got unexpected error from reconfig broadcast channel: {:?}",
+ other_err
+ ),
+ }
+ }
+ }
+}
diff --git a/crates/sui-core/src/quorum_driver/tests.rs b/crates/sui-core/src/quorum_driver/tests.rs
index b363dbdfc8db6..7ded338c815af 100644
--- a/crates/sui-core/src/quorum_driver/tests.rs
+++ b/crates/sui-core/src/quorum_driver/tests.rs
@@ -162,8 +162,7 @@ async fn test_quorum_driver_update_validators_and_max_retry_times() {
aggregator.committee.epoch = 10;
quorum_driver_clone
.update_validators(Arc::new(aggregator))
- .await
- .unwrap();
+ .await;
assert_eq!(
quorum_driver_handler.clone_quorum_driver().current_epoch(),
10
diff --git a/crates/sui-core/src/safe_client.rs b/crates/sui-core/src/safe_client.rs
index a8f1ee7a21718..e9d2078563e2b 100644
--- a/crates/sui-core/src/safe_client.rs
+++ b/crates/sui-core/src/safe_client.rs
@@ -33,6 +33,7 @@ macro_rules! check_error {
}
}
+#[derive(Clone)]
pub struct SafeClientMetricsBase {
total_requests_by_address_method: IntCounterVec,
total_responses_by_address_method: IntCounterVec,
diff --git a/crates/sui-core/src/transaction_orchestrator.rs b/crates/sui-core/src/transaction_orchestrator.rs
index 098d901f7b0ef..0a3d39663a59b 100644
--- a/crates/sui-core/src/transaction_orchestrator.rs
+++ b/crates/sui-core/src/transaction_orchestrator.rs
@@ -279,6 +279,10 @@ where
&self.quorum_driver_handler
}
+ pub fn clone_quorum_driver(&self) -> Arc> {
+ self.quorum_driver_handler.clone()
+ }
+
pub fn clone_authority_aggregator(&self) -> Arc> {
self.quorum_driver().authority_aggregator().load_full()
}
diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs
index 248d59b31c63d..d19b14a95a946 100644
--- a/crates/sui-node/src/lib.rs
+++ b/crates/sui-node/src/lib.rs
@@ -20,10 +20,13 @@ use prometheus::Registry;
use std::collections::HashMap;
use std::sync::Arc;
use sui_config::{ConsensusConfig, NodeConfig};
-use sui_core::authority_aggregator::{AuthorityAggregator, NetworkTransactionCertifier};
+use sui_core::authority_aggregator::{
+ AuthAggMetrics, AuthorityAggregator, NetworkTransactionCertifier,
+};
use sui_core::authority_server::ValidatorService;
use sui_core::checkpoints::checkpoint_executor;
use sui_core::epoch::committee_store::CommitteeStore;
+use sui_core::safe_client::SafeClientMetricsBase;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactiondOrchestrator;
use sui_core::transaction_streamer::TransactionStreamer;
@@ -48,6 +51,7 @@ use sui_storage::{
IndexStore,
};
use sui_types::committee::Committee;
+use sui_types::committee::EpochId;
use sui_types::crypto::KeypairTraits;
use sui_types::messages::QuorumDriverResponse;
use tokio::sync::mpsc::channel;
@@ -174,7 +178,8 @@ impl SuiNode {
let arc_net = AuthorityAggregator::new_from_system_state(
&store,
&committee_store,
- &prometheus_registry,
+ SafeClientMetricsBase::new(&prometheus_registry),
+ AuthAggMetrics::new(&prometheus_registry),
)?;
let transaction_streamer = if is_full_node {
@@ -264,6 +269,14 @@ impl SuiNode {
Ok(node)
}
+ pub async fn subscribe_to_epoch_change(&self) -> tokio::sync::broadcast::Receiver {
+ self.reconfig_channel.lock().await.resubscribe()
+ }
+
+ pub fn current_epoch(&self) -> EpochId {
+ self.state.epoch()
+ }
+
pub fn close_epoch(&self) -> SuiResult {
self.validator_components
.load()
@@ -546,6 +559,14 @@ impl SuiNode {
self.state.clone()
}
+ pub fn clone_committee_store(&self) -> Arc {
+ self.state.committee_store().clone()
+ }
+
+ pub fn clone_authority_store(&self) -> Arc {
+ self.state.db()
+ }
+
/// Clone an AuthorityAggregator currently used in this node's
/// QuorumDriver, if the node is a fullnode. After reconfig,
/// QuorumDrvier builds a new AuthorityAggregator. The caller
diff --git a/crates/sui/tests/onsite_reconfig_observer_tests.rs b/crates/sui/tests/onsite_reconfig_observer_tests.rs
new file mode 100644
index 0000000000000..ce5df810ec0d6
--- /dev/null
+++ b/crates/sui/tests/onsite_reconfig_observer_tests.rs
@@ -0,0 +1,62 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use prometheus::Registry;
+use sui_core::authority_aggregator::AuthAggMetrics;
+use sui_core::quorum_driver::reconfig_observer::OnsiteReconfigObserver;
+use sui_core::quorum_driver::reconfig_observer::ReconfigObserver;
+use sui_core::safe_client::SafeClientMetricsBase;
+use test_utils::authority::{spawn_test_authorities_with_fullnodes, test_authority_configs};
+use test_utils::network::wait_for_nodes_transition_to_epoch;
+
+#[tokio::test]
+async fn test_onsite_reconfig_observer_basic() {
+ telemetry_subscribers::init_for_testing();
+ let (authorities, fullnodes) =
+ spawn_test_authorities_with_fullnodes([].into_iter(), &test_authority_configs(), 1).await;
+ let fullnode = &fullnodes[0];
+ let qd = fullnode.with(|node| {
+ let qd = node
+ .transaction_orchestrator()
+ .unwrap()
+ .clone_quorum_driver();
+ assert_eq!(qd.current_epoch(), 0);
+ qd
+ });
+ let qd_clone = qd.clone_quorum_driver();
+ let registry = Registry::new();
+ let mut observer: OnsiteReconfigObserver = fullnode
+ .with_async(|node| async {
+ let rx = node.subscribe_to_epoch_change().await;
+ OnsiteReconfigObserver::new(
+ rx,
+ node.clone_authority_store(),
+ node.clone_committee_store(),
+ SafeClientMetricsBase::new(®istry),
+ AuthAggMetrics::new(®istry),
+ )
+ })
+ .await;
+
+ let _observer_handle = tokio::task::spawn(async move { observer.run(qd_clone).await });
+
+ for handle in &authorities {
+ handle.with(|node| node.close_epoch().unwrap());
+ }
+ // Wait for all nodes to reach the next epoch.
+ wait_for_nodes_transition_to_epoch(authorities.iter().chain(fullnodes.iter()), 1).await;
+
+ // Give it some time for the update to happen
+ tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
+ fullnode.with(|node| {
+ let qd = node
+ .transaction_orchestrator()
+ .unwrap()
+ .clone_quorum_driver();
+ assert_eq!(qd.current_epoch(), 1);
+ assert_eq!(
+ node.clone_authority_aggregator().unwrap().committee.epoch,
+ 1
+ );
+ });
+}
diff --git a/crates/sui/tests/reconfiguration_tests.rs b/crates/sui/tests/reconfiguration_tests.rs
index 861a68986dd44..f8a59c205619c 100644
--- a/crates/sui/tests/reconfiguration_tests.rs
+++ b/crates/sui/tests/reconfiguration_tests.rs
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
-use futures::future::join_all;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
@@ -15,6 +14,7 @@ use sui_types::error::SuiError;
use sui_types::gas::GasCostSummary;
use sui_types::messages::VerifiedTransaction;
use test_utils::authority::{spawn_test_authorities, test_authority_configs};
+use test_utils::network::wait_for_nodes_transition_to_epoch;
#[tokio::test]
async fn local_advance_epoch_tx_test() {
@@ -103,20 +103,7 @@ async fn basic_reconfig_end_to_end_test() {
handle.with(|node| node.close_epoch().unwrap());
}
// Wait for all nodes to reach the next epoch.
- let handles: Vec<_> = authorities
- .iter()
- .map(|handle| {
- handle.with_async(|node| async {
- loop {
- if node.state().epoch() == 1 {
- break;
- }
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
- })
- })
- .collect();
- join_all(handles).await;
+ wait_for_nodes_transition_to_epoch(authorities.iter(), 1).await;
}
/*
diff --git a/crates/test-utils/src/authority.rs b/crates/test-utils/src/authority.rs
index 266ceb57b1a93..dfa12f218a972 100644
--- a/crates/test-utils/src/authority.rs
+++ b/crates/test-utils/src/authority.rs
@@ -8,12 +8,11 @@ use std::time::Duration;
use sui_config::{NetworkConfig, NodeConfig, ValidatorInfo};
use sui_core::authority_client::AuthorityAPI;
use sui_core::authority_client::NetworkAuthorityClient;
-use sui_types::object::Object;
-
pub use sui_node::{SuiNode, SuiNodeHandle};
use sui_types::base_types::ObjectID;
use sui_types::crypto::TEST_COMMITTEE_SIZE;
use sui_types::messages::{ObjectInfoRequest, ObjectInfoRequestKind};
+use sui_types::object::Object;
/// The default network buffer size of a test authority.
pub const NETWORK_BUFFER_SIZE: usize = 65_000;
@@ -106,6 +105,45 @@ where
handles
}
+/// Spawn all authorities in the test committee into a separate tokio task.
+pub async fn spawn_test_authorities_with_fullnodes(
+ objects: I,
+ config: &NetworkConfig,
+ fullnode_num: u8,
+) -> (Vec, Vec)
+where
+ I: IntoIterator- + Clone,
+{
+ let mut handles = Vec::new();
+ for validator in config.validator_configs() {
+ let registry_service = RegistryService::new(Registry::new());
+ let node = start_node(validator, registry_service).await;
+ let objects = objects.clone();
+
+ node.with_async(|node| async move {
+ let state = node.state();
+ for o in objects {
+ state.insert_genesis_object(o).await
+ }
+ })
+ .await;
+
+ handles.push(node);
+ }
+ let mut fullnode_handles = Vec::new();
+ for _ in 0..fullnode_num {
+ let registry_service = RegistryService::new(Registry::new());
+ let fullnode_config = config
+ .fullnode_config_builder()
+ .with_random_dir()
+ .build()
+ .unwrap();
+ let node = start_node(&fullnode_config, registry_service).await;
+ fullnode_handles.push(node);
+ }
+ (handles, fullnode_handles)
+}
+
/// Get a network client to communicate with the consensus.
pub fn get_client(config: &ValidatorInfo) -> NetworkAuthorityClient {
NetworkAuthorityClient::connect_lazy(config.network_address()).unwrap()
diff --git a/crates/test-utils/src/network.rs b/crates/test-utils/src/network.rs
index 99944b660e857..f9b044fd5e67f 100644
--- a/crates/test-utils/src/network.rs
+++ b/crates/test-utils/src/network.rs
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
+use futures::future::join_all;
use std::num::NonZeroUsize;
use std::sync::Arc;
@@ -17,9 +18,11 @@ use sui_config::{Config, SUI_CLIENT_CONFIG, SUI_NETWORK_CONFIG};
use sui_config::{FullnodeConfigBuilder, NodeConfig, PersistedConfig, SUI_KEYSTORE_FILENAME};
use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
use sui_node::SuiNode;
+use sui_node::SuiNodeHandle;
use sui_sdk::SuiClient;
use sui_swarm::memory::{Swarm, SwarmBuilder};
use sui_types::base_types::SuiAddress;
+use sui_types::committee::EpochId;
use sui_types::crypto::KeypairTraits;
use sui_types::crypto::SuiKeyPair;
@@ -257,3 +260,22 @@ pub async fn start_fullnode_from_config(
ws_url,
})
}
+
+pub async fn wait_for_nodes_transition_to_epoch<'a>(
+ nodes: impl Iterator
- ,
+ expected_epoch: EpochId,
+) {
+ let handles: Vec<_> = nodes
+ .map(|handle| {
+ handle.with_async(|node| async move {
+ let mut rx = node.subscribe_to_epoch_change().await;
+ let epoch = node.current_epoch();
+ if epoch != expected_epoch {
+ let committee = rx.recv().await.unwrap();
+ assert_eq!(committee.epoch, expected_epoch);
+ }
+ })
+ })
+ .collect();
+ join_all(handles).await;
+}