Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Traffic Control] Add dynamic config via admin api #20887

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::execution_cache::ExecutionCacheTraitPointers;
use crate::execution_cache::TransactionCacheRead;
use crate::jsonrpc_index::CoinIndexKey2;
use crate::rpc_index::RpcIndexStore;
use crate::traffic_controller::metrics::TrafficControllerMetrics;
use crate::traffic_controller::TrafficController;
use crate::transaction_outputs::TransactionOutputs;
use crate::verify_indexes::verify_indexes;
use anyhow::anyhow;
Expand Down Expand Up @@ -58,6 +60,9 @@ use sui_types::layout_resolver::into_struct_layout;
use sui_types::layout_resolver::LayoutResolver;
use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
use sui_types::object::bounded_visitor::BoundedVisitor;
use sui_types::traffic_control::{
PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
};
use sui_types::transaction_executor::SimulateTransactionResult;
use tap::TapFallible;
use tokio::sync::mpsc::unbounded_channel;
Expand Down Expand Up @@ -831,6 +836,9 @@ pub struct AuthorityState {

/// The chain identifier is derived from the digest of the genesis checkpoint.
chain_identifier: ChainIdentifier,

/// Traffic controller for Sui core servers (json-rpc, validator service)
pub traffic_controller: Option<Arc<TrafficController>>,
williampsmith marked this conversation as resolved.
Show resolved Hide resolved
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -1509,6 +1517,16 @@ impl AuthorityState {
Ok((effects, timings, execution_error_opt))
}

pub async fn reconfigure_traffic_control(
&self,
params: TrafficControlReconfigParams,
) -> Result<(), SuiError> {
if let Some(traffic_controller) = self.traffic_controller.as_ref() {
traffic_controller.admin_reconfigure(params).await?;
}
Ok(())
}

#[instrument(level = "trace", skip_all)]
fn commit_certificate(
&self,
Expand Down Expand Up @@ -2860,6 +2878,7 @@ impl AuthorityState {
}

#[allow(clippy::disallowed_methods)] // allow unbounded_channel()
#[allow(clippy::too_many_arguments)]
pub async fn new(
name: AuthorityName,
secret: StableSyncAuthoritySigner,
Expand All @@ -2880,6 +2899,8 @@ impl AuthorityState {
validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
chain_identifier: ChainIdentifier,
pruner_db: Option<Arc<AuthorityPrunerTables>>,
policy_config: Option<PolicyConfig>,
firewall_config: Option<RemoteFirewallConfig>,
) -> Arc<Self> {
Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

Expand Down Expand Up @@ -2916,6 +2937,20 @@ impl AuthorityState {
let input_loader =
TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
let epoch = epoch_store.epoch();
let traffic_controller_metrics =
Arc::new(TrafficControllerMetrics::new(prometheus_registry));
let traffic_controller = if let Some(policy_config) = policy_config {
Some(Arc::new(
TrafficController::init(
policy_config,
traffic_controller_metrics,
firewall_config.clone(),
)
.await,
))
} else {
None
};
let state = Arc::new(AuthorityState {
name,
secret,
Expand All @@ -2938,6 +2973,7 @@ impl AuthorityState {
overload_info: AuthorityOverloadInfo::default(),
validator_tx_finalizer,
chain_identifier,
traffic_controller,
});

// Start a task to execute ready certificates.
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ impl<'a> TestAuthorityBuilder<'a> {
config.authority_store_pruning_config = pruning_config;

let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
let policy_config = config.policy_config.clone();
let firewall_config = config.firewall_config.clone();

let state = AuthorityState::new(
name,
Expand All @@ -351,6 +353,8 @@ impl<'a> TestAuthorityBuilder<'a> {
None,
chain_identifier,
pruner_db,
policy_config,
firewall_config,
)
.await;

Expand Down
50 changes: 21 additions & 29 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use sui_types::messages_grpc::{
};
use sui_types::multiaddr::Multiaddr;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight};
use sui_types::traffic_control::{ClientIdSource, Weight};
use sui_types::{effects::TransactionEffectsAPI, messages_grpc::HandleTransactionRequestV2};
use sui_types::{error::*, transaction::*};
use sui_types::{
Expand All @@ -47,6 +47,7 @@ use tap::TapFallible;
use tonic::metadata::{Ascii, MetadataValue};
use tracing::{error, error_span, info, Instrument};

use crate::consensus_adapter::ConnectionMonitorStatusForTests;
use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
mysticeti_adapter::LazyMysticetiClient,
Expand All @@ -58,10 +59,6 @@ use crate::{
traffic_controller::policies::TrafficTally,
traffic_controller::TrafficController,
};
use crate::{
consensus_adapter::ConnectionMonitorStatusForTests,
traffic_controller::metrics::TrafficControllerMetrics,
};
use nonempty::{nonempty, NonEmpty};
use sui_config::local_ip_utils::new_local_tcp_address_for_testing;
use tonic::transport::server::TcpConnectInfo;
Expand Down Expand Up @@ -358,22 +355,15 @@ impl ValidatorService {
state: Arc<AuthorityState>,
consensus_adapter: Arc<ConsensusAdapter>,
validator_metrics: Arc<ValidatorServiceMetrics>,
traffic_controller_metrics: TrafficControllerMetrics,
policy_config: Option<PolicyConfig>,
firewall_config: Option<RemoteFirewallConfig>,
client_id_source: Option<ClientIdSource>,
) -> Self {
let traffic_controller = state.traffic_controller.clone();
Self {
state,
consensus_adapter,
metrics: validator_metrics,
traffic_controller: policy_config.clone().map(|policy| {
Arc::new(TrafficController::init(
policy,
traffic_controller_metrics,
firewall_config,
))
}),
client_id_source: policy_config.map(|policy| policy.client_id_source),
traffic_controller,
client_id_source,
}
}

Expand Down Expand Up @@ -1314,7 +1304,7 @@ impl ValidatorService {
}
}

fn handle_traffic_resp<T>(
async fn handle_traffic_resp<T>(
&self,
client: Option<IpAddr>,
wrapped_response: WrappedServiceResponse<T>,
Expand All @@ -1329,17 +1319,19 @@ impl ValidatorService {
};

if let Some(traffic_controller) = self.traffic_controller.clone() {
traffic_controller.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_info: error.map(|e| {
let error_type = String::from(e.clone().as_ref());
let error_weight = normalize(e);
(error_weight, error_type)
}),
spam_weight,
timestamp: SystemTime::now(),
})
traffic_controller
.tally(TrafficTally {
direct: client,
through_fullnode: None,
error_info: error.map(|e| {
let error_type = String::from(e.clone().as_ref());
let error_weight = normalize(e);
(error_weight, error_type)
}),
spam_weight,
timestamp: SystemTime::now(),
})
.await;
}
unwrapped_response
}
Expand Down Expand Up @@ -1390,7 +1382,7 @@ macro_rules! handle_with_decoration {

// handle traffic tallying
let wrapped_response = $self.$func_name($request).await;
$self.handle_traffic_resp(client, wrapped_response)
$self.handle_traffic_resp(client, wrapped_response).await
}};
}

Expand Down
Loading
Loading