Skip to content

Commit

Permalink
[Traffic Control] Add dynamic config via admin api
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Jan 16, 2025
1 parent 2166353 commit 813327f
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 53 deletions.
34 changes: 34 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::execution_cache::ExecutionCacheTraitPointers;
use crate::execution_cache::TransactionCacheRead;
use crate::jsonrpc_index::CoinIndexKey2;
use crate::rpc_index::RpcIndexStore;
use crate::traffic_controller::metrics::TrafficControllerMetrics;
use crate::traffic_controller::TrafficController;
use crate::transaction_outputs::TransactionOutputs;
use crate::verify_indexes::verify_indexes;
use anyhow::anyhow;
Expand Down Expand Up @@ -56,6 +58,8 @@ use sui_types::layout_resolver::into_struct_layout;
use sui_types::layout_resolver::LayoutResolver;
use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
use sui_types::object::bounded_visitor::BoundedVisitor;
use sui_types::traffic_control::PolicyConfig;
use sui_types::traffic_control::RemoteFirewallConfig;
use sui_types::transaction_executor::SimulateTransactionResult;
use tap::TapFallible;
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -825,6 +829,9 @@ pub struct AuthorityState {

/// The chain identifier is derived from the digest of the genesis checkpoint.
chain_identifier: ChainIdentifier,

/// Traffic controller for Sui core servers (json-rpc, validator service)
pub traffic_controller: Option<Arc<TrafficController>>,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -1491,6 +1498,22 @@ impl AuthorityState {
Ok((effects, execution_error_opt))
}

/// Updates the settings of this authority's traffic controller at runtime.
///
/// The three arguments are passed through unchanged to
/// `TrafficController::reconfigure_no_clear`.
///
/// # Errors
///
/// Returns `SuiError::InvalidAdminRequest` when this authority was created
/// without a traffic controller, and otherwise propagates whatever error the
/// traffic controller reports.
pub fn reconfigure_traffic_control(
    &self,
    error_threshold: Option<u64>,
    spam_threshold: Option<u64>,
    dry_run: Option<bool>,
) -> SuiResult<()> {
    // A `match` keeps the error (and its `String` allocation) off the
    // success path; `unwrap_or(Err(..))` would build it unconditionally.
    match self.traffic_controller.as_ref() {
        Some(traffic_controller) => {
            traffic_controller.reconfigure_no_clear(error_threshold, spam_threshold, dry_run)
        }
        None => Err(SuiError::InvalidAdminRequest(
            "traffic controller not enabled".to_string(),
        )),
    }
}

#[instrument(level = "trace", skip_all)]
async fn commit_certificate(
&self,
Expand Down Expand Up @@ -2863,6 +2886,8 @@ impl AuthorityState {
archive_readers: ArchiveReaderBalancer,
validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
chain_identifier: ChainIdentifier,
policy_config: Option<PolicyConfig>,
firewall_config: Option<RemoteFirewallConfig>,
) -> Arc<Self> {
Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

Expand Down Expand Up @@ -2898,6 +2923,14 @@ impl AuthorityState {
let input_loader =
TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
let epoch = epoch_store.epoch();
let traffic_controller_metrics = TrafficControllerMetrics::new(prometheus_registry);
let traffic_controller = policy_config.clone().map(|policy| {
Arc::new(TrafficController::init(
policy,
traffic_controller_metrics,
firewall_config.clone(),
))
});
let state = Arc::new(AuthorityState {
name,
secret,
Expand All @@ -2920,6 +2953,7 @@ impl AuthorityState {
overload_info: AuthorityOverloadInfo::default(),
validator_tx_finalizer,
chain_identifier,
traffic_controller,
});

// Start a task to execute ready certificates.
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ impl<'a> TestAuthorityBuilder<'a> {
config.authority_store_pruning_config = pruning_config;

let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
let policy_config = config.policy_config.clone();
let firewall_config = config.firewall_config.clone();

let state = AuthorityState::new(
name,
Expand All @@ -332,6 +334,8 @@ impl<'a> TestAuthorityBuilder<'a> {
ArchiveReaderBalancer::default(),
None,
chain_identifier,
policy_config,
firewall_config,
)
.await;

Expand Down
22 changes: 6 additions & 16 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use sui_types::messages_grpc::{
};
use sui_types::multiaddr::Multiaddr;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight};
use sui_types::traffic_control::{ClientIdSource, Weight};
use sui_types::{effects::TransactionEffectsAPI, messages_grpc::HandleTransactionRequestV2};
use sui_types::{error::*, transaction::*};
use sui_types::{
Expand All @@ -47,6 +47,7 @@ use tap::TapFallible;
use tonic::metadata::{Ascii, MetadataValue};
use tracing::{error, error_span, info, Instrument};

use crate::consensus_adapter::ConnectionMonitorStatusForTests;
use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
mysticeti_adapter::LazyMysticetiClient,
Expand All @@ -58,10 +59,6 @@ use crate::{
traffic_controller::policies::TrafficTally,
traffic_controller::TrafficController,
};
use crate::{
consensus_adapter::ConnectionMonitorStatusForTests,
traffic_controller::metrics::TrafficControllerMetrics,
};
use nonempty::{nonempty, NonEmpty};
use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
use tonic::transport::server::TcpConnectInfo;
Expand Down Expand Up @@ -358,22 +355,15 @@ impl ValidatorService {
state: Arc<AuthorityState>,
consensus_adapter: Arc<ConsensusAdapter>,
validator_metrics: Arc<ValidatorServiceMetrics>,
traffic_controller_metrics: TrafficControllerMetrics,
policy_config: Option<PolicyConfig>,
firewall_config: Option<RemoteFirewallConfig>,
client_id_source: Option<ClientIdSource>,
) -> Self {
let traffic_controller = state.traffic_controller.as_ref().cloned();
Self {
state,
consensus_adapter,
metrics: validator_metrics,
traffic_controller: policy_config.clone().map(|policy| {
Arc::new(TrafficController::init(
policy,
traffic_controller_metrics,
firewall_config,
))
}),
client_id_source: policy_config.map(|policy| policy.client_id_source),
traffic_controller,
client_id_source,
}
}

Expand Down
123 changes: 105 additions & 18 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ pub mod policies;

use dashmap::DashMap;
use fs::File;
use parking_lot::Mutex;
use prometheus::IntGauge;
use std::fs;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::ops::Add;
use std::sync::Arc;
use sui_types::error::{SuiError, SuiResult};

use self::metrics::TrafficControllerMetrics;
use crate::traffic_controller::nodefw_client::{BlockAddress, BlockAddresses, NodeFWClient};
Expand All @@ -23,7 +25,10 @@ use mysten_metrics::spawn_monitored_task;
use rand::Rng;
use std::fmt::Debug;
use std::time::{Duration, Instant, SystemTime};
use sui_types::traffic_control::{PolicyConfig, PolicyType, RemoteFirewallConfig, Weight};
use sui_types::traffic_control::{
FreqThresholdConfig, PolicyConfig, PolicyType, RemoteFirewallConfig, Weight,
};
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -55,6 +60,8 @@ pub struct TrafficController {
acl: Acl,
metrics: Arc<TrafficControllerMetrics>,
dry_run_mode: bool,
config: Arc<Mutex<PolicyConfig>>,
reconfigure_channel: Option<broadcast::Sender<PolicyConfig>>,
}

impl Debug for TrafficController {
Expand Down Expand Up @@ -83,12 +90,12 @@ impl TrafficController {
metrics: TrafficControllerMetrics,
fw_config: Option<RemoteFirewallConfig>,
) -> Self {
match policy_config.allow_list {
match &policy_config.allow_list {
Some(allow_list) => {
let allowlist = allow_list
.into_iter()
.iter()
.map(|ip_str| {
parse_ip(&ip_str).unwrap_or_else(|| {
parse_ip(ip_str).unwrap_or_else(|| {
panic!("Failed to parse allowlist IP address: {:?}", ip_str)
})
})
Expand All @@ -98,12 +105,68 @@ impl TrafficController {
acl: Acl::Allowlist(allowlist),
metrics: Arc::new(metrics),
dry_run_mode: policy_config.dry_run,
config: Arc::new(Mutex::new(policy_config)),
reconfigure_channel: None,
}
}
None => Self::spawn(policy_config, metrics, fw_config),
}
}

/// Reconfigure traffic control without clearing the blocklists
pub fn reconfigure_no_clear(
&self,
error_threshold: Option<u64>,
spam_threshold: Option<u64>,
dry_run: Option<bool>,
) -> SuiResult<()> {
let mut config = self.config.lock();
if let Some(error_threshold) = error_threshold {
if let PolicyType::FreqThreshold(threshold_config) = &mut config.error_policy_type {
threshold_config.client_threshold = error_threshold;
} else {
let error_policy_type = PolicyType::FreqThreshold(FreqThresholdConfig {
client_threshold: error_threshold,
window_size_secs: 5,
update_interval_secs: 1,
..Default::default()
});
config.error_policy_type = error_policy_type;
}
}
if let Some(spam_threshold) = spam_threshold {
if let PolicyType::FreqThreshold(threshold_config) = &mut config.spam_policy_type {
threshold_config.client_threshold = spam_threshold;
} else {
let spam_policy_type = PolicyType::FreqThreshold(FreqThresholdConfig {
client_threshold: spam_threshold,
window_size_secs: 5,
update_interval_secs: 1,
..Default::default()
});
config.spam_policy_type = spam_policy_type;
}
}
if let Some(dry_run) = dry_run {
config.dry_run = dry_run;
}

self.reconfigure_channel
.as_ref()
.map(|channel| {
channel.send(config.clone()).map_err(|e| {
SuiError::InvalidAdminRequest(format!(
"Failed to send reconfigure message: {}",
e
))
})
})
.unwrap_or(Err(SuiError::InvalidAdminRequest(
"reconfigure channel not enabled".to_string(),
)))?;
Ok(())
}

fn spawn(
policy_config: PolicyConfig,
metrics: TrafficControllerMetrics,
Expand Down Expand Up @@ -132,23 +195,30 @@ impl TrafficController {
let tally_loop_metrics = metrics.clone();
let clear_loop_metrics = metrics.clone();
let dry_run_mode = policy_config.dry_run;
let (reconfigure_tx, reconfigure_rx) = broadcast::channel(4);
let reconfigure_rx2 = reconfigure_tx.subscribe();
let policy_config_clone = policy_config.clone();
spawn_monitored_task!(run_tally_loop(
rx,
policy_config,
policy_config_clone,
fw_config,
tally_loop_blocklists,
tally_loop_metrics,
mem_drainfile_present,
reconfigure_rx,
));
spawn_monitored_task!(run_clear_blocklists_loop(
clear_loop_blocklists,
clear_loop_metrics,
reconfigure_rx2,
));
Self {
tally_channel: Some(tx),
acl: Acl::Blocklists(blocklists),
metrics: metrics.clone(),
dry_run_mode,
config: Arc::new(Mutex::new(policy_config)),
reconfigure_channel: Some(reconfigure_tx),
}
}

Expand Down Expand Up @@ -296,20 +366,30 @@ impl TrafficController {
/// never checked again. This function runs periodically to clear out any
/// such stale IPs. This also ensures that the blocklist length metric
/// accurately reflects TTL.
async fn run_clear_blocklists_loop(blocklists: Blocklists, metrics: Arc<TrafficControllerMetrics>) {
async fn run_clear_blocklists_loop(
blocklists: Blocklists,
metrics: Arc<TrafficControllerMetrics>,
mut reconfigure_rx: broadcast::Receiver<PolicyConfig>,
) {
loop {
tokio::time::sleep(Duration::from_secs(3)).await;
let now = SystemTime::now();
blocklists.clients.retain(|_, expiration| now < *expiration);
blocklists
.proxied_clients
.retain(|_, expiration| now < *expiration);
metrics
.connection_ip_blocklist_len
.set(blocklists.clients.len() as i64);
metrics
.proxy_ip_blocklist_len
.set(blocklists.proxied_clients.len() as i64);
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(3)) => {
let now = SystemTime::now();
blocklists.clients.retain(|_, expiration| now < *expiration);
blocklists
.proxied_clients
.retain(|_, expiration| now < *expiration);
metrics
.connection_ip_blocklist_len
.set(blocklists.clients.len() as i64);
metrics
.proxy_ip_blocklist_len
.set(blocklists.proxied_clients.len() as i64);
}
Ok(config) = reconfigure_rx.recv() => {
TrafficController::set_policy_config_metrics(&config, metrics.clone());
}
}
}
}

Expand All @@ -320,6 +400,7 @@ async fn run_tally_loop(
blocklists: Blocklists,
metrics: Arc<TrafficControllerMetrics>,
mut mem_drainfile_present: bool,
mut reconfigure_rx: broadcast::Receiver<PolicyConfig>,
) {
let mut spam_policy = TrafficControlPolicy::from_spam_config(policy_config.clone()).await;
let mut error_policy = TrafficControlPolicy::from_error_config(policy_config.clone()).await;
Expand Down Expand Up @@ -391,6 +472,12 @@ async fn run_tally_loop(
}
}
}
// process channel for receiving new policy config
Ok(config) = reconfigure_rx.recv() => {
// mutate the policy config
spam_policy = TrafficControlPolicy::from_spam_config(config.clone()).await;
error_policy = TrafficControlPolicy::from_error_config(config.clone()).await;
}
}

// every N seconds, we update metrics and logging that would be too
Expand Down
Loading

0 comments on commit 813327f

Please sign in to comment.