diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 90eabc1a04af2d..51bb9336748edd 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -834,7 +834,7 @@ pub struct AuthorityState { chain_identifier: ChainIdentifier, /// Traffic controller for Sui core servers (json-rpc, validator service) - pub traffic_controller: Option>>, + pub traffic_controller: Option>, } /// The authority state encapsulates all state, drives execution, and ensures safety. @@ -1510,38 +1510,7 @@ impl AuthorityState { params: TrafficControlReconfigParams, ) -> Result<(), SuiError> { if let Some(traffic_controller) = self.traffic_controller.as_ref() { - let (acl, mut config, metrics, fw_config, spam_policy, error_policy) = - traffic_controller.load().exfiltrate().await; - if spam_policy.is_none() { - return Err(SuiError::InvalidAdminRequest( - "spam policy is not previously initialized".to_string(), - )); - } - if error_policy.is_none() { - return Err(SuiError::InvalidAdminRequest( - "error policy is not previously initialized".to_string(), - )); - } - let mut spam_policy = spam_policy.unwrap(); - let mut error_policy = error_policy.unwrap(); - TrafficController::admin_reconfigure_policy( - &mut config, - &mut spam_policy, - &mut error_policy, - params, - )?; - let new_traffic_controller = TrafficController::from_state( - acl, - config, - spam_policy, - error_policy, - metrics, - fw_config, - ) - .await; - // do atomic swap - let old_traffic_controller = traffic_controller.swap(Arc::new(new_traffic_controller)); - old_traffic_controller.shutdown(); + traffic_controller.admin_reconfigure(params).await?; } Ok(()) } @@ -2958,14 +2927,14 @@ impl AuthorityState { let traffic_controller_metrics = Arc::new(TrafficControllerMetrics::new(prometheus_registry)); let traffic_controller = if let Some(policy_config) = policy_config { - Some(Arc::new(ArcSwap::new(Arc::new( + Some(Arc::new( TrafficController::init( policy_config, 
traffic_controller_metrics, firewall_config.clone(), ) .await, - )))) + )) } else { None }; diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index 10af865563536f..2e79dde5b9ebad 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -3,7 +3,6 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use arc_swap::ArcSwapAny; use async_trait::async_trait; use fastcrypto::traits::KeyPair; use mysten_metrics::spawn_monitored_task; @@ -347,7 +346,7 @@ pub struct ValidatorService { state: Arc, consensus_adapter: Arc, metrics: Arc, - traffic_controller: Option>>>, + traffic_controller: Option>, client_id_source: Option, } @@ -1294,7 +1293,7 @@ impl ValidatorService { async fn handle_traffic_req(&self, client: Option) -> Result<(), tonic::Status> { if let Some(traffic_controller) = &self.traffic_controller { - if !traffic_controller.load().check(&client, &None).await { + if !traffic_controller.check(&client, &None).await { // Entity in blocklist Err(tonic::Status::from_error(SuiError::TooManyRequests.into())) } else { @@ -1305,7 +1304,7 @@ impl ValidatorService { } } - fn handle_traffic_resp( + async fn handle_traffic_resp( &self, client: Option, wrapped_response: WrappedServiceResponse, @@ -1320,17 +1319,19 @@ impl ValidatorService { }; if let Some(traffic_controller) = self.traffic_controller.clone() { - traffic_controller.load().tally(TrafficTally { - direct: client, - through_fullnode: None, - error_info: error.map(|e| { - let error_type = String::from(e.clone().as_ref()); - let error_weight = normalize(e); - (error_weight, error_type) - }), - spam_weight, - timestamp: SystemTime::now(), - }) + traffic_controller + .tally(TrafficTally { + direct: client, + through_fullnode: None, + error_info: error.map(|e| { + let error_type = String::from(e.clone().as_ref()); + let error_weight = normalize(e); + (error_weight, error_type) + }), + spam_weight, + timestamp: 
SystemTime::now(), + }) + .await; } unwrapped_response } @@ -1381,7 +1382,7 @@ macro_rules! handle_with_decoration { // handle traffic tallying let wrapped_response = $self.$func_name($request).await; - $self.handle_traffic_resp(client, wrapped_response) + $self.handle_traffic_resp(client, wrapped_response).await }}; } diff --git a/crates/sui-core/src/traffic_controller/mod.rs b/crates/sui-core/src/traffic_controller/mod.rs index 2101d857c1a4ac..4abb890130958b 100644 --- a/crates/sui-core/src/traffic_controller/mod.rs +++ b/crates/sui-core/src/traffic_controller/mod.rs @@ -28,7 +28,7 @@ use sui_types::traffic_control::{ PolicyConfig, PolicyType, RemoteFirewallConfig, TrafficControlReconfigParams, Weight, }; use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio::sync::{broadcast, mpsc, Mutex, RwLock}; use tracing::{debug, error, info, trace, warn}; pub const METRICS_INTERVAL_SECS: u64 = 2; @@ -54,13 +54,12 @@ pub enum Acl { #[derive(Clone)] pub struct TrafficController { - tally_channel: Option>, + tally_channel: Arc>>>, acl: Acl, metrics: Arc, - dry_run_mode: bool, spam_policy: Option>>, error_policy: Option>>, - policy_config: PolicyConfig, + policy_config: Arc>, fw_config: Option, shutdown_sender: Option>, } @@ -102,102 +101,123 @@ impl TrafficController { }) .collect(); Self { - tally_channel: None, + tally_channel: Arc::new(Mutex::new(None)), acl: Acl::Allowlist(allowlist), metrics, - dry_run_mode: policy_config.dry_run, - policy_config, + policy_config: Arc::new(RwLock::new(policy_config)), fw_config, shutdown_sender: None, spam_policy: None, error_policy: None, } } - None => Self::spawn(policy_config, metrics, fw_config).await, + None => { + let spam_policy = Arc::new(Mutex::new( + TrafficControlPolicy::from_spam_config(policy_config.clone()).await, + )); + let error_policy = Arc::new(Mutex::new( + TrafficControlPolicy::from_error_config(policy_config.clone()).await, + )); + let this = Self { + tally_channel: 
Arc::new(Mutex::new(None)), + acl: Acl::Blocklists(Blocklists { + clients: Arc::new(DashMap::new()), + proxied_clients: Arc::new(DashMap::new()), + }), + metrics, + policy_config: Arc::new(RwLock::new(policy_config)), + fw_config, + shutdown_sender: None, + spam_policy: Some(spam_policy), + error_policy: Some(error_policy), + }; + this.spawn().await; + this + } } } - /// Create a TrafficController with an initial policy and ACL state. - pub async fn from_state( - acl: Acl, + pub async fn init_for_test( policy_config: PolicyConfig, - spam_policy: TrafficControlPolicy, - error_policy: TrafficControlPolicy, - metrics: Arc, fw_config: Option, ) -> Self { - match acl.clone() { - Acl::Allowlist(_) => { - warn!("Provided spam and error policies will be ignored due to allowlist ACL type"); - Self { - tally_channel: None, - acl, - metrics, - dry_run_mode: policy_config.dry_run, - policy_config, - fw_config, - shutdown_sender: None, - spam_policy: None, - error_policy: None, - } - } - Acl::Blocklists(blocklists) => { - Self::spawn_from_state( - policy_config, - spam_policy, - error_policy, - metrics, - fw_config, - blocklists, - ) - .await - } - } + let metrics = Arc::new(TrafficControllerMetrics::new(&prometheus::Registry::new())); + Self::init(policy_config, metrics, fw_config).await } - /// Export state data. Note that this cannot be used for any strong consistency - /// guarantees, but should be sufficient for purposes such as reconfig/hot swap. 
- pub async fn exfiltrate( - &self, - ) -> ( - Acl, - PolicyConfig, - Arc, - Option, - Option, - Option, - ) { - let spam_policy = if let Some(spam_policy) = self.spam_policy.as_ref() { - Some((*spam_policy.lock().await).clone()) - } else { - None - }; - let error_policy = if let Some(error_policy) = self.error_policy.as_ref() { - Some((*error_policy.lock().await).clone()) - } else { - None + async fn spawn(&self) { + let policy_config = { self.policy_config.read().await.clone() }; + Self::set_policy_config_metrics(&policy_config, self.metrics.clone()); + let (tx, rx) = mpsc::channel(policy_config.channel_capacity); + // Memoized drainfile existence state. This is passed into delegation + // funtions to prevent them from continuing to populate blocklists + // if drain is set, as otherwise it will grow without bounds + // without the firewall running to periodically clear it. + let mem_drainfile_present = self + .fw_config + .as_ref() + .map(|config| config.drain_path.exists()) + .unwrap_or(false); + self.metrics + .deadmans_switch_enabled + .set(mem_drainfile_present as i64); + let blocklists = match self.acl.clone() { + Acl::Blocklists(blocklists) => blocklists, + Acl::Allowlist(_) => panic!("Allowlist ACL should not exist on spawn"), }; - ( - self.acl.clone(), - self.policy_config.clone(), - self.metrics.clone(), - self.fw_config.clone(), - spam_policy, - error_policy, - ) + let tally_loop_blocklists = blocklists.clone(); + let clear_loop_blocklists = blocklists.clone(); + let tally_loop_metrics = self.metrics.clone(); + let clear_loop_metrics = self.metrics.clone(); + let tally_loop_policy_config = policy_config.clone(); + let tally_loop_fw_config = self.fw_config.clone(); + + // Create broadcast channel for shutdown signal + let (shutdown_tx, shutdown_rx1) = broadcast::channel(1); + let shutdown_rx2 = shutdown_tx.subscribe(); + + let spam_policy = self + .spam_policy + .clone() + .expect("spam policy should exist on spawn"); + let error_policy = self + 
.error_policy + .clone() + .expect("error policy should exist on spawn"); + let spam_policy_clone = spam_policy.clone(); + let error_policy_clone = error_policy.clone(); + + spawn_monitored_task!(run_tally_loop( + rx, + tally_loop_policy_config, + spam_policy_clone, + error_policy_clone, + tally_loop_fw_config, + tally_loop_blocklists, + tally_loop_metrics, + mem_drainfile_present, + shutdown_rx1, + )); + spawn_monitored_task!(run_clear_blocklists_loop( + clear_loop_blocklists, + clear_loop_metrics, + shutdown_rx2, + )); + self.open_tally_channel(tx).await; } - pub fn shutdown(&self) { - if let Some(sender) = &self.shutdown_sender { - // It's ok if there are no receivers - let _ = sender.send(()); - } + pub async fn admin_reconfigure( + &self, + params: TrafficControlReconfigParams, + ) -> Result<(), SuiError> { + self.shutdown(); + self.mutate_policy_config(params).await?; + self.spawn().await; + Ok(()) } - pub fn admin_reconfigure_policy( - config: &mut PolicyConfig, - spam_policy: &mut TrafficControlPolicy, - error_policy: &mut TrafficControlPolicy, + async fn mutate_policy_config( + &self, params: TrafficControlReconfigParams, ) -> Result<(), SuiError> { let TrafficControlReconfigParams { @@ -206,7 +226,7 @@ impl TrafficController { dry_run, } = params; if let Some(error_threshold) = error_threshold { - match *error_policy { + match *self.error_policy.as_ref().unwrap().lock().await { TrafficControlPolicy::FreqThreshold(ref mut policy) => { policy.client_threshold = error_threshold; if let Some(dry_run) = dry_run { @@ -228,7 +248,7 @@ impl TrafficController { } } if let Some(spam_threshold) = spam_threshold { - match *spam_policy { + match *self.spam_policy.as_ref().unwrap().lock().await { TrafficControlPolicy::FreqThreshold(ref mut policy) => { policy.client_threshold = spam_threshold; if let Some(dry_run) = dry_run { @@ -250,93 +270,19 @@ impl TrafficController { } } if let Some(dry_run) = dry_run { - config.dry_run = dry_run; + 
self.policy_config.write().await.dry_run = dry_run; } Ok(()) } - async fn spawn( - policy_config: PolicyConfig, - metrics: Arc, - fw_config: Option, - ) -> Self { - let blocklists = Blocklists { - clients: Arc::new(DashMap::new()), - proxied_clients: Arc::new(DashMap::new()), - }; - Self::spawn_from_state( - policy_config, - TrafficControlPolicy::from_spam_config(policy_config.clone()).await, - TrafficControlPolicy::from_error_config(policy_config.clone()).await, - metrics, - fw_config, - blocklists, - ) - .await + async fn open_tally_channel(&self, tx: mpsc::Sender) { + self.tally_channel.lock().await.replace(tx); } - async fn spawn_from_state( - policy_config: PolicyConfig, - spam_policy: TrafficControlPolicy, - error_policy: TrafficControlPolicy, - metrics: Arc, - fw_config: Option, - blocklists: Blocklists, - ) -> Self { - Self::set_policy_config_metrics(&policy_config, metrics.clone()); - let (tx, rx) = mpsc::channel(policy_config.channel_capacity); - // Memoized drainfile existence state. This is passed into delegation - // funtions to prevent them from continuing to populate blocklists - // if drain is set, as otherwise it will grow without bounds - // without the firewall running to periodically clear it. 
- let mem_drainfile_present = fw_config - .as_ref() - .map(|config| config.drain_path.exists()) - .unwrap_or(false); - metrics - .deadmans_switch_enabled - .set(mem_drainfile_present as i64); - let tally_loop_blocklists = blocklists.clone(); - let clear_loop_blocklists = blocklists.clone(); - let tally_loop_metrics = metrics.clone(); - let clear_loop_metrics = metrics.clone(); - let dry_run_mode = policy_config.dry_run; - let tally_loop_policy_config = policy_config.clone(); - let tally_loop_fw_config = fw_config.clone(); - - // Create broadcast channel for shutdown signal - let (shutdown_tx, shutdown_rx1) = broadcast::channel(1); - let shutdown_rx2 = shutdown_tx.subscribe(); - - let spam_policy = Arc::new(Mutex::new(spam_policy)); - let error_policy = Arc::new(Mutex::new(error_policy)); - - spawn_monitored_task!(run_tally_loop( - rx, - tally_loop_policy_config, - spam_policy.clone(), - error_policy.clone(), - tally_loop_fw_config, - tally_loop_blocklists, - tally_loop_metrics, - mem_drainfile_present, - shutdown_rx1, - )); - spawn_monitored_task!(run_clear_blocklists_loop( - clear_loop_blocklists, - clear_loop_metrics, - shutdown_rx2, - )); - Self { - tally_channel: Some(tx), - acl: Acl::Blocklists(blocklists), - metrics, - dry_run_mode, - spam_policy: Some(spam_policy), - error_policy: Some(error_policy), - policy_config, - fw_config, - shutdown_sender: Some(shutdown_tx), + fn shutdown(&self) { + if let Some(sender) = &self.shutdown_sender { + // It's ok if there are no receivers + let _ = sender.send(()); } } @@ -362,16 +308,8 @@ impl TrafficController { } } - pub async fn init_for_test( - policy_config: PolicyConfig, - fw_config: Option, - ) -> Self { - let metrics = Arc::new(TrafficControllerMetrics::new(&prometheus::Registry::new())); - Self::init(policy_config, metrics, fw_config).await - } - - pub fn tally(&self, tally: TrafficTally) { - if let Some(channel) = self.tally_channel.as_ref() { + pub async fn tally(&self, tally: TrafficTally) { + if let 
Some(channel) = self.tally_channel.lock().await.as_ref() { // Use try_send rather than send mainly to avoid creating backpressure // on the caller if the channel is full, which may slow down the critical // path. Dropping the tally on the floor should be ok, as in this case @@ -390,13 +328,16 @@ impl TrafficController { } Ok(_) => {} } + } else { + warn!("TrafficController not yet accepting tally requests."); } } /// Handle check with dry-run mode considered pub async fn check(&self, client: &Option, proxied_client: &Option) -> bool { + let policy_config = { self.policy_config.read().await.clone() }; let check_with_dry_run_maybe = |allowed| -> bool { - match (allowed, self.dry_run_mode()) { + match (allowed, policy_config.dry_run) { // check succeeded (true, _) => true, // check failed while in dry-run mode @@ -446,10 +387,6 @@ impl TrafficController { client_check && proxied_client_check } - pub fn dry_run_mode(&self) -> bool { - self.dry_run_mode - } - async fn check_and_clear_blocklist( &self, client: &Option, @@ -896,7 +833,7 @@ impl TrafficSim { assert!(per_client_tps > 0); assert!(duration.as_secs() > 0); - let controller = TrafficController::init_for_test(policy.clone(), None); + let controller = TrafficController::init_for_test(policy.clone(), None).await; let tasks = (0..num_clients).map(|task_num| { tokio::spawn(Self::run_single_client( controller.clone(), @@ -984,14 +921,16 @@ impl TrafficSim { total_time_blocked += time_blocked_start.elapsed(); currently_blocked = false; } - controller.tally(TrafficTally::new( - client, - // TODO add proxy IP for testing - None, - // TODO add weight adjustments - None, - Weight::one(), - )); + controller + .tally(TrafficTally::new( + client, + // TODO add proxy IP for testing + None, + // TODO add weight adjustments + None, + Weight::one(), + )) + .await; } else { if !currently_blocked { time_blocked_start = Instant::now(); diff --git a/crates/sui-e2e-tests/tests/traffic_control_tests.rs 
b/crates/sui-e2e-tests/tests/traffic_control_tests.rs index 671a7c9a0b90cd..1bccb78fcea800 100644 --- a/crates/sui-e2e-tests/tests/traffic_control_tests.rs +++ b/crates/sui-e2e-tests/tests/traffic_control_tests.rs @@ -22,6 +22,7 @@ use sui_macros::sim_test; use sui_network::default_mysten_network_config; use sui_swarm_config::network_config_builder::ConfigBuilder; use sui_test_transaction_builder::batch_make_transfer_transactions; +use sui_types::traffic_control::TrafficControlReconfigParams; use sui_types::{ crypto::Ed25519SuiSignature, quorum_driver_types::ExecuteTransactionRequestType, @@ -312,10 +313,11 @@ async fn test_validator_traffic_control_error_blocked_with_policy_reconfig( node.with(|node| { node.state() .reconfigure_traffic_control(TrafficControlReconfigParams { - policy_config: None, - fw_config: None, + error_threshold: None, + spam_threshold: None, dry_run: Some(false), }) + .await .unwrap(); }); } diff --git a/crates/sui-json-rpc/src/lib.rs b/crates/sui-json-rpc/src/lib.rs index a1de8249c8f366..7d945b7d7a2a8c 100644 --- a/crates/sui-json-rpc/src/lib.rs +++ b/crates/sui-json-rpc/src/lib.rs @@ -5,7 +5,6 @@ use std::env; use std::net::SocketAddr; use std::sync::Arc; -use arc_swap::ArcSwapAny; use axum::body::Body; use axum::http; use hyper::header::HeaderName; @@ -62,7 +61,7 @@ pub struct JsonRpcServerBuilder { module: RpcModule<()>, rpc_doc: Project, registry: Registry, - traffic_controller: Option>>>, + traffic_controller: Option>, policy_config: Option, } @@ -83,7 +82,7 @@ impl JsonRpcServerBuilder { pub fn new( version: &str, prometheus_registry: &Registry, - traffic_controller: Option>>>, + traffic_controller: Option>, policy_config: Option, ) -> Self { Self { diff --git a/crates/sui-json-rpc/src/traffic_control.rs b/crates/sui-json-rpc/src/traffic_control.rs index 8d1e0a79497e28..7d2d1f59988de5 100644 --- a/crates/sui-json-rpc/src/traffic_control.rs +++ b/crates/sui-json-rpc/src/traffic_control.rs @@ -1,7 +1,6 @@ // Copyright (c) Mysten Labs, 
Inc. // SPDX-License-Identifier: Apache-2.0 -use arc_swap::ArcSwapAny; use axum::extract::ConnectInfo; use futures::FutureExt; use jsonrpsee::server::middleware::rpc::RpcServiceT; @@ -21,14 +20,11 @@ const TOO_MANY_REQUESTS_MSG: &str = "Too many requests"; #[derive(Clone)] pub struct TrafficControllerService { inner: S, - traffic_controller: Option>>>, + traffic_controller: Option>, } impl TrafficControllerService { - pub fn new( - service: S, - traffic_controller: Option>>>, - ) -> Self { + pub fn new(service: S, traffic_controller: Option>) -> Self { Self { inner: service, traffic_controller, @@ -54,7 +50,7 @@ where response } else { let response = service.call(req).await; - handle_traffic_resp(&traffic_controller, client, &response); + handle_traffic_resp(&traffic_controller, client, &response).await; response } } else { @@ -66,10 +62,10 @@ where } async fn handle_traffic_req( - traffic_controller: &Arc>>, + traffic_controller: &Arc, client: &Option, ) -> Result<(), MethodResponse> { - if !traffic_controller.load().check(client, &None).await { + if !traffic_controller.check(client, &None).await { // Entity in blocklist let err_obj = ErrorObject::borrowed(ErrorCode::ServerIsBusy.code(), TOO_MANY_REQUESTS_MSG, None); @@ -79,30 +75,32 @@ async fn handle_traffic_req( } } -fn handle_traffic_resp( - traffic_controller: &Arc>>, +async fn handle_traffic_resp( + traffic_controller: &Arc, client: Option, response: &MethodResponse, ) { let error = response.as_error_code().map(ErrorCode::from); - traffic_controller.load().tally(TrafficTally { - direct: client, - through_fullnode: None, - error_info: error.map(|e| { - let error_type = e.to_string(); - let error_weight = normalize(e); - (error_weight, error_type) - }), - // For now, count everything as spam with equal weight - // on the rpc node side, including gas-charging endpoints - // such as `sui_executeTransactionBlock`, as this can enable - // node operators who wish to rate limit their transcation - // traffic and 
incentivize high volume clients to choose a - suitable rpc provider (or run their own). Later we may want - to provide a weight distribution based on the method being called. - spam_weight: Weight::one(), - timestamp: SystemTime::now(), - }); + traffic_controller + .tally(TrafficTally { + direct: client, + through_fullnode: None, + error_info: error.map(|e| { + let error_type = e.to_string(); + let error_weight = normalize(e); + (error_weight, error_type) + }), + // For now, count everything as spam with equal weight + // on the rpc node side, including gas-charging endpoints + // such as `sui_executeTransactionBlock`, as this can enable + // node operators who wish to rate limit their transaction + // traffic and incentivize high volume clients to choose a + // suitable rpc provider (or run their own). Later we may want + // to provide a weight distribution based on the method being called. + spam_weight: Weight::one(), + timestamp: SystemTime::now(), + }) + .await; } // TODO: refine error matching here