Skip to content

Commit

Permalink
Another rearchitecture: stop controller tasks, mutate config, respawn…
Browse files: browse the repository at this point in the history
…; during downtime, tally requests are ignored and a log line is emitted
  • Loading branch information
williampsmith committed Feb 1, 2025
1 parent d481d67 commit 2bbaaa3
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 271 deletions.
39 changes: 4 additions & 35 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ pub struct AuthorityState {
chain_identifier: ChainIdentifier,

/// Traffic controller for Sui core servers (json-rpc, validator service)
pub traffic_controller: Option<Arc<ArcSwap<TrafficController>>>,
pub traffic_controller: Option<Arc<TrafficController>>,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -1510,38 +1510,7 @@ impl AuthorityState {
params: TrafficControlReconfigParams,
) -> Result<(), SuiError> {
if let Some(traffic_controller) = self.traffic_controller.as_ref() {
let (acl, mut config, metrics, fw_config, spam_policy, error_policy) =
traffic_controller.load().exfiltrate().await;
if spam_policy.is_none() {
return Err(SuiError::InvalidAdminRequest(
"spam policy is not previously initialized".to_string(),
));
}
if error_policy.is_none() {
return Err(SuiError::InvalidAdminRequest(
"error policy is not previously initialized".to_string(),
));
}
let mut spam_policy = spam_policy.unwrap();
let mut error_policy = error_policy.unwrap();
TrafficController::admin_reconfigure_policy(
&mut config,
&mut spam_policy,
&mut error_policy,
params,
)?;
let new_traffic_controller = TrafficController::from_state(
acl,
config,
spam_policy,
error_policy,
metrics,
fw_config,
)
.await;
// do atomic swap
let old_traffic_controller = traffic_controller.swap(Arc::new(new_traffic_controller));
old_traffic_controller.shutdown();
traffic_controller.admin_reconfigure(params).await?;
}
Ok(())
}
Expand Down Expand Up @@ -2958,14 +2927,14 @@ impl AuthorityState {
let traffic_controller_metrics =
Arc::new(TrafficControllerMetrics::new(prometheus_registry));
let traffic_controller = if let Some(policy_config) = policy_config {
Some(Arc::new(ArcSwap::new(Arc::new(
Some(Arc::new(
TrafficController::init(
policy_config,
traffic_controller_metrics,
firewall_config.clone(),
)
.await,
))))
))
} else {
None
};
Expand Down
33 changes: 17 additions & 16 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use arc_swap::ArcSwapAny;
use async_trait::async_trait;
use fastcrypto::traits::KeyPair;
use mysten_metrics::spawn_monitored_task;
Expand Down Expand Up @@ -347,7 +346,7 @@ pub struct ValidatorService {
state: Arc<AuthorityState>,
consensus_adapter: Arc<ConsensusAdapter>,
metrics: Arc<ValidatorServiceMetrics>,
traffic_controller: Option<Arc<ArcSwapAny<Arc<TrafficController>>>>,
traffic_controller: Option<Arc<TrafficController>>,
client_id_source: Option<ClientIdSource>,
}

Expand Down Expand Up @@ -1294,7 +1293,7 @@ impl ValidatorService {

async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
if let Some(traffic_controller) = &self.traffic_controller {
if !traffic_controller.load().check(&client, &None).await {
if !traffic_controller.check(&client, &None).await {
// Entity in blocklist
Err(tonic::Status::from_error(SuiError::TooManyRequests.into()))
} else {
Expand All @@ -1305,7 +1304,7 @@ impl ValidatorService {
}
}

fn handle_traffic_resp<T>(
async fn handle_traffic_resp<T>(
&self,
client: Option<IpAddr>,
wrapped_response: WrappedServiceResponse<T>,
Expand All @@ -1320,17 +1319,19 @@ impl ValidatorService {
};

if let Some(traffic_controller) = self.traffic_controller.clone() {
traffic_controller.load().tally(TrafficTally {
direct: client,
through_fullnode: None,
error_info: error.map(|e| {
let error_type = String::from(e.clone().as_ref());
let error_weight = normalize(e);
(error_weight, error_type)
}),
spam_weight,
timestamp: SystemTime::now(),
})
traffic_controller
.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_info: error.map(|e| {
let error_type = String::from(e.clone().as_ref());
let error_weight = normalize(e);
(error_weight, error_type)
}),
spam_weight,
timestamp: SystemTime::now(),
})
.await;
}
unwrapped_response
}
Expand Down Expand Up @@ -1381,7 +1382,7 @@ macro_rules! handle_with_decoration {

// handle traffic tallying
let wrapped_response = $self.$func_name($request).await;
$self.handle_traffic_resp(client, wrapped_response)
$self.handle_traffic_resp(client, wrapped_response).await
}};
}

Expand Down
Loading

0 comments on commit 2bbaaa3

Please sign in to comment.