Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set backend config from admin rpc #304

Merged
merged 7 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 83 additions & 52 deletions core/src/proxy/block_engine_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,10 @@ pub struct BlockBuilderFeeInfo {
pub block_builder_commission: u64,
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct BlockEngineConfig {
/// Address to the external auth-service responsible for generating access tokens.
pub auth_service_endpoint: Endpoint,

/// Primary backend endpoint.
pub backend_endpoint: Endpoint,
/// Block Engine URL
pub block_engine_url: String,
jedleggett marked this conversation as resolved.
Show resolved Hide resolved

/// If set then it will be assumed the backend verified packets so signature verification will be bypassed in the validator.
pub trust_packets: bool,
Expand All @@ -90,7 +87,7 @@ pub struct BlockEngineStage {

impl BlockEngineStage {
pub fn new(
block_engine_config: BlockEngineConfig,
block_engine_config: Arc<Mutex<BlockEngineConfig>>,
// Channel that bundles get piped through.
bundle_tx: Sender<Vec<PacketBundle>>,
// The keypair stored here is used to sign auth challenges.
Expand All @@ -105,7 +102,7 @@ impl BlockEngineStage {
let block_builder_fee_info = block_builder_fee_info.clone();

let thread = Builder::new()
.name("block-engine-stage".into())
.name("block-engine-stage".to_string())
.spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
Expand Down Expand Up @@ -137,7 +134,7 @@ impl BlockEngineStage {

#[allow(clippy::too_many_arguments)]
async fn start(
block_engine_config: BlockEngineConfig,
block_engine_config: Arc<Mutex<BlockEngineConfig>>,
cluster_info: Arc<ClusterInfo>,
bundle_tx: Sender<Vec<PacketBundle>>,
packet_tx: Sender<PacketBatch>,
Expand All @@ -150,7 +147,11 @@ impl BlockEngineStage {
let mut error_count: u64 = 0;

while !exit.load(Ordering::Relaxed) {
if let Err(e) = Self::connect_auth_and_stream(
// Wait until a valid config is supplied (either initially or by admin rpc)
// Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination
if !Self::is_valid_block_engine_config(&block_engine_config.lock().unwrap()) {
sleep(CONNECTION_BACKOFF).await;
} else if let Err(e) = Self::connect_auth_and_stream(
&block_engine_config,
&cluster_info,
&bundle_tx,
Expand Down Expand Up @@ -183,7 +184,7 @@ impl BlockEngineStage {
}

async fn connect_auth_and_stream(
block_engine_config: &BlockEngineConfig,
block_engine_config: &Arc<Mutex<BlockEngineConfig>>,
cluster_info: &Arc<ClusterInfo>,
bundle_tx: &Sender<Vec<PacketBundle>>,
packet_tx: &Sender<PacketBatch>,
Expand All @@ -194,18 +195,31 @@ impl BlockEngineStage {
) -> crate::proxy::Result<()> {
// Get a copy of configs here in case they have changed at runtime
let keypair = cluster_info.keypair().clone();
let local_config = block_engine_config.lock().unwrap().clone();

let mut backend_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone())
.map_err(|_| {
ProxyError::BlockEngineConnectionError(format!(
"invalid block engine url value: {}",
local_config.block_engine_url
))
})?
.tcp_keepalive(Some(Duration::from_secs(60)));
jedleggett marked this conversation as resolved.
Show resolved Hide resolved
if local_config.block_engine_url.starts_with("https") {
backend_endpoint = backend_endpoint
.tls_config(tonic::transport::ClientTlsConfig::new())
.map_err(|_| {
ProxyError::BlockEngineConnectionError(
"failed to set tls_config for block engine service".to_string(),
)
})?;
}

debug!(
"connecting to auth: {:?}",
block_engine_config.auth_service_endpoint.uri()
);
let auth_channel = timeout(
*connection_timeout,
block_engine_config.auth_service_endpoint.connect(),
)
.await
.map_err(|_| ProxyError::AuthenticationConnectionTimeout)?
.map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?;
debug!("connecting to auth: {}", local_config.block_engine_url);
let auth_channel = timeout(*connection_timeout, backend_endpoint.connect())
.await
.map_err(|_| ProxyError::AuthenticationConnectionTimeout)?
.map_err(|e| ProxyError::AuthenticationConnectionError(e.to_string()))?;

let mut auth_client = AuthServiceClient::new(auth_channel);

Expand All @@ -219,25 +233,18 @@ impl BlockEngineStage {

datapoint_info!(
"block_engine_stage-tokens_generated",
(
"url",
block_engine_config.auth_service_endpoint.uri().to_string(),
String
),
("url", local_config.block_engine_url, String),
("count", 1, i64),
);

debug!(
"connecting to block engine: {:?}",
block_engine_config.backend_endpoint.uri()
"connecting to block engine: {}",
local_config.block_engine_url
);
let block_engine_channel = timeout(
*connection_timeout,
block_engine_config.backend_endpoint.connect(),
)
.await
.map_err(|_| ProxyError::BlockEngineConnectionTimeout)?
.map_err(|e| ProxyError::BlockEngineConnectionError(e.to_string()))?;
let block_engine_channel = timeout(*connection_timeout, backend_endpoint.connect())
.await
.map_err(|_| ProxyError::BlockEngineConnectionTimeout)?
.map_err(|e| ProxyError::BlockEngineConnectionError(e.to_string()))?;

let access_token = Arc::new(Mutex::new(access_token));
let block_engine_client = BlockEngineValidatorClient::with_interceptor(
Expand All @@ -246,13 +253,14 @@ impl BlockEngineStage {
);

Self::start_consuming_block_engine_bundles_and_packets(
&bundle_tx,
bundle_tx,
block_engine_client,
&packet_tx,
packet_tx,
&local_config,
block_engine_config,
&verified_packet_tx,
&exit,
&block_builder_fee_info,
verified_packet_tx,
exit,
block_builder_fee_info,
auth_client,
access_token,
refresh_token,
Expand All @@ -263,11 +271,13 @@ impl BlockEngineStage {
.await
}

#[allow(clippy::too_many_arguments)]
async fn start_consuming_block_engine_bundles_and_packets(
bundle_tx: &Sender<Vec<PacketBundle>>,
mut client: BlockEngineValidatorClient<InterceptedService<Channel, AuthInterceptor>>,
packet_tx: &Sender<PacketBatch>,
block_engine_config: &BlockEngineConfig,
local_config: &BlockEngineConfig, // local copy of config with current connections
global_config: &Arc<Mutex<BlockEngineConfig>>, // guarded reference for detecting run-time updates
verified_packet_tx: &Sender<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>,
exit: &Arc<AtomicBool>,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
Expand Down Expand Up @@ -317,7 +327,8 @@ impl BlockEngineStage {
(subscribe_bundles_stream, subscribe_packets_stream),
bundle_tx,
packet_tx,
block_engine_config,
local_config,
global_config,
verified_packet_tx,
exit,
block_builder_fee_info,
Expand All @@ -331,6 +342,7 @@ impl BlockEngineStage {
.await
}

#[allow(clippy::too_many_arguments)]
async fn consume_bundle_and_packet_stream(
mut client: BlockEngineValidatorClient<InterceptedService<Channel, AuthInterceptor>>,
(mut bundle_stream, mut packet_stream): (
Expand All @@ -339,7 +351,8 @@ impl BlockEngineStage {
),
bundle_tx: &Sender<Vec<PacketBundle>>,
packet_tx: &Sender<PacketBatch>,
block_engine_config: &BlockEngineConfig,
local_config: &BlockEngineConfig, // local copy of config with current connections
global_config: &Arc<Mutex<BlockEngineConfig>>, // guarded reference for detecting run-time updates
verified_packet_tx: &Sender<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>,
exit: &Arc<AtomicBool>,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
Expand All @@ -357,33 +370,36 @@ impl BlockEngineStage {
let mut num_full_refreshes: u64 = 1;
let mut num_refresh_access_token: u64 = 0;
let mut block_engine_stats = BlockEngineStageStats::default();
let mut metrics_tick = interval(METRICS_TICK);
let mut metrics_and_auth_tick = interval(METRICS_TICK);
let mut maintenance_tick = interval(MAINTENANCE_TICK);
let auth_uri_string = block_engine_config.auth_service_endpoint.uri().to_string();

info!("connected to packet and bundle stream");

while !exit.load(Ordering::Relaxed) {
tokio::select! {
maybe_msg = packet_stream.message() => {
let resp = maybe_msg?.ok_or(ProxyError::GrpcStreamDisconnected)?;
Self::handle_block_engine_packets(resp, packet_tx, verified_packet_tx, block_engine_config.trust_packets, &mut block_engine_stats)?;
Self::handle_block_engine_packets(resp, packet_tx, verified_packet_tx, local_config.trust_packets, &mut block_engine_stats)?;
}
maybe_bundles = bundle_stream.message() => {
Self::handle_block_engine_maybe_bundles(maybe_bundles, bundle_tx, &mut block_engine_stats)?;
}
_ = metrics_tick.tick() => {
_ = metrics_and_auth_tick.tick() => {
block_engine_stats.report();
block_engine_stats = BlockEngineStageStats::default();

if cluster_info.id() != keypair.pubkey() {
return Err(ProxyError::AuthenticationConnectionError("Validator ID Changed".to_string()));
return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string()));
}

if *global_config.lock().unwrap() != *local_config {
return Err(ProxyError::AuthenticationConnectionError("block engine config changed".to_string()));
}

let (maybe_new_access, maybe_new_refresh) = maybe_refresh_auth_tokens(&mut auth_client,
&access_token,
&refresh_token,
&cluster_info,
cluster_info,
connection_timeout,
refresh_within_s,
).await?;
Expand All @@ -392,7 +408,7 @@ impl BlockEngineStage {
num_refresh_access_token += 1;
datapoint_info!(
"block_engine_stage-refresh_access_token",
("url", auth_uri_string, String),
("url", &local_config.block_engine_url, String),
("count", num_refresh_access_token, i64),
);
*access_token.lock().unwrap() = new_token;
Expand All @@ -401,7 +417,7 @@ impl BlockEngineStage {
num_full_refreshes += 1;
datapoint_info!(
"block_engine_stage-tokens_generated",
("url", auth_uri_string, String),
("url", &local_config.block_engine_url, String),
("count", num_full_refreshes, i64),
);
refresh_token = new_token;
Expand Down Expand Up @@ -500,4 +516,19 @@ impl BlockEngineStage {

Ok(())
}

pub fn is_valid_block_engine_config(config: &BlockEngineConfig) -> bool {
if config.block_engine_url.is_empty() {
warn!("can't connect to block_engine. missing block_engine_url.");
return false;
}
if let Err(e) = Endpoint::from_str(&config.block_engine_url) {
error!(
"can't connect to block engine. error creating block engine endpoint - {}",
e.to_string()
);
return false;
}
true
}
}
6 changes: 4 additions & 2 deletions core/src/proxy/fetch_stage_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ impl FetchStageManager {
Self::set_tpu_addresses(&cluster_info, tpu_addr, tpu_forward_addr);
}
} else {
// see comment on heartbeat_sender clone in new()
unreachable!();
{
warn!("relayer heartbeat receiver disconnected, shutting down");
return;
}
}
}
recv(metrics_tick) -> _ => {
Expand Down
Loading