Skip to content

Commit

Permalink
add reconfig observer
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Dec 25, 2022
1 parent d8571ae commit ef60bc0
Show file tree
Hide file tree
Showing 14 changed files with 319 additions and 38 deletions.
7 changes: 2 additions & 5 deletions crates/sui-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,8 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
err
),
Ok(auth_agg) => {
if let Err(err) = self.qd.update_validators(Arc::new(auth_agg)).await {
error!("Reconfiguration - Error when updating authority aggregator in quorum driver: {}", err);
} else {
info!("Reconfiguration - Reconfiguration to epoch {new_epoch} is done");
}
self.qd.update_validators(Arc::new(auth_agg)).await;
info!("Reconfiguration - Reconfiguration to epoch {new_epoch} is done");
}
}
}
Expand Down
49 changes: 43 additions & 6 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,36 @@ impl<A> AuthorityAggregator<A> {
}
}

pub fn new_with_metrics(
committee: Committee,
committee_store: Arc<CommitteeStore>,
authority_clients: BTreeMap<AuthorityName, A>,
safe_client_metrics_base: SafeClientMetricsBase,
auth_agg_metrics: AuthAggMetrics,
) -> Self {
Self {
committee,
authority_clients: authority_clients
.into_iter()
.map(|(name, api)| {
(
name,
SafeClient::new(
api,
committee_store.clone(),
name,
SafeClientMetrics::new(&safe_client_metrics_base, name),
),
)
})
.collect(),
metrics: auth_agg_metrics,
safe_client_metrics_base: Arc::new(safe_client_metrics_base),
timeouts: Default::default(),
committee_store,
}
}

/// This function recreates AuthorityAggregator with the given committee.
/// It also updates committee store which impacts other of its references.
/// If it is called on a Validator/Fullnode, it **may** interleave with the the authority active's
Expand Down Expand Up @@ -388,19 +418,24 @@ impl AuthorityAggregator<NetworkAuthorityClient> {
pub fn new_from_system_state(
store: &Arc<AuthorityStore>,
committee_store: &Arc<CommitteeStore>,
prometheus_registry: &Registry,
safe_client_metrics_base: SafeClientMetricsBase,
auth_agg_metrics: AuthAggMetrics,
) -> anyhow::Result<Self> {
let net_config = default_mysten_network_config();
let sui_system_state = store.get_sui_system_state_object()?;

// TODO: the function returns on URL parsing errors. In this case we should
// tolerate it as long as we have 2f+1 good validators.
// GH issue: https://github.com/MystenLabs/sui/issues/7019
let authority_clients =
make_network_authority_client_sets_from_system_state(&sui_system_state, &net_config)?;
Ok(Self::new(
let res = Self::new_with_metrics(
sui_system_state.get_current_epoch_committee().committee,
committee_store.clone(),
authority_clients,
prometheus_registry,
))
safe_client_metrics_base,
auth_agg_metrics,
);
Ok(res)
}
}

Expand Down Expand Up @@ -449,10 +484,12 @@ impl TransactionCertifier for NetworkTransactionCertifier {
self_state: &AuthorityState,
timeout: Duration,
) -> anyhow::Result<VerifiedCertificate> {
let registry = Registry::new();
let net = AuthorityAggregator::new_from_system_state(
&self_state.db(),
self_state.committee_store(),
&Registry::new(),
SafeClientMetricsBase::new(&registry),
AuthAggMetrics::new(&registry),
)?;

net.authorty_ask_for_cert_with_retry_and_timeout(transaction, self_state, timeout)
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ impl AuthorityAPI for NetworkAuthorityClient {
}
}

// This function errs on URL parsing error. This may happen
// when a validator provides a bad URL.
pub fn make_network_authority_client_sets_from_system_state(
sui_system_state: &SuiSystemState,
network_config: &Config,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mod metrics;
pub(crate) mod tests;

const TASKS_PER_CORE: usize = 1;
const END_OF_EPOCH_BROADCAST_CHANNEL_CAPACITY: usize = 2;
const END_OF_EPOCH_BROADCAST_CHANNEL_CAPACITY: usize = 100;
const LOCAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);

