Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Dec 13, 2024
1 parent 5d1ffd0 commit 67a4ea3
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 16 deletions.
28 changes: 15 additions & 13 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ use {
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, LazyLock, Mutex, RwLock,
Arc, Mutex, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
Expand All @@ -154,13 +154,6 @@ use {
tokio::runtime::{self, Runtime as TokioRuntime},
};

static STS_CLIENT_RUNTIME: LazyLock<TokioRuntime> = LazyLock::new(|| {
runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime")
});

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Right now since we reuse the wait for supermajority code, the
Expand Down Expand Up @@ -1105,13 +1098,20 @@ impl Validator {

let leader_info = ClusterTpuInfo::new(cluster_info.clone(), poh_recorder.clone());
let (json_rpc_service, client_updater) = if config.use_tpu_client_next {
let tpu_client_next_runtime = runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solSTSQuic")
.build()
.expect(
"Should be possible to create tokio runtime for SendTransactionService.",
);
let my_tpu_address = cluster_info
.my_contact_info()
.tpu(Protocol::QUIC)
.map_err(|err| ValidatorError::Other(format!("{err}")))?;

let client = TpuClientNextClient::new(
STS_CLIENT_RUNTIME.handle().clone(),
tpu_client_next_runtime.handle().clone(),
my_tpu_address,
config.send_transaction_service_config.tpu_peers.clone(),
Some(leader_info),
Expand Down Expand Up @@ -1488,6 +1488,11 @@ impl Validator {
let cluster_slots =
Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());

// If RPC is supported and ConnectionCache is used, pass ConnectionCache for being warmup inside Tvu.
let connection_cache_for_warmup = (json_rpc_service.is_some()
&& !config.use_tpu_client_next)
.then_some(&connection_cache);

let tvu = Tvu::new(
vote_account,
authorized_voter_keypairs,
Expand Down Expand Up @@ -1536,10 +1541,7 @@ impl Validator {
config.wait_to_vote_slot,
accounts_background_request_sender.clone(),
config.runtime_config.log_messages_bytes_limit,
// for the cache warmer only used for STS for RPC service when
// tpu-client-next is not used
(json_rpc_service.is_some() && !config.use_tpu_client_next)
.then_some(&connection_cache),
connection_cache_for_warmup,
&prioritization_fee_cache,
banking_tracer.clone(),
turbine_quic_endpoint_sender.clone(),
Expand Down
1 change: 0 additions & 1 deletion rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5016,7 +5016,6 @@ pub mod tests {
}

#[tokio::test(flavor = "multi_thread")]

async fn test_rpc_get_tx_count_tpu_client_next() {
rpc_get_tx_count::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
}
Expand Down
5 changes: 3 additions & 2 deletions send-transaction-service/src/transaction_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,9 @@ where
let Ok(mut lock) = handle.lock() else {
return Err("TpuClientNext task panicked.".into());
};
lock.1.cancel();
lock.0.take() // Take the `join_handle` out of the critical section
let (handle, token) = std::mem::take(&mut *lock);
token.cancel();
handle
};

if let Some(join_handle) = join_handle {
Expand Down

0 comments on commit 67a4ea3

Please sign in to comment.