diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs
index 07cd0dc0d1920..448d572c8f543 100644
--- a/crates/sui-core/src/authority.rs
+++ b/crates/sui-core/src/authority.rs
@@ -7,6 +7,8 @@ use crate::execution_cache::ExecutionCacheTraitPointers;
 use crate::execution_cache::TransactionCacheRead;
 use crate::jsonrpc_index::CoinIndexKey2;
 use crate::rpc_index::RpcIndexStore;
+use crate::traffic_controller::metrics::TrafficControllerMetrics;
+use crate::traffic_controller::TrafficController;
 use crate::transaction_outputs::TransactionOutputs;
 use crate::verify_indexes::verify_indexes;
 use anyhow::anyhow;
@@ -58,6 +60,9 @@ use sui_types::layout_resolver::into_struct_layout;
 use sui_types::layout_resolver::LayoutResolver;
 use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
 use sui_types::object::bounded_visitor::BoundedVisitor;
+use sui_types::traffic_control::{
+    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
+};
 use sui_types::transaction_executor::SimulateTransactionResult;
 use tap::TapFallible;
 use tokio::sync::mpsc::unbounded_channel;
@@ -831,6 +836,9 @@ pub struct AuthorityState {
 
     /// The chain identifier is derived from the digest of the genesis checkpoint.
     chain_identifier: ChainIdentifier,
+
+    /// Traffic controller for Sui core servers (json-rpc, validator service)
+    pub traffic_controller: Option<Arc<TrafficController>>,
 }
 
 /// The authority state encapsulates all state, drives execution, and ensures safety.
@@ -1509,6 +1517,16 @@ impl AuthorityState {
         Ok((effects, timings, execution_error_opt))
     }
 
+    pub async fn reconfigure_traffic_control(
+        &self,
+        params: TrafficControlReconfigParams,
+    ) -> Result<(), SuiError> {
+        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
+            traffic_controller.admin_reconfigure(params).await?;
+        }
+        Ok(())
+    }
+
     #[instrument(level = "trace", skip_all)]
     fn commit_certificate(
         &self,
@@ -2860,6 +2878,7 @@ impl AuthorityState {
     }
 
     #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
+    #[allow(clippy::too_many_arguments)]
     pub async fn new(
         name: AuthorityName,
         secret: StableSyncAuthoritySigner,
@@ -2880,6 +2899,8 @@ impl AuthorityState {
         validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
         chain_identifier: ChainIdentifier,
         pruner_db: Option<Arc<AuthorityPrunerTables>>,
+        policy_config: Option<PolicyConfig>,
+        firewall_config: Option<RemoteFirewallConfig>,
     ) -> Arc<Self> {
         Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
@@ -2916,6 +2937,20 @@ impl AuthorityState {
         let input_loader =
             TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
         let epoch = epoch_store.epoch();
+        let traffic_controller_metrics =
+            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
+        let traffic_controller = if let Some(policy_config) = policy_config {
+            Some(Arc::new(
+                TrafficController::init(
+                    policy_config,
+                    traffic_controller_metrics,
+                    firewall_config.clone(),
+                )
+                .await,
+            ))
+        } else {
+            None
+        };
         let state = Arc::new(AuthorityState {
             name,
             secret,
@@ -2938,6 +2973,7 @@ impl AuthorityState {
             overload_info: AuthorityOverloadInfo::default(),
             validator_tx_finalizer,
             chain_identifier,
+            traffic_controller,
        });

        // Start a task to execute ready certificates.
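Note: `reconfigure_traffic_control` is a no-op when the node was started without a `policy_config`, since no `TrafficController` is constructed in that case. A minimal caller sketch, assuming a handle to a running `AuthorityState` (the surrounding setup is illustrative, not part of this diff):

```rust
// Sketch only: flip dry-run off at runtime, leaving both thresholds untouched.
use sui_core::authority::AuthorityState;
use sui_types::error::SuiError;
use sui_types::traffic_control::TrafficControlReconfigParams;

async fn disable_dry_run(state: &AuthorityState) -> Result<(), SuiError> {
    state
        .reconfigure_traffic_control(TrafficControlReconfigParams {
            error_threshold: None, // None = keep the current threshold
            spam_threshold: None,
            dry_run: Some(false),  // start enforcing blocks
        })
        .await
}
```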
diff --git a/crates/sui-core/src/authority/test_authority_builder.rs b/crates/sui-core/src/authority/test_authority_builder.rs
index 00e120a208d6b..5a8a6540f8f35 100644
--- a/crates/sui-core/src/authority/test_authority_builder.rs
+++ b/crates/sui-core/src/authority/test_authority_builder.rs
@@ -330,6 +330,8 @@ impl<'a> TestAuthorityBuilder<'a> {
         config.authority_store_pruning_config = pruning_config;
         let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
+        let policy_config = config.policy_config.clone();
+        let firewall_config = config.firewall_config.clone();
 
         let state = AuthorityState::new(
             name,
@@ -351,6 +353,8 @@ impl<'a> TestAuthorityBuilder<'a> {
             None,
             chain_identifier,
             pruner_db,
+            policy_config,
+            firewall_config,
         )
         .await;
diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs
index 8ee1290749965..2e79dde5b9eba 100644
--- a/crates/sui-core/src/authority_server.rs
+++ b/crates/sui-core/src/authority_server.rs
@@ -34,7 +34,7 @@ use sui_types::messages_grpc::{
 };
 use sui_types::multiaddr::Multiaddr;
 use sui_types::sui_system_state::SuiSystemState;
-use sui_types::traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight};
+use sui_types::traffic_control::{ClientIdSource, Weight};
 use sui_types::{effects::TransactionEffectsAPI, messages_grpc::HandleTransactionRequestV2};
 use sui_types::{error::*, transaction::*};
 use sui_types::{
@@ -47,6 +47,7 @@ use tap::TapFallible;
 use tonic::metadata::{Ascii, MetadataValue};
 use tracing::{error, error_span, info, Instrument};
 
+use crate::consensus_adapter::ConnectionMonitorStatusForTests;
 use crate::{
     authority::authority_per_epoch_store::AuthorityPerEpochStore,
     mysticeti_adapter::LazyMysticetiClient,
@@ -58,10 +59,6 @@ use crate::{
     traffic_controller::policies::TrafficTally,
     traffic_controller::TrafficController,
 };
-use crate::{
-    consensus_adapter::ConnectionMonitorStatusForTests,
-    traffic_controller::metrics::TrafficControllerMetrics,
-};
 use nonempty::{nonempty, NonEmpty};
 use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
 use tonic::transport::server::TcpConnectInfo;
@@ -358,22 +355,15 @@ impl ValidatorService {
         state: Arc<AuthorityState>,
         consensus_adapter: Arc<ConsensusAdapter>,
         validator_metrics: Arc<ValidatorServiceMetrics>,
-        traffic_controller_metrics: TrafficControllerMetrics,
-        policy_config: Option<PolicyConfig>,
-        firewall_config: Option<RemoteFirewallConfig>,
+        client_id_source: Option<ClientIdSource>,
     ) -> Self {
+        let traffic_controller = state.traffic_controller.clone();
         Self {
             state,
             consensus_adapter,
             metrics: validator_metrics,
-            traffic_controller: policy_config.clone().map(|policy| {
-                Arc::new(TrafficController::init(
-                    policy,
-                    traffic_controller_metrics,
-                    firewall_config,
-                ))
-            }),
-            client_id_source: policy_config.map(|policy| policy.client_id_source),
+            traffic_controller,
+            client_id_source,
         }
     }
@@ -1314,7 +1304,7 @@ impl ValidatorService {
         }
     }
 
-    fn handle_traffic_resp<T>(
+    async fn handle_traffic_resp<T>(
         &self,
         client: Option<IpAddr>,
         wrapped_response: WrappedServiceResponse<T>,
@@ -1329,17 +1319,19 @@ impl ValidatorService {
         };
 
         if let Some(traffic_controller) = self.traffic_controller.clone() {
-            traffic_controller.tally(TrafficTally {
-                direct: client,
-                through_fullnode: None,
-                error_info: error.map(|e| {
-                    let error_type = String::from(e.clone().as_ref());
-                    let error_weight = normalize(e);
-                    (error_weight, error_type)
-                }),
-                spam_weight,
-                timestamp: SystemTime::now(),
-            })
+            traffic_controller
+                .tally(TrafficTally {
+                    direct: client,
+                    through_fullnode: None,
+                    error_info: error.map(|e| {
+                        let error_type = String::from(e.clone().as_ref());
+                        let error_weight = normalize(e);
+                        (error_weight, error_type)
+                    }),
+                    spam_weight,
+                    timestamp: SystemTime::now(),
+                })
+                .await;
         }
         unwrapped_response
     }
@@ -1390,7 +1382,7 @@ macro_rules! handle_with_decoration {
 
             // handle traffic tallying
             let wrapped_response = $self.$func_name($request).await;
-            $self.handle_traffic_resp(client, wrapped_response)
+            $self.handle_traffic_resp(client, wrapped_response).await
        }};
    }
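Note: `ValidatorService::new` no longer builds its own controller; it clones the one owned by `AuthorityState`, so the validator gRPC service and the JSON-RPC server now share one set of tallies and blocklists. The new call shape, mirroring the sui-node change later in this diff (surrounding bindings assumed):

```rust
// Sketch of the updated call site in node setup code.
let validator_service = ValidatorService::new(
    state.clone(),
    consensus_adapter,
    Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
    // Only the client-ID source is passed now; the controller itself
    // comes from state.traffic_controller.
    config.policy_config.clone().map(|p| p.client_id_source),
);
```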
diff --git a/crates/sui-core/src/traffic_controller/mod.rs b/crates/sui-core/src/traffic_controller/mod.rs
index a4039ad65887d..145e7a401b680 100644
--- a/crates/sui-core/src/traffic_controller/mod.rs
+++ b/crates/sui-core/src/traffic_controller/mod.rs
@@ -13,6 +13,7 @@ use std::fs;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::ops::Add;
 use std::sync::Arc;
+use sui_types::error::SuiError;
 
 use self::metrics::TrafficControllerMetrics;
 use crate::traffic_controller::nodefw_client::{BlockAddress, BlockAddresses, NodeFWClient};
@@ -23,9 +24,11 @@ use mysten_metrics::spawn_monitored_task;
 use rand::Rng;
 use std::fmt::Debug;
 use std::time::{Duration, Instant, SystemTime};
-use sui_types::traffic_control::{PolicyConfig, PolicyType, RemoteFirewallConfig, Weight};
-use tokio::sync::mpsc;
+use sui_types::traffic_control::{
+    PolicyConfig, PolicyType, RemoteFirewallConfig, TrafficControlReconfigParams, Weight,
+};
 use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::{mpsc, Mutex, RwLock};
 use tracing::{debug, error, info, trace, warn};
 
 pub const METRICS_INTERVAL_SECS: u64 = 2;
@@ -34,13 +37,13 @@ pub const DEFAULT_DRAIN_TIMEOUT_SECS: u64 = 300;
 type Blocklist = Arc<DashMap<IpAddr, SystemTime>>;
 
 #[derive(Clone)]
-struct Blocklists {
+pub struct Blocklists {
     clients: Blocklist,
     proxied_clients: Blocklist,
 }
 
 #[derive(Clone)]
-enum Acl {
+pub enum Acl {
     Blocklists(Blocklists),
     /// If this variant is set, then we do no tallying or running
     /// of background tasks, and instead simply block all IPs not
@@ -51,10 +54,13 @@ enum Acl {
 
 #[derive(Clone)]
 pub struct TrafficController {
-    tally_channel: Option<mpsc::Sender<TrafficTally>>,
+    tally_channel: Arc<Mutex<Option<mpsc::Sender<TrafficTally>>>>,
     acl: Acl,
     metrics: Arc<TrafficControllerMetrics>,
-    dry_run_mode: bool,
+    spam_policy: Option<Arc<Mutex<TrafficControlPolicy>>>,
+    error_policy: Option<Arc<Mutex<TrafficControlPolicy>>>,
+    policy_config: Arc<RwLock<PolicyConfig>>,
+    fw_config: Option<RemoteFirewallConfig>,
 }
 
 impl Debug for TrafficController {
@@ -78,12 +84,12 @@ impl Debug for TrafficController {
 }
 
 impl TrafficController {
-    pub fn init(
+    pub async fn init(
         policy_config: PolicyConfig,
-        metrics: TrafficControllerMetrics,
+        metrics: Arc<TrafficControllerMetrics>,
         fw_config: Option<RemoteFirewallConfig>,
     ) -> Self {
-        match policy_config.allow_list {
+        match policy_config.allow_list.clone() {
             Some(allow_list) => {
                 let allowlist = allow_list
                     .into_iter()
                     .map(|ip| {
@@ -94,48 +100,92 @@ impl TrafficController {
                     })
                     .collect();
                 Self {
-                    tally_channel: None,
+                    tally_channel: Arc::new(Mutex::new(None)),
                     acl: Acl::Allowlist(allowlist),
-                    metrics: Arc::new(metrics),
-                    dry_run_mode: policy_config.dry_run,
+                    metrics,
+                    policy_config: Arc::new(RwLock::new(policy_config)),
+                    fw_config,
+                    spam_policy: None,
+                    error_policy: None,
                 }
             }
-            None => Self::spawn(policy_config, metrics, fw_config),
+            None => {
+                let spam_policy = Arc::new(Mutex::new(
+                    TrafficControlPolicy::from_spam_config(policy_config.clone()).await,
+                ));
+                let error_policy = Arc::new(Mutex::new(
+                    TrafficControlPolicy::from_error_config(policy_config.clone()).await,
+                ));
+                let this = Self {
+                    tally_channel: Arc::new(Mutex::new(None)),
+                    acl: Acl::Blocklists(Blocklists {
+                        clients: Arc::new(DashMap::new()),
+                        proxied_clients: Arc::new(DashMap::new()),
+                    }),
+                    metrics,
+                    policy_config: Arc::new(RwLock::new(policy_config)),
+                    fw_config,
+                    spam_policy: Some(spam_policy),
+                    error_policy: Some(error_policy),
+                };
+                this.spawn().await;
+                this
+            }
         }
     }
 
-    fn spawn(
+    pub async fn init_for_test(
         policy_config: PolicyConfig,
-        metrics: TrafficControllerMetrics,
         fw_config: Option<RemoteFirewallConfig>,
     ) -> Self {
-        let metrics = Arc::new(metrics);
-        Self::set_policy_config_metrics(&policy_config, metrics.clone());
+        let metrics = Arc::new(TrafficControllerMetrics::new(&prometheus::Registry::new()));
+        Self::init(policy_config, metrics, fw_config).await
+    }
+
+    async fn spawn(&self) {
+        let policy_config = { self.policy_config.read().await.clone() };
+        Self::set_policy_config_metrics(&policy_config, self.metrics.clone());
         let (tx, rx) = mpsc::channel(policy_config.channel_capacity);
         // Memoized drainfile existence state. This is passed into delegation
         // functions to prevent them from continuing to populate blocklists
         // if drain is set, as otherwise it will grow without bounds
         // without the firewall running to periodically clear it.
-        let mem_drainfile_present = fw_config
+        let mem_drainfile_present = self
+            .fw_config
             .as_ref()
             .map(|config| config.drain_path.exists())
             .unwrap_or(false);
-        metrics
+        self.metrics
             .deadmans_switch_enabled
             .set(mem_drainfile_present as i64);
 
-        let blocklists = Blocklists {
-            clients: Arc::new(DashMap::new()),
-            proxied_clients: Arc::new(DashMap::new()),
+        let blocklists = match self.acl.clone() {
+            Acl::Blocklists(blocklists) => blocklists,
+            Acl::Allowlist(_) => panic!("Allowlist ACL should not exist on spawn"),
         };
         let tally_loop_blocklists = blocklists.clone();
         let clear_loop_blocklists = blocklists.clone();
-        let tally_loop_metrics = metrics.clone();
-        let clear_loop_metrics = metrics.clone();
-        let dry_run_mode = policy_config.dry_run;
+        let tally_loop_metrics = self.metrics.clone();
+        let clear_loop_metrics = self.metrics.clone();
+        let tally_loop_policy_config = policy_config.clone();
+        let tally_loop_fw_config = self.fw_config.clone();
+
+        let spam_policy = self
+            .spam_policy
+            .clone()
+            .expect("spam policy should exist on spawn");
+        let error_policy = self
+            .error_policy
+            .clone()
+            .expect("error policy should exist on spawn");
+        let spam_policy_clone = spam_policy.clone();
+        let error_policy_clone = error_policy.clone();
+
         spawn_monitored_task!(run_tally_loop(
             rx,
-            policy_config,
-            fw_config,
+            tally_loop_policy_config,
+            spam_policy_clone,
+            error_policy_clone,
+            tally_loop_fw_config,
             tally_loop_blocklists,
             tally_loop_metrics,
             mem_drainfile_present,
@@ -144,12 +194,70 @@ impl TrafficController {
             clear_loop_blocklists,
             clear_loop_metrics,
         ));
-        Self {
-            tally_channel: Some(tx),
-            acl: Acl::Blocklists(blocklists),
-            metrics: metrics.clone(),
-            dry_run_mode,
+        self.open_tally_channel(tx).await;
+    }
+
+    pub async fn admin_reconfigure(
+        &self,
+        params: TrafficControlReconfigParams,
+    ) -> Result<(), SuiError> {
+        let TrafficControlReconfigParams {
+            error_threshold,
+            spam_threshold,
+            dry_run,
+        } = params;
+        if let Some(error_threshold) = error_threshold {
+            match *self.error_policy.as_ref().unwrap().lock().await {
+                TrafficControlPolicy::FreqThreshold(ref mut policy) => {
+                    policy.client_threshold = error_threshold;
+                    if let Some(dry_run) = dry_run {
+                        policy.config.dry_run = dry_run;
+                    }
+                }
+                TrafficControlPolicy::TestNConnIP(ref mut policy) => {
+                    policy.threshold = error_threshold;
+                    if let Some(dry_run) = dry_run {
+                        policy.config.dry_run = dry_run;
+                    }
+                }
+                _ => {
+                    return Err(SuiError::InvalidAdminRequest(
+                        "Unsupported prior error policy type during traffic control reconfiguration"
+                            .to_string(),
+                    ));
+                }
+            }
+        }
+        if let Some(spam_threshold) = spam_threshold {
+            match *self.spam_policy.as_ref().unwrap().lock().await {
+                TrafficControlPolicy::FreqThreshold(ref mut policy) => {
+                    policy.client_threshold = spam_threshold;
+                    if let Some(dry_run) = dry_run {
+                        policy.config.dry_run = dry_run;
+                    }
+                }
+                TrafficControlPolicy::TestNConnIP(ref mut policy) => {
+                    policy.threshold = spam_threshold;
+                    if let Some(dry_run) = dry_run {
+                        policy.config.dry_run = dry_run;
+                    }
+                }
+                _ => {
+                    return Err(SuiError::InvalidAdminRequest(
+                        "Unsupported prior spam policy type during traffic control reconfiguration"
+                            .to_string(),
+                    ));
+                }
+            }
+        }
+        if let Some(dry_run) = dry_run {
+            self.policy_config.write().await.dry_run = dry_run;
         }
+        Ok(())
+    }
+
+    async fn open_tally_channel(&self, tx: mpsc::Sender<TrafficTally>) {
+        self.tally_channel.lock().await.replace(tx);
     }
 
     fn set_policy_config_metrics(
@@ -174,16 +282,8 @@ impl TrafficController {
         }
     }
 
-    pub fn init_for_test(
-        policy_config: PolicyConfig,
-        fw_config: Option<RemoteFirewallConfig>,
-    ) -> Self {
-        let metrics = TrafficControllerMetrics::new(&prometheus::Registry::new());
-        Self::init(policy_config, metrics, fw_config)
-    }
-
-    pub fn tally(&self, tally: TrafficTally) {
-        if let Some(channel) = self.tally_channel.as_ref() {
+    pub async fn tally(&self, tally: TrafficTally) {
+        if let Some(channel) = self.tally_channel.lock().await.as_ref() {
             // Use try_send rather than send mainly to avoid creating backpressure
             // on the caller if the channel is full, which may slow down the critical
             // path. Dropping the tally on the floor should be ok, as in this case
@@ -202,13 +302,16 @@ impl TrafficController {
                 }
                 Ok(_) => {}
             }
+        } else {
+            warn!("TrafficController not yet accepting tally requests.");
         }
     }
 
     /// Handle check with dry-run mode considered
     pub async fn check(&self, client: &Option<IpAddr>, proxied_client: &Option<IpAddr>) -> bool {
+        let policy_config = { self.policy_config.read().await.clone() };
         let check_with_dry_run_maybe = |allowed| -> bool {
-            match (allowed, self.dry_run_mode()) {
+            match (allowed, policy_config.dry_run) {
                 // check succeeded
                 (true, _) => true,
                 // check failed while in dry-run mode
@@ -258,10 +361,6 @@ impl TrafficController {
         client_check && proxied_client_check
     }
 
-    pub fn dry_run_mode(&self) -> bool {
-        self.dry_run_mode
-    }
-
     async fn check_and_clear_blocklist(
         &self,
         client: &Option<IpAddr>,
@@ -316,13 +415,13 @@ async fn run_clear_blocklists_loop(blocklists: Blocklists, metrics: Arc<TrafficControllerMetrics>) {
 
 async fn run_tally_loop(
     mut receiver: mpsc::Receiver<TrafficTally>,
     policy_config: PolicyConfig,
+    spam_policy: Arc<Mutex<TrafficControlPolicy>>,
+    error_policy: Arc<Mutex<TrafficControlPolicy>>,
     fw_config: Option<RemoteFirewallConfig>,
     blocklists: Blocklists,
     metrics: Arc<TrafficControllerMetrics>,
     mut mem_drainfile_present: bool,
 ) {
-    let mut spam_policy = TrafficControlPolicy::from_spam_config(policy_config.clone()).await;
-    let mut error_policy = TrafficControlPolicy::from_error_config(policy_config.clone()).await;
     let spam_blocklists = Arc::new(blocklists.clone());
     let error_blocklists = Arc::new(blocklists);
     let node_fw_client = fw_config
@@ -343,7 +442,7 @@ async fn run_tally_loop(
             Some(tally) => {
                 // TODO: spawn a task to handle tallying concurrently
                 if let Err(err) = handle_spam_tally(
-                    &mut spam_policy,
+                    spam_policy.clone(),
                     &policy_config,
                     &node_fw_client,
                     &fw_config,
@@ -356,7 +455,7 @@ async fn run_tally_loop(
                     warn!("Error handling spam tally: {}", err);
                 }
                 if let Err(err) = handle_error_tally(
-                    &mut error_policy,
+                    error_policy.clone(),
                     &policy_config,
                     &node_fw_client,
                     &fw_config,
@@ -396,7 +495,8 @@ async fn run_tally_loop(
         // every N seconds, we update metrics and logging that would be too
         // spammy to be handled while processing each tally
         if metric_timer.elapsed() > Duration::from_secs(METRICS_INTERVAL_SECS) {
-            if let TrafficControlPolicy::FreqThreshold(spam_policy) = &spam_policy {
+            if let TrafficControlPolicy::FreqThreshold(ref spam_policy) = *spam_policy.lock().await
+            {
                 if let Some(highest_direct_rate) = spam_policy.highest_direct_rate() {
                     metrics
                         .highest_direct_spam_rate
@@ -413,7 +513,9 @@ async fn run_tally_loop(
                     );
                 }
             }
-            if let TrafficControlPolicy::FreqThreshold(error_policy) = &error_policy {
+            if let TrafficControlPolicy::FreqThreshold(ref error_policy) =
+                *error_policy.lock().await
+            {
                 if let Some(highest_direct_rate) = error_policy.highest_direct_rate() {
                     metrics
                         .highest_direct_error_rate
@@ -439,7 +541,7 @@ async fn run_tally_loop(
 }
 
 async fn handle_error_tally(
-    policy: &mut TrafficControlPolicy,
+    policy: Arc<Mutex<TrafficControlPolicy>>,
     policy_config: &PolicyConfig,
     nodefw_client: &Option<NodeFWClient>,
     fw_config: &Option<RemoteFirewallConfig>,
@@ -463,7 +565,7 @@ async fn handle_error_tally(
         .tally_error_types
         .with_label_values(&[error_type.as_str()])
         .inc();
-    let resp = policy.handle_tally(tally);
+    let resp = policy.lock().await.handle_tally(tally);
     metrics.error_tally_handled.inc();
     if let Some(fw_config) = fw_config {
         if fw_config.delegate_error_blocking && !mem_drainfile_present {
@@ -485,7 +587,7 @@ async fn handle_error_tally(
 }
 
 async fn handle_spam_tally(
-    policy: &mut TrafficControlPolicy,
+    policy: Arc<Mutex<TrafficControlPolicy>>,
     policy_config: &PolicyConfig,
     nodefw_client: &Option<NodeFWClient>,
     fw_config: &Option<RemoteFirewallConfig>,
@@ -497,7 +599,7 @@ async fn handle_spam_tally(
     if !(tally.spam_weight.is_sampled() && policy_config.spam_sample_rate.is_sampled()) {
         return Ok(());
     }
-    let resp = policy.handle_tally(tally.clone());
+    let resp = policy.lock().await.handle_tally(tally.clone());
     metrics.tally_handled.inc();
     if let Some(fw_config) = fw_config {
         if fw_config.delegate_spam_blocking && !mem_drainfile_present {
@@ -689,7 +791,7 @@ impl TrafficSim {
         assert!(per_client_tps > 0);
         assert!(duration.as_secs() > 0);
 
-        let controller = TrafficController::init_for_test(policy.clone(), None);
+        let controller = TrafficController::init_for_test(policy.clone(), None).await;
         let tasks = (0..num_clients).map(|task_num| {
             tokio::spawn(Self::run_single_client(
                 controller.clone(),
@@ -777,14 +879,16 @@ impl TrafficSim {
                     total_time_blocked += time_blocked_start.elapsed();
                     currently_blocked = false;
                 }
-                controller.tally(TrafficTally::new(
-                    client,
-                    // TODO add proxy IP for testing
-                    None,
-                    // TODO add weight adjustments
-                    None,
-                    Weight::one(),
-                ));
+                controller
+                    .tally(TrafficTally::new(
+                        client,
+                        // TODO add proxy IP for testing
+                        None,
+                        // TODO add weight adjustments
+                        None,
+                        Weight::one(),
+                    ))
+                    .await;
            } else {
                if !currently_blocked {
                    time_blocked_start = Instant::now();
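Note: `tally` becomes async because the sender is now installed behind a `tokio::sync::Mutex` after `spawn()`; callers briefly await the lock but still use `try_send`, so a full channel never back-pressures the serving path. A self-contained sketch of the pattern, with a simplified payload type standing in for `TrafficTally`:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

// Minimal analogue of the new tally path: the sender lives behind a
// Mutex<Option<...>> because it only exists once spawn() has run.
async fn tally(channel: &Arc<Mutex<Option<mpsc::Sender<u64>>>>, value: u64) {
    if let Some(tx) = channel.lock().await.as_ref() {
        // Dropping on the floor when the channel is full is acceptable:
        // under load there is already plenty of signal.
        let _ = tx.try_send(value);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    let channel = Arc::new(Mutex::new(Some(tx)));
    tally(&channel, 42).await;
    assert_eq!(rx.recv().await, Some(42));
}
```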
diff --git a/crates/sui-core/src/traffic_controller/policies.rs b/crates/sui-core/src/traffic_controller/policies.rs
index 86fb432f13312..b741b64397d70 100644
--- a/crates/sui-core/src/traffic_controller/policies.rs
+++ b/crates/sui-core/src/traffic_controller/policies.rs
@@ -14,7 +14,7 @@ use std::hash::Hash;
 use std::time::Duration;
 use std::time::{Instant, SystemTime};
 use sui_types::traffic_control::{FreqThresholdConfig, PolicyConfig, PolicyType, Weight};
-use tracing::{info, trace};
+use tracing::{info, trace, warn};
 
 const HIGHEST_RATES_CAPACITY: usize = 20;
 
@@ -317,10 +317,10 @@ impl TrafficControlPolicy {
 ////////////// *** Policy definitions *** //////////////
 
 pub struct FreqThresholdPolicy {
-    config: PolicyConfig,
+    pub config: PolicyConfig,
+    pub client_threshold: u64,
+    pub proxied_client_threshold: u64,
     sketch: TrafficSketch,
-    client_threshold: u64,
-    proxied_client_threshold: u64,
     /// Unique salt to be added to all keys in the sketch. This
     /// ensures that false positives are not correlated across
     /// all nodes at the same time. For Sui validators for example,
@@ -439,13 +439,14 @@ impl NoOpPolicy {
 
 #[derive(Clone)]
 pub struct TestNConnIPPolicy {
-    config: PolicyConfig,
+    pub threshold: u64,
+    pub config: PolicyConfig,
     frequencies: Arc<RwLock<HashMap<IpAddr, u64>>>,
-    threshold: u64,
 }
 
 impl TestNConnIPPolicy {
     pub async fn new(config: PolicyConfig, threshold: u64) -> Self {
+        warn!("TESTING -- Creating new TestNConnIPPolicy");
         let frequencies = Arc::new(RwLock::new(HashMap::new()));
         let frequencies_clone = frequencies.clone();
         spawn_monitored_task!(run_clear_frequencies(
@@ -461,8 +462,10 @@ impl TestNConnIPPolicy {
 
     fn handle_tally(&mut self, tally: TrafficTally) -> PolicyResponse {
         let client = if let Some(client) = tally.direct {
+            warn!("TESTING -- Handling tally for client: {:?}", client);
             client
         } else {
+            warn!("TESTING -- No client to handle tally for");
             return PolicyResponse::default();
         };
 
@@ -470,10 +473,16 @@ impl TestNConnIPPolicy {
         let mut frequencies = self.frequencies.write();
         let count = frequencies.entry(client).or_insert(0);
         *count += 1;
+        warn!(
+            "TESTING -- Count for client: {:?}, count: {:?}",
+            client, *count
+        );
         PolicyResponse {
             block_client: if *count >= self.threshold {
+                warn!("TESTING -- Blocking client: {:?}", client);
                 Some(client)
             } else {
+                warn!("TESTING -- Not blocking client: {:?}", client);
                 None
             },
             block_proxied_client: None,
@@ -488,6 +497,7 @@ impl TestNConnIPPolicy {
 async fn run_clear_frequencies(frequencies: Arc<RwLock<HashMap<IpAddr, u64>>>, window_secs: u64) {
    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(window_secs)).await;
+        warn!("TESTING -- Clearing frequencies");
        frequencies.write().clear();
    }
 }
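Note on `TestNConnIPPolicy` semantics, now that `threshold` is reconfigurable: tallies are counted per IP and a client is blocked once its count within the clear window reaches the threshold. A minimal restatement of the counting rule, stripped of locking and the periodic clear task:

```rust
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr};

// Hypothetical, simplified version of the handle_tally counting above.
fn should_block(freqs: &mut HashMap<IpAddr, u64>, client: IpAddr, threshold: u64) -> bool {
    let count = freqs.entry(client).or_insert(0);
    *count += 1;
    *count >= threshold
}

fn main() {
    let mut freqs = HashMap::new();
    let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
    // With threshold = 3, the third tally trips the block.
    assert!(!should_block(&mut freqs, ip, 3));
    assert!(!should_block(&mut freqs, ip, 3));
    assert!(should_block(&mut freqs, ip, 3));
}
```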
diff --git a/crates/sui-e2e-tests/tests/traffic_control_tests.rs b/crates/sui-e2e-tests/tests/traffic_control_tests.rs
index 71470781cb0bb..a92e2c282d40c 100644
--- a/crates/sui-e2e-tests/tests/traffic_control_tests.rs
+++ b/crates/sui-e2e-tests/tests/traffic_control_tests.rs
@@ -22,6 +22,7 @@ use sui_macros::sim_test;
 use sui_network::default_mysten_network_config;
 use sui_swarm_config::network_config_builder::ConfigBuilder;
 use sui_test_transaction_builder::batch_make_transfer_transactions;
+use sui_types::traffic_control::TrafficControlReconfigParams;
 use sui_types::{
     crypto::Ed25519SuiSignature,
     quorum_driver_types::ExecuteTransactionRequestType,
@@ -34,6 +35,7 @@ use test_cluster::{TestCluster, TestClusterBuilder};
 
 #[tokio::test]
 async fn test_validator_traffic_control_noop() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 1,
         proxy_blocklist_ttl_sec: 5,
@@ -58,6 +60,7 @@ async fn test_validator_traffic_control_noop() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_fullnode_traffic_control_noop() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 1,
         proxy_blocklist_ttl_sec: 5,
@@ -77,6 +80,7 @@ async fn test_fullnode_traffic_control_noop() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_validator_traffic_control_ok() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 1,
         proxy_blocklist_ttl_sec: 5,
@@ -102,6 +106,7 @@ async fn test_validator_traffic_control_ok() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_fullnode_traffic_control_ok() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 1,
         proxy_blocklist_ttl_sec: 5,
@@ -122,6 +127,7 @@ async fn test_fullnode_traffic_control_ok() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_validator_traffic_control_dry_run() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let n = 5;
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 1,
@@ -148,6 +154,7 @@ async fn test_validator_traffic_control_dry_run() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let txn_count = 15;
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 1,
@@ -211,6 +218,7 @@ async fn test_fullnode_traffic_control_dry_run() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let n = 5;
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 1,
@@ -254,8 +262,77 @@ async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Error> {
     panic!("Expected error policy to trigger within {n} requests");
 }
 
+#[tokio::test]
+async fn test_validator_traffic_control_error_blocked_with_policy_reconfig(
+) -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
+    let n = 5;
+    let policy_config = PolicyConfig {
+        connection_blocklist_ttl_sec: 100,
+        error_policy_type: PolicyType::TestNConnIP(n - 1),
+        dry_run: true,
+        ..Default::default()
+    };
+    let network_config = ConfigBuilder::new_with_temp_dir()
+        .committee_size(NonZeroUsize::new(4).unwrap())
+        .with_policy_config(Some(policy_config))
+        .build();
+    let committee = network_config.committee_with_network();
+    let test_cluster = TestClusterBuilder::new()
+        .set_network_config(network_config)
+        .build()
+        .await;
+    let local_clients = make_network_authority_clients_with_network_config(
+        &committee,
+        &default_mysten_network_config(),
+    );
+    let (_, auth_client) = local_clients.first_key_value().unwrap();
+
+    let mut txns = batch_make_transfer_transactions(&test_cluster.wallet, n as usize).await;
+    let mut tx = txns.swap_remove(0);
+    let signatures = tx.tx_signatures_mut_for_testing();
+    signatures.pop();
+    signatures.push(GenericSignature::Signature(
+        sui_types::crypto::Signature::Ed25519SuiSignature(Ed25519SuiSignature::default()),
+    ));
+
+    // Before reconfiguring the policy, we should not block any requests due to dry run mode,
+    // even after far exceeding the threshold. However the blocklist should be updated.
+    for _ in 0..(2 * n) {
+        let response = auth_client.handle_transaction(tx.clone(), None).await;
+        if let Err(err) = response {
+            assert!(
+                !err.to_string().contains("Too many requests"),
+                "Expected no blocked requests due to dry run mode"
+            );
+        }
+    }
+    // Reconfigure traffic control to disable dry run mode
+    for node in test_cluster.all_validator_handles() {
+        node.state()
+            .reconfigure_traffic_control(TrafficControlReconfigParams {
+                error_threshold: None,
+                spam_threshold: None,
+                dry_run: Some(false),
+            })
+            .await
+            .unwrap();
+    }
+    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+    // If Node and TrafficController have not crashed, blocklist and policy freq state should still
+    // be intact. A single additional erroneous request from the client should trigger enforcement.
+    let response = auth_client.handle_transaction(tx.clone(), None).await;
+    if let Err(err) = response {
+        if err.to_string().contains("Too many requests") {
+            return Ok(());
+        }
+    }
+    panic!("Expected error policy to trigger on next request after reconfiguration");
+}
+
 #[tokio::test]
 async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let txn_count = 15;
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 3,
@@ -324,6 +401,7 @@ async fn test_fullnode_traffic_control_spam_blocked() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_fullnode_traffic_control_error_blocked() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let txn_count = 5;
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 3,
@@ -381,6 +459,7 @@ async fn test_fullnode_traffic_control_error_blocked() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let n = 5;
     let port = 65000;
     let policy_config = PolicyConfig {
@@ -450,6 +529,7 @@ async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let txn_count = 10;
     let port = 65001;
     let policy_config = PolicyConfig {
@@ -530,6 +610,7 @@ async fn test_fullnode_traffic_control_spam_delegated() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_traffic_control_dead_mans_switch() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 3,
         spam_policy_type: PolicyType::TestNConnIP(10),
@@ -586,6 +667,7 @@ async fn test_traffic_control_dead_mans_switch() -> Result<(), anyhow::Error> {
 
 #[tokio::test]
 async fn test_traffic_control_manual_set_dead_mans_switch() -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
     let drain_path = tempfile::tempdir().unwrap().into_path().join("drain");
     assert!(!drain_path.exists(), "Expected drain file to not yet exist",);
     File::create(&drain_path).expect("Failed to touch nodefw drain file");
@@ -597,6 +679,7 @@ async fn test_traffic_control_manual_set_dead_mans_switch() -> Result<(), anyhow::Error> {
 
 #[sim_test]
 async fn test_traffic_sketch_no_blocks() {
+    telemetry_subscribers::init_for_testing();
     let sketch_config = FreqThresholdConfig {
         client_threshold: 10_100,
         proxied_client_threshold: 10_100,
@@ -637,6 +720,7 @@ async fn test_traffic_sketch_no_blocks() {
 #[ignore]
 #[sim_test]
 async fn test_traffic_sketch_with_slow_blocks() {
+    telemetry_subscribers::init_for_testing();
     let sketch_config = FreqThresholdConfig {
         client_threshold: 9_900,
         proxied_client_threshold: 9_900,
@@ -676,6 +760,7 @@ async fn test_traffic_sketch_with_slow_blocks() {
 
 #[sim_test]
 async fn test_traffic_sketch_with_sampled_spam() {
+    telemetry_subscribers::init_for_testing();
     let sketch_config = FreqThresholdConfig {
         client_threshold: 4_500,
         proxied_client_threshold: 4_500,
@@ -713,6 +798,7 @@ async fn test_traffic_sketch_with_sampled_spam() {
 
 #[sim_test]
 async fn test_traffic_sketch_allowlist_mode() {
+    telemetry_subscribers::init_for_testing();
     let policy_config = PolicyConfig {
         connection_blocklist_ttl_sec: 1,
         proxy_blocklist_ttl_sec: 1,
@@ -738,6 +824,7 @@ async fn test_traffic_sketch_allowlist_mode() {
 }
 
 async fn assert_traffic_control_ok(mut test_cluster: TestCluster) -> Result<(), anyhow::Error> {
+    telemetry_subscribers::init_for_testing();
    let context = &mut test_cluster.wallet;
    let jsonrpc_client = &test_cluster.fullnode_handle.rpc_client;
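Note: the dry-run tests above rely on the gating inside `TrafficController::check`, which tallies and populates blocklists but skips enforcement while `dry_run` is set. A simplified restatement of that rule:

```rust
// Hedged restatement of check's dry-run logic, not the actual method body.
fn check_with_dry_run(allowed: bool, dry_run: bool) -> bool {
    match (allowed, dry_run) {
        (true, _) => true,       // check succeeded
        (false, true) => true,   // failed, but dry-run: allow through
        (false, false) => false, // failed: enforce the block
    }
}

fn main() {
    assert!(check_with_dry_run(false, true));
    assert!(!check_with_dry_run(false, false));
}
```

This is why the reconfig test can send `2 * n` bad requests unblocked, flip `dry_run` off via `reconfigure_traffic_control`, and then expect the very next request to be rejected: the frequency state accumulated during dry run survives the reconfiguration.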
diff --git a/crates/sui-json-rpc/src/lib.rs b/crates/sui-json-rpc/src/lib.rs
index e540b52fddd5e..7e21e69d01915 100644
--- a/crates/sui-json-rpc/src/lib.rs
+++ b/crates/sui-json-rpc/src/lib.rs
@@ -15,10 +15,8 @@ use jsonrpsee::RpcModule;
 use metrics::Metrics;
 use metrics::MetricsLayer;
 use prometheus::Registry;
-use sui_core::traffic_controller::metrics::TrafficControllerMetrics;
 use sui_core::traffic_controller::TrafficController;
 use sui_types::traffic_control::PolicyConfig;
-use sui_types::traffic_control::RemoteFirewallConfig;
 use tokio::runtime::Handle;
 use tokio_util::sync::CancellationToken;
 use tower::ServiceBuilder;
@@ -63,8 +61,8 @@ pub struct JsonRpcServerBuilder {
     module: RpcModule<()>,
     rpc_doc: Project,
     registry: Registry,
+    traffic_controller: Option<Arc<TrafficController>>,
     policy_config: Option<PolicyConfig>,
-    firewall_config: Option<RemoteFirewallConfig>,
 }
 
 pub fn sui_rpc_doc(version: &str) -> Project {
@@ -84,15 +82,15 @@ impl JsonRpcServerBuilder {
     pub fn new(
         version: &str,
         prometheus_registry: &Registry,
+        traffic_controller: Option<Arc<TrafficController>>,
         policy_config: Option<PolicyConfig>,
-        firewall_config: Option<RemoteFirewallConfig>,
     ) -> Self {
         Self {
             module: RpcModule::new(()),
             rpc_doc: sui_rpc_doc(version),
             registry: prometheus_registry.clone(),
+            traffic_controller,
             policy_config,
-            firewall_config,
         }
     }
 
@@ -175,14 +173,6 @@ impl JsonRpcServerBuilder {
         let methods_names = module.method_names().collect::<Vec<_>>();
 
         let metrics = Arc::new(Metrics::new(&self.registry, &methods_names));
-        let traffic_controller_metrics = TrafficControllerMetrics::new(&self.registry);
-        let traffic_controller = self.policy_config.clone().map(|policy| {
-            Arc::new(TrafficController::init(
-                policy,
-                traffic_controller_metrics,
-                self.firewall_config.clone(),
-            ))
-        });
         let client_id_source = self
             .policy_config
             .clone()
@@ -203,9 +193,13 @@ impl JsonRpcServerBuilder {
         let (stop_handle, server_handle) = jsonrpsee::server::stop_channel();
         std::mem::forget(server_handle);
 
+        let traffic_controller = self.traffic_controller.clone();
         let rpc_middleware = jsonrpsee::server::middleware::rpc::RpcServiceBuilder::new()
             .layer_fn(move |s| MetricsLayer::new(s, metrics.clone()))
-            .layer_fn(move |s| TrafficControllerService::new(s, traffic_controller.clone()));
+            .layer_fn({
+                let traffic_controller = traffic_controller.clone();
+                move |s| TrafficControllerService::new(s, traffic_controller.clone())
+            });
         let service_builder = jsonrpsee::server::ServerBuilder::new()
             // Since we're not using jsonrpsee's server to actually handle connections this value
             // is instead limiting the number of concurrent requests and has no impact on the
diff --git a/crates/sui-json-rpc/src/traffic_control.rs b/crates/sui-json-rpc/src/traffic_control.rs
index adaa8b064b464..7d2d1f59988de 100644
--- a/crates/sui-json-rpc/src/traffic_control.rs
+++ b/crates/sui-json-rpc/src/traffic_control.rs
@@ -50,7 +50,7 @@ where
                 response
             } else {
                 let response = service.call(req).await;
-                handle_traffic_resp(&traffic_controller, client, &response);
+                handle_traffic_resp(&traffic_controller, client, &response).await;
                 response
             }
         } else {
@@ -62,7 +62,7 @@ where
 }
 
 async fn handle_traffic_req(
-    traffic_controller: &TrafficController,
+    traffic_controller: &Arc<TrafficController>,
     client: &Option<IpAddr>,
 ) -> Result<(), MethodResponse> {
     if !traffic_controller.check(client, &None).await {
@@ -75,30 +75,32 @@ async fn handle_traffic_req(
     }
 }
 
-fn handle_traffic_resp(
-    traffic_controller: &TrafficController,
+async fn handle_traffic_resp(
+    traffic_controller: &Arc<TrafficController>,
     client: Option<IpAddr>,
     response: &MethodResponse,
 ) {
     let error = response.as_error_code().map(ErrorCode::from);
-    traffic_controller.tally(TrafficTally {
-        direct: client,
-        through_fullnode: None,
-        error_info: error.map(|e| {
-            let error_type = e.to_string();
-            let error_weight = normalize(e);
-            (error_weight, error_type)
-        }),
-        // For now, count everything as spam with equal weight
-        // on the rpc node side, including gas-charging endpoints
-        // such as `sui_executeTransactionBlock`, as this can enable
-        // node operators who wish to rate limit their transcation
-        // traffic and incentivize high volume clients to choose a
-        // suitable rpc provider (or run their own). Later we may want
-        // to provide a weight distribution based on the method being called.
-        spam_weight: Weight::one(),
-        timestamp: SystemTime::now(),
-    });
+    traffic_controller
+        .tally(TrafficTally {
+            direct: client,
+            through_fullnode: None,
+            error_info: error.map(|e| {
+                let error_type = e.to_string();
+                let error_weight = normalize(e);
+                (error_weight, error_type)
+            }),
+            // For now, count everything as spam with equal weight
+            // on the rpc node side, including gas-charging endpoints
+            // such as `sui_executeTransactionBlock`, as this can enable
+            // node operators who wish to rate limit their transaction
+            // traffic and incentivize high volume clients to choose a
+            // suitable rpc provider (or run their own). Later we may want
+            // to provide a weight distribution based on the method being called.
+            spam_weight: Weight::one(),
+            timestamp: SystemTime::now(),
+        })
+        .await;
 }

 // TODO: refine error matching here
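Note on the `layer_fn` change: the extra block clones the `Arc` once so the `move` closure owns its own handle and can hand a fresh clone to every service it builds, leaving the original binding available afterwards. A standalone analogue of the ownership pattern (names illustrative):

```rust
use std::sync::Arc;

fn main() {
    let shared = Arc::new(String::from("traffic-controller"));
    // The block clones once; the closure owns that clone and clones
    // again per constructed service, like the layer_fn above.
    let make_service = {
        let shared = shared.clone();
        move |id: usize| (id, shared.clone())
    };
    let a = make_service(1);
    let b = make_service(2);
    // original + closure's copy + a + b
    assert_eq!(Arc::strong_count(&shared), 4);
    println!("{} {}", a.0, b.0);
}
```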
diff --git a/crates/sui-node/src/admin.rs b/crates/sui-node/src/admin.rs
index 3c867e2f994a0..e942c7f19702e 100644
--- a/crates/sui-node/src/admin.rs
+++ b/crates/sui-node/src/admin.rs
@@ -20,6 +20,7 @@ use sui_types::{
     base_types::AuthorityName,
     crypto::{RandomnessPartialSignature, RandomnessRound, RandomnessSignature},
     error::SuiError,
+    traffic_control::TrafficControlReconfigParams,
 };
 use telemetry_subscribers::TracingHandle;
 use tokio::sync::oneshot;
@@ -68,6 +69,10 @@ use tracing::info;
 // Inject a full signature from another node, bypassing validity checks.
 //
 // $ curl 'http://127.0.0.1:1337/randomness-inject-full-sig?round=123&sigs=base64encodedsig'
+//
+// Reconfigure traffic control policy
+//
+// $ curl 'http://127.0.0.1:1337/traffic-control?error_threshold=100&spam_threshold=100&dry_run=true'
 
 const LOGGING_ROUTE: &str = "/logging";
 const TRACING_ROUTE: &str = "/enable-tracing";
@@ -80,6 +85,7 @@ const NODE_CONFIG: &str = "/node-config";
 const RANDOMNESS_PARTIAL_SIGS_ROUTE: &str = "/randomness-partial-sigs";
 const RANDOMNESS_INJECT_PARTIAL_SIGS_ROUTE: &str = "/randomness-inject-partial-sigs";
 const RANDOMNESS_INJECT_FULL_SIG_ROUTE: &str = "/randomness-inject-full-sig";
+const TRAFFIC_CONTROL: &str = "/traffic-control";
 
 struct AppState {
     node: Arc<SuiNode>,
@@ -119,6 +125,7 @@ pub async fn run_admin_server(node: Arc<SuiNode>, port: u16, tracing_handle: TracingHandle) {
             RANDOMNESS_INJECT_FULL_SIG_ROUTE,
             post(randomness_inject_full_sig),
         )
+        .route(TRAFFIC_CONTROL, post(traffic_control))
         .with_state(Arc::new(app_state));
 
     let socket_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
@@ -443,3 +450,14 @@ async fn randomness_inject_full_sig(
         Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
     }
 }
+
+async fn traffic_control(
+    State(state): State<Arc<AppState>>,
+    args: Query<TrafficControlReconfigParams>,
+) -> (StatusCode, String) {
+    let Query(params) = args;
+    match state.node.state().reconfigure_traffic_control(params).await {
+        Ok(()) => (StatusCode::OK, "traffic control configured\n".to_string()),
+        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
+    }
+}
diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs
index a420b95d4592b..b0f35de043ec0 100644
--- a/crates/sui-node/src/lib.rs
+++ b/crates/sui-node/src/lib.rs
@@ -39,7 +39,6 @@ use sui_core::epoch::randomness::RandomnessManager;
 use sui_core::execution_cache::build_execution_cache;
 use sui_core::state_accumulator::StateAccumulatorMetrics;
 use sui_core::storage::RestReadStore;
-use sui_core::traffic_controller::metrics::TrafficControllerMetrics;
 use sui_json_rpc::bridge_api::BridgeReadApi;
 use sui_json_rpc_api::JsonRpcMetrics;
 use sui_network::randomness;
@@ -740,6 +739,8 @@ impl SuiNode {
             validator_tx_finalizer,
             chain_identifier,
             pruner_db,
+            config.policy_config.clone(),
+            config.firewall_config.clone(),
         )
         .await;
         // ensure genesis txn was executed
@@ -1523,9 +1524,7 @@ impl SuiNode {
             state.clone(),
             consensus_adapter,
             Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
-            TrafficControllerMetrics::new(prometheus_registry),
-            config.policy_config.clone(),
-            config.firewall_config.clone(),
+            config.policy_config.clone().map(|p| p.client_id_source),
         );
 
         let mut server_conf = mysten_network::config::Config::new();
@@ -2163,11 +2162,12 @@ pub async fn build_http_server(
     let mut router = axum::Router::new();
 
     let json_rpc_router = {
+        let traffic_controller = state.traffic_controller.clone();
         let mut server = JsonRpcServerBuilder::new(
             env!("CARGO_PKG_VERSION"),
             prometheus_registry,
+            traffic_controller,
             config.policy_config.clone(),
-            config.firewall_config.clone(),
         );
 
         let kv_store = build_kv_store(&state, config, prometheus_registry)?;
diff --git a/crates/sui-types/src/error.rs b/crates/sui-types/src/error.rs
index 4e3fd07c1a430..ee4b2bc0002c8 100644
--- a/crates/sui-types/src/error.rs
+++ b/crates/sui-types/src/error.rs
@@ -678,6 +678,9 @@ pub enum SuiError {
 
     #[error("Enclave attestation failed: {0}")]
     AttestationFailedToVerify(String),
+
+    #[error("Invalid admin request: {0}")]
+    InvalidAdminRequest(String),
 }
 
 #[repr(u64)]
diff --git a/crates/sui-types/src/traffic_control.rs b/crates/sui-types/src/traffic_control.rs
index 6a70ea3ad28ab..385625fa1afd9 100644
--- a/crates/sui-types/src/traffic_control.rs
+++ b/crates/sui-types/src/traffic_control.rs
@@ -71,6 +71,13 @@ pub enum ClientIdSource {
     XForwardedFor(usize),
 }
 
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct TrafficControlReconfigParams {
+    pub error_threshold: Option<u64>,
+    pub spam_threshold: Option<u64>,
+    pub dry_run: Option<bool>,
+}
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct Weight(f32);
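Note: since every `TrafficControlReconfigParams` field is an `Option`, any subset of query parameters is valid on the admin route. A sketch of how axum's `Query` extractor (which wraps `serde_urlencoded`) maps the documented curl onto the struct; this assumes a dev-dependency on `serde_urlencoded`, which is not part of this diff:

```rust
use sui_types::traffic_control::TrafficControlReconfigParams;

fn main() {
    // Same query string as the admin-server doc comment above.
    let params: TrafficControlReconfigParams =
        serde_urlencoded::from_str("error_threshold=100&spam_threshold=100&dry_run=true")
            .expect("valid query string");
    assert_eq!(params.error_threshold, Some(100));
    assert_eq!(params.spam_threshold, Some(100));
    assert_eq!(params.dry_run, Some(true));
}
```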