From 54ca96602f38407bd5df2fffb4fe4806bbae76ec Mon Sep 17 00:00:00 2001 From: greg Date: Thu, 14 Mar 2024 22:23:15 +0000 Subject: [PATCH 1/2] add in method for building a TpuClient for LocalCluster tests --- Cargo.lock | 1 + bench-tps/tests/bench_tps.rs | 28 +++------------- dos/src/main.rs | 34 ++++--------------- local-cluster/Cargo.toml | 1 + local-cluster/src/local_cluster.rs | 54 +++++++++++++++++++++++++++++- 5 files changed, 67 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c35f20738a8c1c..e8d86dec4c6269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6427,6 +6427,7 @@ dependencies = [ "solana-ledger", "solana-logger", "solana-pubsub-client", + "solana-quic-client", "solana-rpc-client", "solana-rpc-client-api", "solana-runtime", diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 7a2b0fe20a5b8d..b073622835b823 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -7,14 +7,11 @@ use { cli::{Config, InstructionPaddingConfig}, send_batch::generate_durable_nonce_accounts, }, - solana_client::{ - connection_cache::ConnectionCache, - tpu_client::{TpuClient, TpuClientConfig}, - }, + solana_client::tpu_client::{TpuClient, TpuClientConfig}, solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, solana_local_cluster::{ - local_cluster::{ClusterConfig, LocalCluster}, + local_cluster::{build_tpu_quic_client, ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, solana_rpc::rpc::JsonRpcConfig, @@ -78,24 +75,9 @@ fn test_bench_tps_local_cluster(config: Config) { cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000); - let ConnectionCache::Quic(cache) = &*cluster.connection_cache else { - panic!("Expected a Quic ConnectionCache."); - }; - - let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); - - let client = Arc::new( - TpuClient::new_with_connection_cache( - Arc::new(RpcClient::new(rpc_url)), - rpc_pubsub_url.as_str(), - TpuClientConfig::default(), - cache.clone(), - ) - .unwrap_or_else(|err| { - panic!("Could not create TpuClient {err:?}"); - }), - ); + let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + })); let lamports_per_account = 100; diff --git a/dos/src/main.rs b/dos/src/main.rs index 3bf7cce0e782cc..a8c8b483f41420 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -824,13 +824,12 @@ pub mod test { solana_gossip::contact_info::LegacyContactInfo, solana_local_cluster::{ cluster::Cluster, - local_cluster::{ClusterConfig, LocalCluster}, + local_cluster::{build_tpu_quic_client, ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc::rpc::JsonRpcConfig, solana_sdk::timing::timestamp, - solana_tpu_client::tpu_client::TpuClientConfig, }; const TEST_SEND_BATCH_SIZE: usize = 1; @@ -843,29 +842,6 @@ pub mod test { ); } - fn build_tpu_quic_client( - cluster: &LocalCluster, - ) -> Arc> { - let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); - - let ConnectionCache::Quic(cache) = &*cluster.connection_cache else { - panic!("Expected a Quic ConnectionCache."); - }; - - Arc::new( - TpuClient::new_with_connection_cache( - Arc::new(RpcClient::new(rpc_url)), - rpc_pubsub_url.as_str(), - TpuClientConfig::default(), - cache.clone(), - ) - .unwrap_or_else(|err| { - panic!("Could not create TpuClient with Quic Cache {err:?}"); - }), - ) - } - #[test] fn test_dos() { let nodes = [ContactInfo::new_localhost( @@ -1003,7 +979,9 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = build_tpu_quic_client(&cluster); + let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + })); // creates one transaction with 8 valid signatures and sends it 10 times run_dos( @@ -1135,7 +1113,9 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = build_tpu_quic_client(&cluster); + let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + })); // creates one transaction and sends it 10 times // this is done in single thread diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index 4248fc02945238..07b30030295e52 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -24,6 +24,7 @@ solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } solana-pubsub-client = { workspace = true } +solana-quic-client = { workspace = true } solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 9d1b483d85fdd3..9fc4b4452564ed 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -7,7 +7,12 @@ use { itertools::izip, log::*, solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs, - solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, + solana_client::{ + connection_cache::ConnectionCache, + rpc_client::RpcClient, + thin_client::ThinClient, + tpu_client::{TpuClient, TpuClientConfig}, + }, solana_core::{ consensus::tower_storage::FileTowerStorage, validator::{Validator, ValidatorConfig, ValidatorStartProgress}, @@ -18,6 +23,7 @@ use { gossip_service::discover_cluster, }, solana_ledger::{create_new_tmp_ledger, shred::Shred}, + solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_runtime::{ genesis_utils::{ create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo, @@ -63,6 +69,52 @@ use { }, }; +pub fn build_tpu_quic_client( + cluster: &LocalCluster, +) -> Result> { + build_tpu_client(cluster, |rpc_url| Arc::new(RpcClient::new(rpc_url))) +} + +pub fn build_tpu_quic_client_with_commitment( + cluster: &LocalCluster, + commitment_config: CommitmentConfig, +) -> Result> { + build_tpu_client(cluster, |rpc_url| { + Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config)) + }) +} + +fn build_tpu_client( + cluster: &LocalCluster, + rpc_client_builder: F, +) -> Result> +where + F: FnOnce(String) -> Arc, +{ + let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); + + let cache = match &*cluster.connection_cache { + ConnectionCache::Quic(cache) => cache, + ConnectionCache::Udp(_) => { + return Err(Error::new( + ErrorKind::Other, + "Expected a Quic ConnectionCache. Got UDP", + )) + } + }; + + let tpu_client = TpuClient::new_with_connection_cache( + rpc_client_builder(rpc_url), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .map_err(|err| Error::new(ErrorKind::Other, format!("TpuSenderError: {}", err)))?; + + Ok(tpu_client) +} + const DUMMY_SNAPSHOT_CONFIG_PATH_MARKER: &str = "dummy"; pub struct ClusterConfig { From af875035d18c03964ba1f29c711351cb7853be32 Mon Sep 17 00:00:00 2001 From: greg Date: Fri, 15 Mar 2024 23:28:59 +0000 Subject: [PATCH 2/2] add cluster trait. leave dependency on solana_client::tpu_client --- bench-tps/tests/bench_tps.rs | 5 +- client/src/tpu_client.rs | 2 + dos/src/main.rs | 13 ++--- local-cluster/src/cluster.rs | 11 +++- local-cluster/src/local_cluster.rs | 90 ++++++++++++++---------------- 5 files changed, 60 insertions(+), 61 deletions(-) diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index b073622835b823..bfff1f7e1250c4 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -11,7 +11,8 @@ use { solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, solana_local_cluster::{ - local_cluster::{build_tpu_quic_client, ClusterConfig, LocalCluster}, + cluster::Cluster, + local_cluster::{ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, solana_rpc::rpc::JsonRpcConfig, @@ -75,7 +76,7 @@ fn test_bench_tps_local_cluster(config: Config) { cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000); - let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| { + let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { panic!("Could not create TpuClient with Quic Cache {err:?}"); })); diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 038dd86774ea98..555d3aad88bcb1 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -21,6 +21,8 @@ pub use { solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS}, }; +pub type QuicTpuClient = TpuClient; + pub enum TpuClientWrapper { Quic(TpuClient), Udp(TpuClient), diff --git a/dos/src/main.rs b/dos/src/main.rs index a8c8b483f41420..15874a86973f9c 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -818,16 +818,15 @@ fn main() { pub mod test { use { super::*, - solana_client::tpu_client::TpuClient, + solana_client::tpu_client::QuicTpuClient, solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, solana_gossip::contact_info::LegacyContactInfo, solana_local_cluster::{ cluster::Cluster, - local_cluster::{build_tpu_quic_client, ClusterConfig, LocalCluster}, + local_cluster::{ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, - solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc::rpc::JsonRpcConfig, solana_sdk::timing::timestamp, }; @@ -837,9 +836,7 @@ pub mod test { // thin wrapper for the run_dos function // to avoid specifying everywhere generic parameters fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) { - run_dos::>( - nodes, iterations, None, params, - ); + run_dos::(nodes, iterations, None, params); } #[test] @@ -979,7 +976,7 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| { + let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { panic!("Could not create TpuClient with Quic Cache {err:?}"); })); @@ -1113,7 +1110,7 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| { + let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { panic!("Could not create TpuClient with Quic Cache {err:?}"); })); diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index 03ec1b7abe13f2..425f65c48e14c5 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -1,11 +1,11 @@ use { - solana_client::thin_client::ThinClient, + solana_client::{thin_client::ThinClient, tpu_client::QuicTpuClient}, solana_core::validator::{Validator, ValidatorConfig}, solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, solana_ledger::shred::Shred, - solana_sdk::{pubkey::Pubkey, signature::Keypair}, + solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair}, solana_streamer::socket::SocketAddrSpace, - std::{path::PathBuf, sync::Arc}, + std::{io::Result, path::PathBuf, sync::Arc}, }; pub struct ValidatorInfo { @@ -38,6 +38,11 @@ impl ClusterValidatorInfo { pub trait Cluster { fn get_node_pubkeys(&self) -> Vec; fn get_validator_client(&self, pubkey: &Pubkey) -> Option; + fn build_tpu_quic_client(&self) -> Result; + fn build_tpu_quic_client_with_commitment( + &self, + commitment_config: CommitmentConfig, + ) -> Result; fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>; fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo; fn restart_node( diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 9fc4b4452564ed..400f4f73f78c26 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -11,7 +11,7 @@ use { connection_cache::ConnectionCache, rpc_client::RpcClient, thin_client::ThinClient, - tpu_client::{TpuClient, TpuClientConfig}, + tpu_client::{QuicTpuClient, TpuClient, TpuClientConfig}, }, solana_core::{ consensus::tower_storage::FileTowerStorage, @@ -23,7 +23,6 @@ use { gossip_service::discover_cluster, }, solana_ledger::{create_new_tmp_ledger, shred::Shred}, - solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_runtime::{ genesis_utils::{ create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo, @@ -69,52 +68,6 @@ use { }, }; -pub fn build_tpu_quic_client( - cluster: &LocalCluster, -) -> Result> { - build_tpu_client(cluster, |rpc_url| Arc::new(RpcClient::new(rpc_url))) -} - -pub fn build_tpu_quic_client_with_commitment( - cluster: &LocalCluster, - commitment_config: CommitmentConfig, -) -> Result> { - build_tpu_client(cluster, |rpc_url| { - Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config)) - }) -} - -fn build_tpu_client( - cluster: &LocalCluster, - rpc_client_builder: F, -) -> Result> -where - F: FnOnce(String) -> Arc, -{ - let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); - - let cache = match &*cluster.connection_cache { - ConnectionCache::Quic(cache) => cache, - ConnectionCache::Udp(_) => { - return Err(Error::new( - ErrorKind::Other, - "Expected a Quic ConnectionCache. Got UDP", - )) - } - }; - - let tpu_client = TpuClient::new_with_connection_cache( - rpc_client_builder(rpc_url), - rpc_pubsub_url.as_str(), - TpuClientConfig::default(), - cache.clone(), - ) - .map_err(|err| Error::new(ErrorKind::Other, format!("TpuSenderError: {}", err)))?; - - Ok(tpu_client) -} - const DUMMY_SNAPSHOT_CONFIG_PATH_MARKER: &str = "dummy"; pub struct ClusterConfig { @@ -854,6 +807,34 @@ impl LocalCluster { ..SnapshotConfig::new_load_only() } } + + fn build_tpu_client(&self, rpc_client_builder: F) -> Result + where + F: FnOnce(String) -> Arc, + { + let rpc_pubsub_url = format!("ws://{}/", self.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap()); + + let cache = match &*self.connection_cache { + ConnectionCache::Quic(cache) => cache, + ConnectionCache::Udp(_) => { + return Err(Error::new( + ErrorKind::Other, + "Expected a Quic ConnectionCache. Got UDP", + )) + } + }; + + let tpu_client = TpuClient::new_with_connection_cache( + rpc_client_builder(rpc_url), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .map_err(|err| Error::new(ErrorKind::Other, format!("TpuSenderError: {}", err)))?; + + Ok(tpu_client) + } } impl Cluster for LocalCluster { @@ -872,6 +853,19 @@ impl Cluster for LocalCluster { }) } + fn build_tpu_quic_client(&self) -> Result { + self.build_tpu_client(|rpc_url| Arc::new(RpcClient::new(rpc_url))) + } + + fn build_tpu_quic_client_with_commitment( + &self, + commitment_config: CommitmentConfig, + ) -> Result { + self.build_tpu_client(|rpc_url| { + Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config)) + }) + } + fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo { let mut node = self.validators.remove(pubkey).unwrap();