Skip to content

Commit

Permalink
[Traffic Control] Add dynamic config via admin api
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Jan 15, 2025
1 parent 2166353 commit 8e20a05
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 32 deletions.
33 changes: 33 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::execution_cache::ExecutionCacheTraitPointers;
use crate::execution_cache::TransactionCacheRead;
use crate::jsonrpc_index::CoinIndexKey2;
use crate::rpc_index::RpcIndexStore;
use crate::traffic_controller::metrics::TrafficControllerMetrics;
use crate::traffic_controller::TrafficController;
use crate::transaction_outputs::TransactionOutputs;
use crate::verify_indexes::verify_indexes;
use anyhow::anyhow;
Expand Down Expand Up @@ -56,6 +58,8 @@ use sui_types::layout_resolver::into_struct_layout;
use sui_types::layout_resolver::LayoutResolver;
use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
use sui_types::object::bounded_visitor::BoundedVisitor;
use sui_types::traffic_control::PolicyConfig;
use sui_types::traffic_control::RemoteFirewallConfig;
use sui_types::transaction_executor::SimulateTransactionResult;
use tap::TapFallible;
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -825,6 +829,9 @@ pub struct AuthorityState {

/// The chain identifier is derived from the digest of the genesis checkpoint.
chain_identifier: ChainIdentifier,

/// Traffic controller for Sui core servers (json-rpc, validator service)
pub traffic_controller: Option<Arc<TrafficController>>,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -1491,6 +1498,21 @@ impl AuthorityState {
Ok((effects, execution_error_opt))
}

pub fn reconfigure_traffic_control(
&self,
error_threshold: u64,
spam_threshold: u64,
) -> SuiResult<()> {
self.traffic_controller
.as_ref()
.map(|traffic_controller| {
traffic_controller.reconfigure_no_clear(error_threshold, spam_threshold)
})
.unwrap_or(Err(SuiError::InvalidAdminRequest(
"traffic controller not enabled".to_string(),
)))
}

#[instrument(level = "trace", skip_all)]
async fn commit_certificate(
&self,
Expand Down Expand Up @@ -2863,6 +2885,8 @@ impl AuthorityState {
archive_readers: ArchiveReaderBalancer,
validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
chain_identifier: ChainIdentifier,
policy_config: Option<PolicyConfig>,
firewall_config: Option<RemoteFirewallConfig>,
) -> Arc<Self> {
Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

Expand Down Expand Up @@ -2898,6 +2922,14 @@ impl AuthorityState {
let input_loader =
TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
let epoch = epoch_store.epoch();
let traffic_controller_metrics = TrafficControllerMetrics::new(prometheus_registry);
let traffic_controller = policy_config.clone().map(|policy| {
Arc::new(TrafficController::init(
policy,
traffic_controller_metrics,
firewall_config.clone(),
))
});
let state = Arc::new(AuthorityState {
name,
secret,
Expand All @@ -2920,6 +2952,7 @@ impl AuthorityState {
overload_info: AuthorityOverloadInfo::default(),
validator_tx_finalizer,
chain_identifier,
traffic_controller,
});

// Start a task to execute ready certificates.
Expand Down
22 changes: 6 additions & 16 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use sui_types::messages_grpc::{
};
use sui_types::multiaddr::Multiaddr;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight};
use sui_types::traffic_control::{ClientIdSource, Weight};
use sui_types::{effects::TransactionEffectsAPI, messages_grpc::HandleTransactionRequestV2};
use sui_types::{error::*, transaction::*};
use sui_types::{
Expand All @@ -47,6 +47,7 @@ use tap::TapFallible;
use tonic::metadata::{Ascii, MetadataValue};
use tracing::{error, error_span, info, Instrument};

use crate::consensus_adapter::ConnectionMonitorStatusForTests;
use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
mysticeti_adapter::LazyMysticetiClient,
Expand All @@ -58,10 +59,6 @@ use crate::{
traffic_controller::policies::TrafficTally,
traffic_controller::TrafficController,
};
use crate::{
consensus_adapter::ConnectionMonitorStatusForTests,
traffic_controller::metrics::TrafficControllerMetrics,
};
use nonempty::{nonempty, NonEmpty};
use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
use tonic::transport::server::TcpConnectInfo;
Expand Down Expand Up @@ -358,22 +355,15 @@ impl ValidatorService {
state: Arc<AuthorityState>,
consensus_adapter: Arc<ConsensusAdapter>,
validator_metrics: Arc<ValidatorServiceMetrics>,
traffic_controller_metrics: TrafficControllerMetrics,
policy_config: Option<PolicyConfig>,
firewall_config: Option<RemoteFirewallConfig>,
client_id_source: Option<ClientIdSource>,
) -> Self {
let traffic_controller = state.traffic_controller.as_ref().cloned();
Self {
state,
consensus_adapter,
metrics: validator_metrics,
traffic_controller: policy_config.clone().map(|policy| {
Arc::new(TrafficController::init(
policy,
traffic_controller_metrics,
firewall_config,
))
}),
client_id_source: policy_config.map(|policy| policy.client_id_source),
traffic_controller,
client_id_source,
}
}

Expand Down
11 changes: 11 additions & 0 deletions crates/sui-core/src/traffic_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::fs;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::ops::Add;
use std::sync::Arc;
use sui_types::error::SuiResult;

use self::metrics::TrafficControllerMetrics;
use crate::traffic_controller::nodefw_client::{BlockAddress, BlockAddresses, NodeFWClient};
Expand Down Expand Up @@ -104,6 +105,16 @@ impl TrafficController {
}
}

/// Reconfigure traffic control without clearing the blocklists
pub fn reconfigure_no_clear(&self, error_threshold: u64, spam_threshold: u64) -> SuiResult<()> {
// TODO: implement. This will likely require introducing a mutex, aquiring it,
// then spawning a new task to run the tally loop with the new config.
// ALTERNATIVELY, can make it so that the policy dynamically reads the threshold
// values from memory whenever it needs to, and we instead save these values in memory
// and do an atomic swap of the values when needed.
todo!()
}

fn spawn(
policy_config: PolicyConfig,
metrics: TrafficControllerMetrics,
Expand Down
16 changes: 4 additions & 12 deletions crates/sui-json-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ pub struct JsonRpcServerBuilder {
module: RpcModule<()>,
rpc_doc: Project,
registry: Registry,
traffic_controller: Option<Arc<TrafficController>>,
policy_config: Option<PolicyConfig>,
firewall_config: Option<RemoteFirewallConfig>,
}

pub fn sui_rpc_doc(version: &str) -> Project {
Expand All @@ -84,15 +84,15 @@ impl JsonRpcServerBuilder {
pub fn new(
version: &str,
prometheus_registry: &Registry,
traffic_controller: Option<Arc<TrafficController>>,
policy_config: Option<PolicyConfig>,
firewall_config: Option<RemoteFirewallConfig>,
) -> Self {
Self {
module: RpcModule::new(()),
rpc_doc: sui_rpc_doc(version),
registry: prometheus_registry.clone(),
traffic_controller,
policy_config,
firewall_config,
}
}

Expand Down Expand Up @@ -175,14 +175,6 @@ impl JsonRpcServerBuilder {
let methods_names = module.method_names().collect::<Vec<_>>();

let metrics = Arc::new(Metrics::new(&self.registry, &methods_names));
let traffic_controller_metrics = TrafficControllerMetrics::new(&self.registry);
let traffic_controller = self.policy_config.clone().map(|policy| {
Arc::new(TrafficController::init(
policy,
traffic_controller_metrics,
self.firewall_config.clone(),
))
});
let client_id_source = self
.policy_config
.clone()
Expand All @@ -205,7 +197,7 @@ impl JsonRpcServerBuilder {

let rpc_middleware = jsonrpsee::server::middleware::rpc::RpcServiceBuilder::new()
.layer_fn(move |s| MetricsLayer::new(s, metrics.clone()))
.layer_fn(move |s| TrafficControllerService::new(s, traffic_controller.clone()));
.layer_fn(move |s| TrafficControllerService::new(s, self.traffic_controller.clone()));
let service_builder =
jsonrpsee::server::ServerBuilder::new().set_rpc_middleware(rpc_middleware);

Expand Down
29 changes: 29 additions & 0 deletions crates/sui-node/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const NODE_CONFIG: &str = "/node-config";
const RANDOMNESS_PARTIAL_SIGS_ROUTE: &str = "/randomness-partial-sigs";
const RANDOMNESS_INJECT_PARTIAL_SIGS_ROUTE: &str = "/randomness-inject-partial-sigs";
const RANDOMNESS_INJECT_FULL_SIG_ROUTE: &str = "/randomness-inject-full-sig";
const RECONFIGURE_TRAFFIC_CONTROL: &str = "/reconfigure-traffic-control";

struct AppState {
node: Arc<SuiNode>,
Expand Down Expand Up @@ -119,6 +120,10 @@ pub async fn run_admin_server(node: Arc<SuiNode>, port: u16, tracing_handle: Tra
RANDOMNESS_INJECT_FULL_SIG_ROUTE,
post(randomness_inject_full_sig),
)
.route(
RECONFIGURE_TRAFFIC_CONTROL,
post(reconfigure_traffic_control),
)
.with_state(Arc::new(app_state));

let socket_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
Expand Down Expand Up @@ -443,3 +448,27 @@ async fn randomness_inject_full_sig(
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
}
}

#[derive(Deserialize)]
struct TrafficControlConfig {
error_threshold: u64,
spam_threshold: u64,
}

async fn reconfigure_traffic_control(
State(state): State<Arc<AppState>>,
args: Query<TrafficControlConfig>,
) -> (StatusCode, String) {
let Query(TrafficControlConfig {
error_threshold,
spam_threshold,
}) = args;
match state
.node
.state()
.reconfigure_traffic_control(error_threshold, spam_threshold)
{
Ok(()) => (StatusCode::OK, "traffic control configured\n".to_string()),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
}
}
9 changes: 5 additions & 4 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,8 @@ impl SuiNode {
archive_readers,
validator_tx_finalizer,
chain_identifier,
config.policy_config,
config.firewall_config,
)
.await;
// ensure genesis txn was executed
Expand Down Expand Up @@ -1479,9 +1481,7 @@ impl SuiNode {
state.clone(),
consensus_adapter,
Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
TrafficControllerMetrics::new(prometheus_registry),
config.policy_config.clone(),
config.firewall_config.clone(),
config.policy_config.client_id_source,
);

let mut server_conf = mysten_network::config::Config::new();
Expand Down Expand Up @@ -2031,11 +2031,12 @@ pub async fn build_http_server(
let mut router = axum::Router::new();

let json_rpc_router = {
let traffic_controller = state.traffic_controller.as_ref().cloned();
let mut server = JsonRpcServerBuilder::new(
env!("CARGO_PKG_VERSION"),
prometheus_registry,
traffic_controller,
config.policy_config.clone(),
config.firewall_config.clone(),
);

let kv_store = build_kv_store(&state, config, prometheus_registry)?;
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,9 @@ pub enum SuiError {

#[error("The request did not contain a certificate")]
NoCertificateProvidedError,

#[error("Invalid admin request: {0}")]
InvalidAdminRequest(String),
}

#[repr(u64)]
Expand Down

0 comments on commit 8e20a05

Please sign in to comment.