[QD Reconfig] 3. add reconfig observer (#7024)
1. This PR adds the `ReconfigObserver` trait. The RO detects reconfigurations and
updates the quorum driver with the new committee.
2. `OnsiteReconfigObserver` is the RO that lives in the
`Fullnode`/`TransactionOrchestrator` and subscribes to the checkpoint
executor's reconfig channel. Note that the integration of
`OnsiteReconfigObserver` and `TransactionOrchestrator` happens in a
follow-up PR; a rough sketch of that wiring follows below.
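For orientation, here is a hypothetical sketch of that follow-up wiring, mirroring the integration test added in this PR. The `node`, `orchestrator`, and `registry` bindings are assumptions for illustration and are not part of this change.

```rust
// Hypothetical wiring (belongs to the follow-up PR, not this one): build the
// observer from the fullnode's stores and metrics, then drive the transaction
// orchestrator's quorum driver with it.
let mut observer = OnsiteReconfigObserver::new(
    node.subscribe_to_epoch_change().await, // broadcast::Receiver<Committee>
    node.clone_authority_store(),
    node.clone_committee_store(),
    SafeClientMetricsBase::new(&registry),
    AuthAggMetrics::new(&registry),
);
// QuorumDriverHandler::clone_quorum_driver() hands out the Arc<QuorumDriver<_>>
// that the observer updates whenever it sees a newer committee.
let quorum_driver = orchestrator.clone_quorum_driver().clone_quorum_driver();
tokio::spawn(async move { observer.run(quorum_driver).await });
```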

Co-authored-by: Mark Logan <[email protected]>
longbowlu and mystenmark authored Jan 6, 2023
1 parent d87a8c3 commit fa4b7fc
Showing 11 changed files with 296 additions and 20 deletions.
7 changes: 2 additions & 5 deletions crates/sui-benchmark/src/lib.rs
@@ -173,11 +173,8 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
err
),
Ok(auth_agg) => {
if let Err(err) = self.qd.update_validators(Arc::new(auth_agg)).await {
error!("Reconfiguration - Error when updating authority aggregator in quorum driver: {}", err);
} else {
info!("Reconfiguration - Reconfiguration to epoch {new_epoch} is done");
}
self.qd.update_validators(Arc::new(auth_agg)).await;
info!("Reconfiguration - Reconfiguration to epoch {new_epoch} is done");
}
}
}
38 changes: 35 additions & 3 deletions crates/sui-config/src/swarm.rs
@@ -74,7 +74,14 @@ pub struct FullnodeConfigBuilder<'a> {
dir: Option<PathBuf>,
enable_event_store: bool,
listen_ip: Option<IpAddr>,
// port for main network_address
port: Option<u16>,
// port for p2p data sync
p2p_port: Option<u16>,
// port for json rpc api
rpc_port: Option<u16>,
// port for admin interface
admin_port: Option<u16>,
}

impl<'a> FullnodeConfigBuilder<'a> {
@@ -84,7 +91,10 @@ impl<'a> FullnodeConfigBuilder<'a> {
dir: None,
enable_event_store: false,
listen_ip: None,
port: None,
p2p_port: None,
rpc_port: None,
admin_port: None,
}
}

@@ -103,6 +113,16 @@ impl<'a> FullnodeConfigBuilder<'a> {
self
}

pub fn with_port(mut self, port: u16) -> Self {
self.port = Some(port);
self
}

pub fn with_p2p_port(mut self, port: u16) -> Self {
self.p2p_port = Some(port);
self
}

pub fn with_rpc_port(mut self, port: u16) -> Self {
self.rpc_port = Some(port);
self
@@ -113,6 +133,11 @@ impl<'a> FullnodeConfigBuilder<'a> {
self
}

pub fn with_admin_port(mut self, port: u16) -> Self {
self.admin_port = Some(port);
self
}

pub fn set_event_store(mut self, status: bool) -> Self {
self.enable_event_store = status;
self
@@ -149,13 +174,18 @@ impl<'a> FullnodeConfigBuilder<'a> {
let network_address = format!(
"/ip4/{}/tcp/{}/http",
listen_ip,
utils::get_available_port(&listen_ip_str)
self.port
.unwrap_or_else(|| utils::get_available_port(&listen_ip_str))
)
.parse()
.unwrap();

let p2p_config = {
let address = SocketAddr::new(listen_ip, utils::get_available_port(&listen_ip_str));
let address = SocketAddr::new(
listen_ip,
self.p2p_port
.unwrap_or_else(|| utils::get_available_port(&listen_ip_str)),
);
let seed_peers = validator_configs
.iter()
.map(|config| SeedPeer {
Expand Down Expand Up @@ -191,7 +221,9 @@ impl<'a> FullnodeConfigBuilder<'a> {
metrics_address: utils::available_local_socket_address(),
// TODO: admin server is hard coded to start on 127.0.0.1 - we should probably
// provide the entire socket address here to avoid confusion.
admin_interface_port: utils::get_available_port("127.0.0.1"),
admin_interface_port: self
.admin_port
.unwrap_or_else(|| utils::get_available_port("127.0.0.1")),
json_rpc_address,
consensus_config: None,
enable_event_processing: self.enable_event_store,
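The new setters let tests pin every externally reachable port instead of always grabbing a random free one. A minimal usage sketch, assuming `builder` is a `FullnodeConfigBuilder` obtained from the swarm config (its construction and the final build call are outside this diff):

```rust
// Sketch: pin all four ports via the setters added in this diff. Any port left
// unset still falls back to utils::get_available_port at build time.
let builder = builder
    .with_port(9000)        // main network_address
    .with_p2p_port(9001)    // p2p data sync
    .with_rpc_port(9002)    // json rpc api
    .with_admin_port(9003); // admin interface
```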
16 changes: 11 additions & 5 deletions crates/sui-core/src/quorum_driver/mod.rs
@@ -4,6 +4,8 @@
mod metrics;
pub use metrics::*;

pub mod reconfig_observer;

use arc_swap::ArcSwap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
@@ -300,12 +302,12 @@ where
Ok(response)
}

pub async fn update_validators(
&self,
new_validators: Arc<AuthorityAggregator<A>>,
) -> SuiResult {
pub async fn update_validators(&self, new_validators: Arc<AuthorityAggregator<A>>) {
info!(
"Quorum Drvier updating AuthorityAggregator with committee {:?}",
new_validators.committee
);
self.validators.store(new_validators);
Ok(())
}

// TODO currently this function is not epoch-boundary-safe. We need to make it so.
@@ -609,6 +611,10 @@ where
self.quorum_driver.authority_aggregator()
}

pub fn current_epoch(&self) -> EpochId {
self.quorum_driver.current_epoch()
}

/// Process a QuorumDriverTask.
/// The function has no return value - the corresponding actions of task result
/// are performed in this call.
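`update_validators` is now infallible (it just stores the new aggregator in the `ArcSwap`), so callers no longer handle a `SuiResult`. A minimal sketch, assuming a `quorum_driver: Arc<QuorumDriver<_>>` and a freshly built `new_auth_agg` are in scope:

```rust
// Sketch: swap in a new AuthorityAggregator after a reconfiguration.
let new_validators = Arc::new(new_auth_agg);
let target_epoch = new_validators.committee.epoch;
quorum_driver.update_validators(new_validators).await;
assert_eq!(quorum_driver.current_epoch(), target_epoch);
```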
105 changes: 105 additions & 0 deletions crates/sui-core/src/quorum_driver/reconfig_observer.rs
@@ -0,0 +1,105 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use std::sync::Arc;
use sui_types::committee::Committee;
use tokio::sync::broadcast::error::RecvError;
use tracing::{info, warn};

use crate::{
authority::AuthorityStore,
authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
authority_client::NetworkAuthorityClient,
epoch::committee_store::CommitteeStore,
safe_client::SafeClientMetricsBase,
};

use super::QuorumDriver;

#[async_trait]
pub trait ReconfigObserver<A> {
async fn run(&mut self, quorum_driver: Arc<QuorumDriver<A>>);
}

pub struct OnsiteReconfigObserver {
reconfig_rx: tokio::sync::broadcast::Receiver<Committee>,
authority_store: Arc<AuthorityStore>,
committee_store: Arc<CommitteeStore>,
safe_client_metrics_base: SafeClientMetricsBase,
auth_agg_metrics: AuthAggMetrics,
}

impl OnsiteReconfigObserver {
pub fn new(
reconfig_rx: tokio::sync::broadcast::Receiver<Committee>,
authority_store: Arc<AuthorityStore>,
committee_store: Arc<CommitteeStore>,
safe_client_metrics_base: SafeClientMetricsBase,
auth_agg_metrics: AuthAggMetrics,
) -> Self {
Self {
reconfig_rx,
authority_store,
committee_store,
safe_client_metrics_base,
auth_agg_metrics,
}
}

async fn create_authority_aggregator_from_system_state(
&self,
) -> AuthorityAggregator<NetworkAuthorityClient> {
AuthorityAggregator::new_from_system_state(
&self.authority_store,
&self.committee_store,
self.safe_client_metrics_base.clone(),
self.auth_agg_metrics.clone(),
)
// TODO: we should tolerate when <= f validators give invalid addresses
// GH issue: https://github.com/MystenLabs/sui/issues/7019
.unwrap_or_else(|e| {
panic!(
"Failed to create AuthorityAggregator from System State: {:?}",
e
)
})
}
}

#[async_trait]
impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
async fn run(&mut self, quorum_driver: Arc<QuorumDriver<NetworkAuthorityClient>>) {
// A tiny optimization: when a very stale node just starts, the
// channel may quickly fill up with committees. Here we skip directly to
// the last known committee by looking at SuiSystemState.
let authority_agg = self.create_authority_aggregator_from_system_state().await;
if authority_agg.committee.epoch > quorum_driver.current_epoch() {
quorum_driver
.update_validators(Arc::new(authority_agg))
.await;
}
loop {
match self.reconfig_rx.recv().await {
Ok(committee) => {
info!("Got reconfig message: {}", committee);
if committee.epoch > quorum_driver.current_epoch() {
let authority_agg =
self.create_authority_aggregator_from_system_state().await;
quorum_driver
.update_validators(Arc::new(authority_agg))
.await;
} else {
// This should only happen when the node just starts
warn!("Epoch number decreased - ignoring committee: {}", committee);
}
}
// It's ok to miss messages due to overflow here
Err(RecvError::Lagged(_)) => {
continue;
}
Err(RecvError::Closed) => panic!("Do not expect the channel to be closed"),
}
}
}
}
3 changes: 1 addition & 2 deletions crates/sui-core/src/quorum_driver/tests.rs
@@ -162,8 +162,7 @@ async fn test_quorum_driver_update_validators_and_max_retry_times() {
aggregator.committee.epoch = 10;
quorum_driver_clone
.update_validators(Arc::new(aggregator))
.await
.unwrap();
.await;
assert_eq!(
quorum_driver_handler.clone_quorum_driver().current_epoch(),
10
1 change: 1 addition & 0 deletions crates/sui-core/src/safe_client.rs
@@ -33,6 +33,7 @@ macro_rules! check_error {
}
}

#[derive(Clone)]
pub struct SafeClientMetricsBase {
total_requests_by_address_method: IntCounterVec,
total_responses_by_address_method: IntCounterVec,
4 changes: 4 additions & 0 deletions crates/sui-core/src/transaction_orchestrator.rs
@@ -284,6 +284,10 @@ where
&self.quorum_driver_handler
}

pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
self.quorum_driver_handler.clone()
}

pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
self.quorum_driver().authority_aggregator().load_full()
}
20 changes: 18 additions & 2 deletions crates/sui-node/src/lib.rs
@@ -103,7 +103,7 @@ pub struct SuiNode {
_discovery: discovery::Handle,
state_sync: state_sync::Handle,
checkpoint_store: Arc<CheckpointStore>,
_checkpoint_executor_handle: checkpoint_executor::Handle,
checkpoint_executor_handle: checkpoint_executor::Handle,

reconfig_channel: Mutex<tokio::sync::broadcast::Receiver<Committee>>,

@@ -261,7 +261,7 @@ impl SuiNode {
_discovery: discovery_handle,
state_sync: state_sync_handle,
checkpoint_store,
_checkpoint_executor_handle: checkpoint_executor_handle,
checkpoint_executor_handle,
reconfig_channel: Mutex::new(reconfig_channel),

#[cfg(msim)]
@@ -276,6 +276,14 @@
Ok(node)
}

pub async fn subscribe_to_epoch_change(&self) -> tokio::sync::broadcast::Receiver<Committee> {
self.checkpoint_executor_handle.subscribe_to_end_of_epoch()
}

pub fn current_epoch(&self) -> EpochId {
self.state.epoch()
}

pub async fn close_epoch(&self) -> SuiResult {
self.validator_components
.lock()
@@ -596,6 +604,14 @@ impl SuiNode {
self.state.clone()
}

pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
self.state.committee_store().clone()
}

pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
self.state.db()
}

/// Clone an AuthorityAggregator currently used in this node's
/// QuorumDriver, if the node is a fullnode. After reconfig,
/// QuorumDriver builds a new AuthorityAggregator. The caller
67 changes: 67 additions & 0 deletions crates/sui/tests/onsite_reconfig_observer_tests.rs
@@ -0,0 +1,67 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::async_yields_async)]
use prometheus::Registry;
use sui_core::authority_aggregator::AuthAggMetrics;
use sui_core::quorum_driver::reconfig_observer::OnsiteReconfigObserver;
use sui_core::quorum_driver::reconfig_observer::ReconfigObserver;
use sui_core::safe_client::SafeClientMetricsBase;
use test_utils::authority::{spawn_fullnodes, spawn_test_authorities, test_authority_configs};
use test_utils::network::wait_for_nodes_transition_to_epoch;
use tracing::info;

use sui_macros::sim_test;

#[sim_test]
async fn test_onsite_reconfig_observer_basic() {
telemetry_subscribers::init_for_testing();
let config = test_authority_configs();
let authorities = spawn_test_authorities([].into_iter(), &config).await;
let fullnodes = spawn_fullnodes(&config, 1).await;
let fullnode = &fullnodes[0];

let _observer_handle = fullnode
.with_async(|node| async {
let qd = node
.transaction_orchestrator()
.unwrap()
.clone_quorum_driver();
assert_eq!(qd.current_epoch(), 0);
let rx = node.subscribe_to_epoch_change().await;
let registry = Registry::new();
let mut observer = OnsiteReconfigObserver::new(
rx,
node.clone_authority_store(),
node.clone_committee_store(),
SafeClientMetricsBase::new(&registry),
AuthAggMetrics::new(&registry),
);
let qd_clone = qd.clone_quorum_driver();
tokio::task::spawn(async move { observer.run(qd_clone).await })
})
.await;
info!("Shutting down epoch 0");
for handle in &authorities {
handle
.with_async(|node| async { node.close_epoch().await.unwrap() })
.await;
}
// Wait for all nodes to reach the next epoch.
info!("Waiting for nodes to advance to epoch 1");
wait_for_nodes_transition_to_epoch(authorities.iter().chain(fullnodes.iter()), 1).await;

// Give it some time for the update to happen
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
fullnode.with(|node| {
let qd = node
.transaction_orchestrator()
.unwrap()
.clone_quorum_driver();
assert_eq!(qd.current_epoch(), 1);
assert_eq!(
node.clone_authority_aggregator().unwrap().committee.epoch,
1
);
});
}