type CheckpointExecutionBuffer = FuturesOrdered<
Expand Down
16 changes: 11 additions & 5 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
mod metrics;
pub use metrics::*;

pub mod reconfig_observer;

use arc_swap::ArcSwap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
Expand Down Expand Up @@ -300,12 +302,12 @@ where
Ok(response)
}

pub async fn update_validators(
&self,
new_validators: Arc<AuthorityAggregator<A>>,
) -> SuiResult {
pub async fn update_validators(&self, new_validators: Arc<AuthorityAggregator<A>>) {
info!(
"Quorum Drvier updating AuthorityAggregator with committee {:?}",
new_validators.committee
);
self.validators.store(new_validators);
Ok(())
}

// TODO currently this function is not epoch-boundary-safe. We need to make it so.
Expand Down Expand Up @@ -609,6 +611,10 @@ where
self.quorum_driver.authority_aggregator()
}

pub fn current_epoch(&self) -> EpochId {
self.quorum_driver.current_epoch()
}

/// Process a QuorumDriverTask.
/// The function has no return value - the corresponding actions of task result
/// are performed in this call.
Expand Down
105 changes: 105 additions & 0 deletions crates/sui-core/src/quorum_driver/reconfig_observer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use std::sync::Arc;
use sui_types::committee::Committee;
use tracing::{info, warn};

use crate::{
authority::AuthorityStore,
authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
authority_client::NetworkAuthorityClient,
epoch::committee_store::CommitteeStore,
safe_client::SafeClientMetricsBase,
};

use super::QuorumDriver;

#[async_trait]
pub trait ReconfigObserver<A> {
async fn run(&mut self, quorum_driver: Arc<QuorumDriver<A>>);
}

pub struct OnsiteReconfigObserver {
reconfig_rx: tokio::sync::broadcast::Receiver<Committee>,
authority_store: Arc<AuthorityStore>,
committee_store: Arc<CommitteeStore>,
safe_client_metrics_base: SafeClientMetricsBase,
auth_agg_metrics: AuthAggMetrics,
}

impl OnsiteReconfigObserver {
pub fn new(
reconfig_rx: tokio::sync::broadcast::Receiver<Committee>,
authority_store: Arc<AuthorityStore>,
committee_store: Arc<CommitteeStore>,
safe_client_metrics_base: SafeClientMetricsBase,
auth_agg_metrics: AuthAggMetrics,
) -> Self {
Self {
reconfig_rx,
authority_store,
committee_store,
safe_client_metrics_base,
auth_agg_metrics,
}
}

async fn create_authority_aggregator_from_system_state(
&self,
) -> AuthorityAggregator<NetworkAuthorityClient> {
AuthorityAggregator::new_from_system_state(
&self.authority_store,
&self.committee_store,
self.safe_client_metrics_base.clone(),
self.auth_agg_metrics.clone(),
)
// TODO: we should tolerate when <= f validators give invalid addresses
// GH issue: https://github.com/MystenLabs/sui/issues/7019
.unwrap_or_else(|e| {
warn!("panicked? {:?}", e);
panic!(
"Failed to create AuthorityAggregator from System State: {:?}",
e
)
})
}
}

#[async_trait]
impl ReconfigObserver<NetworkAuthorityClient> for OnsiteReconfigObserver {
async fn run(&mut self, quorum_driver: Arc<QuorumDriver<NetworkAuthorityClient>>) {
// A tiny optimization: when the a very stale node just starts, the
// channel may fill up committees quickly. Here we skip directly to
// the last known committee by looking at SuiSystemState.
let authority_agg = self.create_authority_aggregator_from_system_state().await;
if authority_agg.committee.epoch > quorum_driver.current_epoch() {
quorum_driver
.update_validators(Arc::new(authority_agg))
.await;
}
loop {
match self.reconfig_rx.recv().await {
Ok(committee) => {
info!("Got reconfig message: {}", committee);
if committee.epoch > quorum_driver.current_epoch() {
let authority_agg =
self.create_authority_aggregator_from_system_state().await;
quorum_driver
.update_validators(Arc::new(authority_agg))
.await;
} else {
// This should only happen when the node just starts
warn!("Ignored non-newer from reconfig channel: {}", committee);
}
}
// Neither closed channel nor lagged shall happen
Err(other_err) => panic!(
"Got unexpected error from reconfig broadcast channel: {:?}",
other_err
),
}
}
}
}
3 changes: 1 addition & 2 deletions crates/sui-core/src/quorum_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ async fn test_quorum_driver_update_validators_and_max_retry_times() {
aggregator.committee.epoch = 10;
quorum_driver_clone
.update_validators(Arc::new(aggregator))
.await
.unwrap();
.await;
assert_eq!(
quorum_driver_handler.clone_quorum_driver().current_epoch(),
10
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/safe_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ macro_rules! check_error {
}
}

#[derive(Clone)]
pub struct SafeClientMetricsBase {
total_requests_by_address_method: IntCounterVec,
total_responses_by_address_method: IntCounterVec,
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-core/src/transaction_orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ where
&self.quorum_driver_handler
}

pub fn clone_quorum_driver(&self) -> Arc<QuorumDriverHandler<A>> {
self.quorum_driver_handler.clone()
}

pub fn clone_authority_aggregator(&self) -> Arc<AuthorityAggregator<A>> {
self.quorum_driver().authority_aggregator().load_full()
}
Expand Down
25 changes: 23 additions & 2 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ use prometheus::Registry;
use std::collections::HashMap;
use std::sync::Arc;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority_aggregator::{AuthorityAggregator, NetworkTransactionCertifier};
use sui_core::authority_aggregator::{
AuthAggMetrics, AuthorityAggregator, NetworkTransactionCertifier,
};
use sui_core::authority_server::ValidatorService;
use sui_core::checkpoints::checkpoint_executor;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::safe_client::SafeClientMetricsBase;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactiondOrchestrator;
use sui_core::transaction_streamer::TransactionStreamer;
Expand All @@ -48,6 +51,7 @@ use sui_storage::{
IndexStore,
};
use sui_types::committee::Committee;
use sui_types::committee::EpochId;
use sui_types::crypto::KeypairTraits;
use sui_types::messages::QuorumDriverResponse;
use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -174,7 +178,8 @@ impl SuiNode {
let arc_net = AuthorityAggregator::new_from_system_state(
&store,
&committee_store,
&prometheus_registry,
SafeClientMetricsBase::new(&prometheus_registry),
AuthAggMetrics::new(&prometheus_registry),
)?;

let transaction_streamer = if is_full_node {
Expand Down Expand Up @@ -264,6 +269,14 @@ impl SuiNode {
Ok(node)
}

pub async fn subscribe_to_epoch_change(&self) -> tokio::sync::broadcast::Receiver<Committee> {
self.reconfig_channel.lock().await.resubscribe()
}

pub fn current_epoch(&self) -> EpochId {
self.state.epoch()
}

pub fn close_epoch(&self) -> SuiResult {
self.validator_components
.load()
Expand Down Expand Up @@ -546,6 +559,14 @@ impl SuiNode {
self.state.clone()
}

pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
self.state.committee_store().clone()
}

pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
self.state.db()
}

/// Clone an AuthorityAggregator currently used in this node's
/// QuorumDriver, if the node is a fullnode. After reconfig,
/// QuorumDrvier builds a new AuthorityAggregator. The caller
Expand Down
Loading

0 comments on commit ef60bc0

Please sign in to comment.