From 97f1c5f49e3b4d42bf128480dc59513ba106f247 Mon Sep 17 00:00:00 2001
From: Kirill Lykov
Date: Fri, 3 Jan 2025 10:16:45 +0100
Subject: [PATCH] Revert "use tpu-client-next in send_transaction_service
 (#3515)"

This reverts commit 5c0f173b88e757ca223b853d32e5fb995ade5479.
---
 Cargo.lock                                  |   6 +-
 banks-server/src/banks_server.rs            |  12 +-
 programs/sbf/Cargo.lock                     |  31 +--
 rpc/src/cluster_tpu_info.rs                 |  77 ++++--
 rpc/src/rpc.rs                              |  39 ++--
 rpc/src/rpc_service.rs                      |  18 +-
 send-transaction-service/Cargo.toml         |   9 +-
 send-transaction-service/src/lib.rs         |   3 -
 .../src/send_transaction_service.rs         | 205 ++++++++--------
 send-transaction-service/src/test_utils.rs  |  95 --------
 send-transaction-service/src/tpu_info.rs    |  35 ++-
 .../src/transaction_client.rs               | 220 ++----------------
 svm/examples/Cargo.lock                     |  31 +--
 tls-utils/src/quic_client_certificate.rs    |  11 +-
 .../src/connection_workers_scheduler.rs     |  25 +-
 tpu-client-next/src/quic_networking.rs      |   2 +-
 .../connection_workers_scheduler_test.rs    |  29 ++-
 17 files changed, 245 insertions(+), 603 deletions(-)
 delete mode 100644 send-transaction-service/src/test_utils.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9c286cf2fbd1ff..0eedc1f3f69abf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9011,7 +9011,6 @@
 name = "solana-send-transaction-service"
 version = "2.2.0"
 dependencies = [
- "async-trait",
  "crossbeam-channel",
  "itertools 0.12.1",
  "log",
@@ -9020,12 +9019,9 @@ dependencies = [
  "solana-logger",
  "solana-measure",
  "solana-metrics",
- "solana-quic-client",
  "solana-runtime",
  "solana-sdk",
- "solana-tpu-client-next",
- "tokio",
- "tokio-util 0.7.13",
+ "solana-tpu-client",
 ]
 
 [[package]]
diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs
index 46bdfa6c5f98f0..2fb0a8b7559659 100644
--- a/banks-server/src/banks_server.rs
+++ b/banks-server/src/banks_server.rs
@@ -29,7 +29,6 @@ use {
     solana_send_transaction_service::{
         send_transaction_service::{SendTransactionService, TransactionInfo},
         tpu_info::NullTpuInfo,
-        transaction_client::ConnectionCacheClient,
     },
     std::{
         io,
@@ -455,16 +454,17 @@ pub async fn start_tcp_server(
         .map(move |chan| {
             let (sender, receiver) = unbounded();
 
-            let client = ConnectionCacheClient::<NullTpuInfo>::new(
-                connection_cache.clone(),
+            SendTransactionService::new::<NullTpuInfo>(
                 tpu_addr,
+                &bank_forks,
                 None,
-                None,
+                receiver,
+                connection_cache.clone(),
+                5_000,
                 0,
+                exit.clone(),
             );
 
-            SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone());
-
             let server = BanksServer::new(
                 bank_forks.clone(),
                 block_commitment_cache.clone(),
diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock
index f5c3afa24b5960..a2586d2f862092 100644
--- a/programs/sbf/Cargo.lock
+++ b/programs/sbf/Cargo.lock
@@ -7598,7 +7598,6 @@
 name = "solana-send-transaction-service"
 version = "2.2.0"
 dependencies = [
- "async-trait",
  "crossbeam-channel",
  "itertools 0.12.1",
  "log",
@@ -7606,12 +7605,9 @@ dependencies = [
  "solana-connection-cache",
  "solana-measure",
  "solana-metrics",
- "solana-quic-client",
  "solana-runtime",
  "solana-sdk",
- "solana-tpu-client-next",
- "tokio",
- "tokio-util 0.7.1",
+ "solana-tpu-client",
 ]
 
 [[package]]
@@ -8094,31 +8090,6 @@ dependencies = [
  "tokio",
 ]
 
-[[package]]
-name = "solana-tpu-client-next"
-version = "2.2.0"
-dependencies = [
- "async-trait",
- "log",
- "lru",
- "quinn",
- "rustls 0.23.20",
- "solana-clock",
- "solana-connection-cache",
- "solana-keypair",
- "solana-logger",
- "solana-measure",
- "solana-quic-definitions",
- "solana-rpc-client",
- "solana-streamer",
"solana-time-utils", - "solana-tls-utils", - "solana-tpu-client", - "thiserror 2.0.9", - "tokio", - "tokio-util 0.7.1", -] - [[package]] name = "solana-transaction" version = "2.2.0" diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index c57e76ff566649..777dfd5b8ff868 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -1,7 +1,10 @@ use { solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_poh::poh_recorder::PohRecorder, - solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey}, + solana_sdk::{ + clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, + pubkey::Pubkey, + }, solana_send_transaction_service::tpu_info::TpuInfo, std::{ collections::HashMap, @@ -47,7 +50,7 @@ impl TpuInfo for ClusterTpuInfo { .collect(); } - fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> { + fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> { let recorder = self.poh_recorder.read().unwrap(); let leaders: Vec<_> = (0..max_count) .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS)) @@ -67,23 +70,37 @@ impl TpuInfo for ClusterTpuInfo { unique_leaders } - fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> { + fn get_leader_tpus_with_slots( + &self, + max_count: u64, + protocol: Protocol, + ) -> Vec<(&SocketAddr, Slot)> { let recorder = self.poh_recorder.read().unwrap(); - let leader_pubkeys: Vec<_> = (0..max_count) - .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS)) + let leaders: Vec<_> = (0..max_count) + .rev() + .filter_map(|future_slot| { + NUM_CONSECUTIVE_LEADER_SLOTS + .checked_mul(future_slot) + .and_then(|slots_in_the_future| { + recorder.leader_and_slot_after_n_slots(slots_in_the_future) + }) + }) .collect(); drop(recorder); - leader_pubkeys - .iter() - .filter_map(|leader_pubkey| { + let addrs_to_slots = leaders + .into_iter() + .filter_map(|(leader_id, leader_slot)| { self.recent_peers - .get(leader_pubkey) - .map(|addr| match protocol { - Protocol::UDP => &addr.0, - Protocol::QUIC => &addr.1, + .get(&leader_id) + .map(|(udp_tpu, quic_tpu)| match protocol { + Protocol::UDP => (udp_tpu, leader_slot), + Protocol::QUIC => (quic_tpu, leader_slot), }) }) - .collect() + .collect::>(); + let mut unique_leaders = Vec::from_iter(addrs_to_slots); + unique_leaders.sort_by_key(|(_addr, slot)| *slot); + unique_leaders } } @@ -258,12 +275,12 @@ mod test { let first_leader = solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap(); assert_eq!( - leader_info.get_unique_leader_tpus(1, Protocol::UDP), + leader_info.get_leader_tpus(1, Protocol::UDP), vec![&recent_peers.get(&first_leader).unwrap().0] ); assert_eq!( - leader_info.get_leader_tpus(1, Protocol::UDP), - vec![&recent_peers.get(&first_leader).unwrap().0] + leader_info.get_leader_tpus_with_slots(1, Protocol::UDP), + vec![(&recent_peers.get(&first_leader).unwrap().0, 0)] ); let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at( @@ -277,12 +294,15 @@ mod test { ]; expected_leader_sockets.dedup(); assert_eq!( - leader_info.get_unique_leader_tpus(2, Protocol::UDP), + leader_info.get_leader_tpus(2, Protocol::UDP), expected_leader_sockets ); assert_eq!( - leader_info.get_leader_tpus(2, Protocol::UDP), + leader_info.get_leader_tpus_with_slots(2, Protocol::UDP), expected_leader_sockets + .into_iter() + .zip([0, 4]) + .collect::>() ); let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at( 
@@ -297,17 +317,26 @@ mod test {
         ];
         expected_leader_sockets.dedup();
         assert_eq!(
-            leader_info.get_unique_leader_tpus(3, Protocol::UDP),
+            leader_info.get_leader_tpus(3, Protocol::UDP),
             expected_leader_sockets
         );
+        // Only 2 leader tpus are returned always... so [0, 4, 8] isn't right here.
+        // This assumption is safe. After all, leader schedule generation must be deterministic.
+        assert_eq!(
+            leader_info.get_leader_tpus_with_slots(3, Protocol::UDP),
+            expected_leader_sockets
+                .into_iter()
+                .zip([0, 4])
+                .collect::<Vec<_>>()
+        );
         for x in 4..8 {
+            assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
             assert!(
-                leader_info.get_unique_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()
-            );
-            assert_eq!(
-                leader_info.get_leader_tpus(x, Protocol::UDP).len(),
-                x as usize
+                leader_info
+                    .get_leader_tpus_with_slots(x, Protocol::UDP)
+                    .len()
+                    <= recent_peers.len()
             );
         }
     }
diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs
index baffa72b8e0788..df20fa1a745344 100644
--- a/rpc/src/rpc.rs
+++ b/rpc/src/rpc.rs
@@ -127,7 +127,6 @@ use {
     solana_runtime::commitment::CommitmentSlots,
     solana_send_transaction_service::{
         send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo,
-        transaction_client::ConnectionCacheClient,
     },
     solana_streamer::socket::SocketAddrSpace,
 };
@@ -459,19 +458,14 @@ impl JsonRpcRequestProcessor {
             .tpu(connection_cache.protocol())
             .unwrap();
         let (transaction_sender, transaction_receiver) = unbounded();
-
-        let client = ConnectionCacheClient::<NullTpuInfo>::new(
-            connection_cache.clone(),
+        SendTransactionService::new::<NullTpuInfo>(
             tpu_address,
-            None,
-            None,
-            1,
-        );
-        SendTransactionService::new(
             &bank_forks,
+            None,
             transaction_receiver,
-            client,
+            connection_cache,
             1000,
+            1,
             exit.clone(),
         );
 
@@ -4578,9 +4572,7 @@ pub mod tests {
         },
         vote::state::VoteState,
     },
-    solana_send_transaction_service::{
-        tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient,
-    },
+    solana_send_transaction_service::tpu_info::NullTpuInfo,
     solana_transaction_status::{
         EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta,
         TransactionDetails,
@@ -6701,14 +6693,16 @@ pub mod tests {
             Arc::new(PrioritizationFeeCache::default()),
             service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
         );
-        let client = ConnectionCacheClient::<NullTpuInfo>::new(
-            connection_cache.clone(),
+        SendTransactionService::new::<NullTpuInfo>(
             tpu_address,
+            &bank_forks,
             None,
-            None,
+            receiver,
+            connection_cache,
+            1000,
             1,
+            exit,
         );
-        SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());
 
         let mut bad_transaction = system_transaction::transfer(
             &mint_keypair,
@@ -6981,15 +6975,16 @@ pub mod tests {
             Arc::new(PrioritizationFeeCache::default()),
             service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj),
         );
-        let client = ConnectionCacheClient::<NullTpuInfo>::new(
-            connection_cache.clone(),
+        SendTransactionService::new::<NullTpuInfo>(
            tpu_address,
+            &bank_forks,
             None,
-            None,
+            receiver,
+            connection_cache,
+            1000,
             1,
+            exit,
         );
-        SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());
-
         assert_eq!(
             request_processor.get_block_commitment(0),
             RpcBlockCommitment {
diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs
index 78dab869ecaee6..8c7a64d1b08592 100644
--- a/rpc/src/rpc_service.rs
+++ b/rpc/src/rpc_service.rs
@@ -36,10 +36,7 @@ use {
         exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash,
         native_token::lamports_to_sol,
     },
-    solana_send_transaction_service::{
-        send_transaction_service::{self, SendTransactionService},
-        transaction_client::ConnectionCacheClient,
-    },
+    solana_send_transaction_service::send_transaction_service::{self, SendTransactionService},
     solana_storage_bigtable::CredentialType,
     std::{
         net::SocketAddr,
@@ -465,20 +462,15 @@ impl JsonRpcService {
         let leader_info =
             poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
-        let client = ConnectionCacheClient::new(
-            connection_cache,
+        let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
             tpu_address,
-            send_transaction_service_config.tpu_peers.clone(),
-            leader_info,
-            send_transaction_service_config.leader_forward_count,
-        );
-        let _send_transaction_service = SendTransactionService::new_with_config(
             &bank_forks,
+            leader_info,
             receiver,
-            client,
+            connection_cache,
             send_transaction_service_config,
             exit,
-        );
+        ));
 
         #[cfg(test)]
         let test_request_processor = request_processor.clone();
diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml
index 92a73c33bc5d1a..07ad3f5a5b886c 100644
--- a/send-transaction-service/Cargo.toml
+++ b/send-transaction-service/Cargo.toml
@@ -10,7 +10,6 @@ license = { workspace = true }
 edition = { workspace = true }
 
 [dependencies]
-async-trait = { workspace = true }
 crossbeam-channel = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }
@@ -18,19 +17,13 @@ solana-client = { workspace = true }
 solana-connection-cache = { workspace = true }
 solana-measure = { workspace = true }
 solana-metrics = { workspace = true }
-solana-quic-client = { workspace = true }
 solana-runtime = { workspace = true }
 solana-sdk = { workspace = true }
-solana-tpu-client-next = { workspace = true }
-tokio = { workspace = true, features = ["full"] }
-tokio-util = { workspace = true }
+solana-tpu-client = { workspace = true }
 
 [dev-dependencies]
 solana-logger = { workspace = true }
 solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
 
-[features]
-dev-context-only-utils = []
-
 [package.metadata.docs.rs]
 targets = ["x86_64-unknown-linux-gnu"]
diff --git a/send-transaction-service/src/lib.rs b/send-transaction-service/src/lib.rs
index 5d3bfc9176705c..960ff1cb3c90e7 100644
--- a/send-transaction-service/src/lib.rs
+++ b/send-transaction-service/src/lib.rs
@@ -4,8 +4,5 @@ pub mod send_transaction_service_stats;
 pub mod tpu_info;
 pub mod transaction_client;
 
-#[cfg(any(test, feature = "dev-context-only-utils"))]
-pub mod test_utils;
-
 #[macro_use]
 extern crate solana_metrics;
diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs
index 62a604cc5e1392..0755cfb968b7ef 100644
--- a/send-transaction-service/src/send_transaction_service.rs
+++ b/send-transaction-service/src/send_transaction_service.rs
@@ -3,11 +3,13 @@ use {
         send_transaction_service_stats::{
             SendTransactionServiceStats, SendTransactionServiceStatsReport,
         },
-        transaction_client::TransactionClient,
+        tpu_info::TpuInfo,
+        transaction_client::{ConnectionCacheClient, TransactionClient},
     },
     crossbeam_channel::{Receiver, RecvTimeoutError},
     itertools::Itertools,
     log::*,
+    solana_client::connection_cache::ConnectionCache,
     solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{
         hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature,
@@ -138,24 +140,38 @@ impl Default for Config {
 pub const MAX_RETRY_SLEEP_MS: u64 = 1000;
 
 impl SendTransactionService {
-    pub fn new<Client: TransactionClient + Clone + std::marker::Send + 'static>(
+    pub fn new<T: TpuInfo + std::marker::Send + 'static>(
+        tpu_address: SocketAddr,
         bank_forks: &Arc<RwLock<BankForks>>,
+        leader_info: Option<T>,
         receiver: Receiver<TransactionInfo>,
-        client: Client,
+        connection_cache: Arc<ConnectionCache>,
         retry_rate_ms: u64,
+        leader_forward_count: u64,
         exit: Arc<AtomicBool>,
     ) -> Self {
         let config = Config {
             retry_rate_ms,
+            leader_forward_count,
             ..Config::default()
         };
-        Self::new_with_config::<Client>(bank_forks, receiver, client, config, exit)
+        Self::new_with_config(
+            tpu_address,
+            bank_forks,
+            leader_info,
+            receiver,
+            connection_cache,
+            config,
+            exit,
+        )
     }
 
-    pub fn new_with_config<Client: TransactionClient + Clone + std::marker::Send + 'static>(
+    pub fn new_with_config<T: TpuInfo + std::marker::Send + 'static>(
+        tpu_address: SocketAddr,
         bank_forks: &Arc<RwLock<BankForks>>,
+        leader_info: Option<T>,
         receiver: Receiver<TransactionInfo>,
-        client: Client,
+        connection_cache: Arc<ConnectionCache>,
         config: Config,
         exit: Arc<AtomicBool>,
     ) -> Self {
@@ -163,6 +179,14 @@ impl SendTransactionService {
 
         let retry_transactions = Arc::new(Mutex::new(HashMap::new()));
 
+        let client = ConnectionCacheClient::new(
+            connection_cache,
+            tpu_address,
+            config.tpu_peers,
+            leader_info,
+            config.leader_forward_count,
+        );
+
         let receive_txn_thread = Self::receive_txn_thread(
             receiver,
             client.clone(),
@@ -193,9 +217,9 @@ impl SendTransactionService {
     }
 
     /// Thread responsible for receiving transactions from RPC clients.
-    fn receive_txn_thread<Client: TransactionClient + std::marker::Send + 'static>(
+    fn receive_txn_thread<T: TpuInfo + std::marker::Send + 'static>(
         receiver: Receiver<TransactionInfo>,
-        client: Client,
+        client: ConnectionCacheClient<T>,
         retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
         stats_report: Arc<SendTransactionServiceStatsReport>,
         batch_send_rate_ms: u64,
@@ -293,9 +317,9 @@ impl SendTransactionService {
     }
 
     /// Thread responsible for retrying transactions
-    fn retry_thread<Client: TransactionClient + std::marker::Send + 'static>(
+    fn retry_thread<T: TpuInfo + std::marker::Send + 'static>(
         bank_forks: Arc<RwLock<BankForks>>,
-        client: Client,
+        client: ConnectionCacheClient<T>,
         retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
         retry_rate_ms: u64,
         service_max_retries: usize,
@@ -344,11 +368,11 @@ impl SendTransactionService {
     }
 
     /// Retry transactions sent before.
-    fn process_transactions<Client: TransactionClient + std::marker::Send + 'static>(
+    fn process_transactions<T: TpuInfo + std::marker::Send + 'static>(
         working_bank: &Bank,
         root_bank: &Bank,
         transactions: &mut HashMap<Signature, TransactionInfo>,
-        client: &Client,
+        client: &ConnectionCacheClient<T>,
         retry_rate_ms: u64,
         service_max_retries: usize,
         default_max_retries: Option<usize>,
@@ -474,11 +498,7 @@
 mod test {
     use {
         super::*,
-        crate::{
-            test_utils::ClientWithCreator,
-            tpu_info::NullTpuInfo,
-            transaction_client::{ConnectionCacheClient, TpuClientNextClient},
-        },
+        crate::tpu_info::NullTpuInfo,
         crossbeam_channel::{bounded, unbounded},
         solana_sdk::{
             account::AccountSharedData,
@@ -489,41 +509,34 @@ mod test {
             system_program, system_transaction,
         },
         std::ops::Sub,
-        tokio::runtime::Handle,
     };
 
-    fn service_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
+    #[test]
+    fn service_exit() {
+        let tpu_address = "127.0.0.1:0".parse().unwrap();
         let bank = Bank::default_for_tests();
         let bank_forks = BankForks::new_rw_arc(bank);
         let (sender, receiver) = unbounded();
 
-        let client = C::create_client(maybe_runtime, "127.0.0.1:0".parse().unwrap(), None, 1);
-
-        let send_transaction_service = SendTransactionService::new(
+        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
+        let send_transaction_service = SendTransactionService::new::<NullTpuInfo>(
+            tpu_address,
             &bank_forks,
+            None,
             receiver,
-            client.clone(),
+            connection_cache,
             1000,
+            1,
             Arc::new(AtomicBool::new(false)),
         );
 
         drop(sender);
         send_transaction_service.join().unwrap();
-        client.cancel();
     }
 
     #[test]
-    fn service_exit_with_connection_cache() {
-        service_exit::<ConnectionCacheClient<NullTpuInfo>>(None);
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-
-    async fn service_exit_with_tpu_client_next() {
-        service_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
-    }
-
-    fn validator_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
+    fn validator_exit() {
+        let tpu_address = "127.0.0.1:0".parse().unwrap();
         let bank = Bank::default_for_tests();
         let bank_forks = BankForks::new_rw_arc(bank);
         let (sender, receiver) = bounded(0);
@@ -539,15 +552,22 @@ mod test {
         };
         let exit = Arc::new(AtomicBool::new(false));
 
-        let client = C::create_client(maybe_runtime, "127.0.0.1:0".parse().unwrap(), None, 1);
-        let _send_transaction_service =
-            SendTransactionService::new(&bank_forks, receiver, client.clone(), 1000, exit.clone());
+        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
+        let _send_transaction_service = SendTransactionService::new::<NullTpuInfo>(
+            tpu_address,
+            &bank_forks,
+            None,
+            receiver,
+            connection_cache,
+            1000,
+            1,
+            exit.clone(),
+        );
 
         sender.send(dummy_tx_info()).unwrap();
 
         thread::spawn(move || {
             exit.store(true, Ordering::Relaxed);
-            client.cancel();
         });
 
         let mut option = Ok(());
@@ -556,25 +576,33 @@ mod test {
         }
     }
 
-    #[test]
-    fn validator_exit_with_connection_cache() {
-        validator_exit::<ConnectionCacheClient<NullTpuInfo>>(None);
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn validator_exit_with_tpu_client_next() {
-        validator_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
+    fn create_client(
+        tpu_peers: Option<Vec<SocketAddr>>,
+        leader_forward_count: u64,
+    ) -> ConnectionCacheClient<NullTpuInfo> {
+        let tpu_address = "127.0.0.1:0".parse().unwrap();
+        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
+
+        ConnectionCacheClient::new(
+            connection_cache,
+            tpu_address,
+            tpu_peers,
+            None,
+            leader_forward_count,
+        )
     }
 
-    fn process_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
+    #[test]
+    fn process_transactions() {
         solana_logger::setup();
         let (mut genesis_config, mint_keypair) = create_genesis_config(4);
         genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0);
         let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
-
-        let leader_forward_count = 1;
-        let config = Config::default();
+        let config = Config {
+            leader_forward_count: 1,
+            ..Config::default()
+        };
 
         let root_bank = Bank::new_from_parent(
             bank_forks.read().unwrap().working_bank(),
@@ -630,13 +658,8 @@ mod test {
             ),
         );
-        let client = C::create_client(
-            maybe_runtime,
-            "127.0.0.1:0".parse().unwrap(),
-            config.tpu_peers,
-            leader_forward_count,
-        );
-        let result = SendTransactionService::process_transactions(
+        let client = create_client(config.tpu_peers, config.leader_forward_count);
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -668,7 +691,7 @@ mod test {
             Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -700,7 +723,7 @@ mod test {
             Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -732,7 +755,7 @@ mod test {
             Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -766,7 +789,7 @@ mod test {
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -810,7 +833,7 @@ mod test {
             Some(Instant::now().sub(Duration::from_millis(4000))),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -830,7 +853,7 @@ mod test {
             ..ProcessTransactionsResult::default()
             }
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -849,27 +872,19 @@ mod test {
             ..ProcessTransactionsResult::default()
             }
         );
-        client.cancel();
     }
 
     #[test]
-    fn process_transactions_with_connection_cache() {
-        process_transactions::<ConnectionCacheClient<NullTpuInfo>>(None);
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn process_transactions_with_tpu_client_next() {
-        process_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
-    }
-
-    fn retry_durable_nonce_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
+    fn test_retry_durable_nonce_transactions() {
         solana_logger::setup();
         let (mut genesis_config, mint_keypair) = create_genesis_config(4);
         genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0);
         let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
-
-        let leader_forward_count = 1;
-        let config = Config::default();
+        let config = Config {
+            leader_forward_count: 1,
+            ..Config::default()
+        };
 
         let root_bank = Bank::new_from_parent(
             bank_forks.read().unwrap().working_bank(),
@@ -934,13 +949,8 @@ mod test {
             ),
         );
         let stats = SendTransactionServiceStats::default();
-        let client = C::create_client(
-            maybe_runtime,
-            "127.0.0.1:0".parse().unwrap(),
-            config.tpu_peers,
-            leader_forward_count,
-        );
-        let result = SendTransactionService::process_transactions(
+        let client = create_client(config.tpu_peers, config.leader_forward_count);
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -971,7 +981,7 @@ mod test {
             Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -1004,7 +1014,7 @@ mod test {
             Some(Instant::now().sub(Duration::from_millis(4000))),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -1035,7 +1045,7 @@ mod test {
             Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -1067,7 +1077,7 @@ mod test {
             Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -1099,7 +1109,7 @@ mod test {
             Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -1133,7 +1143,7 @@ mod test {
             Some(Instant::now().sub(Duration::from_millis(4000))),
             ),
         );
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -1164,7 +1174,7 @@ mod test {
         let nonce_account =
             AccountSharedData::new_data(43, &new_nonce_state, &system_program::id()).unwrap();
         working_bank.store_account(&nonce_address, &nonce_account);
-        let result = SendTransactionService::process_transactions(
+        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -1183,18 +1193,5 @@ mod test {
             ..ProcessTransactionsResult::default()
             }
         );
-        client.cancel();
-    }
-
-    #[test]
-    fn retry_durable_nonce_transactions_with_connection_cache() {
-        retry_durable_nonce_transactions::<ConnectionCacheClient<NullTpuInfo>>(None);
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn retry_durable_nonce_transactions_with_tpu_client_next() {
-        retry_durable_nonce_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(
-            Handle::current(),
-        ));
     }
 }
diff --git a/send-transaction-service/src/test_utils.rs b/send-transaction-service/src/test_utils.rs
deleted file mode 100644
index 7df6b3dec95b38..00000000000000
--- a/send-transaction-service/src/test_utils.rs
+++ /dev/null
@@ -1,95 +0,0 @@
-//! This module contains functionality required to create tests parameterized
-//! with the client type.
-
-use {
-    crate::{
-        tpu_info::NullTpuInfo,
-        transaction_client::{
-            ConnectionCacheClient, TpuClientNextClient, TpuInfoWithSendStatic, TransactionClient,
-        },
-    },
-    solana_client::connection_cache::ConnectionCache,
-    std::{net::SocketAddr, sync::Arc},
-    tokio::runtime::Handle,
-};
-
-// `maybe_runtime` argument is introduced to be able to use runtime from test
-// for the TpuClientNext, while ConnectionCache uses runtime created internally
-// in the quic-client module and it is impossible to pass test runtime there.
-pub trait CreateClient: TransactionClient {
-    fn create_client(
-        maybe_runtime: Option<Handle>,
-        my_tpu_address: SocketAddr,
-        tpu_peers: Option<Vec<SocketAddr>>,
-        leader_forward_count: u64,
-    ) -> Self;
-}
-
-impl CreateClient for ConnectionCacheClient<NullTpuInfo> {
-    fn create_client(
-        maybe_runtime: Option<Handle>,
-        my_tpu_address: SocketAddr,
-        tpu_peers: Option<Vec<SocketAddr>>,
-        leader_forward_count: u64,
-    ) -> Self {
-        assert!(maybe_runtime.is_none());
-        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
-        ConnectionCacheClient::new(
-            connection_cache,
-            my_tpu_address,
-            tpu_peers,
-            None,
-            leader_forward_count,
-        )
-    }
-}
-
-impl CreateClient for TpuClientNextClient<NullTpuInfo> {
-    fn create_client(
-        maybe_runtime: Option<Handle>,
-        my_tpu_address: SocketAddr,
-        tpu_peers: Option<Vec<SocketAddr>>,
-        leader_forward_count: u64,
-    ) -> Self {
-        let runtime_handle =
-            maybe_runtime.expect("Runtime should be provided for the TpuClientNextClient.");
-        Self::new(
-            runtime_handle,
-            my_tpu_address,
-            tpu_peers,
-            None,
-            leader_forward_count,
-            None,
-        )
-    }
-}
-
-pub trait Cancelable {
-    fn cancel(&self);
-}
-
-impl<T> Cancelable for ConnectionCacheClient<T>
-where
-    T: TpuInfoWithSendStatic,
-{
-    fn cancel(&self) {}
-}
-
-impl<T> Cancelable for TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
-    fn cancel(&self) {
-        self.cancel().unwrap();
-    }
-}
-
-// Define type alias to simplify definition of test functions.
-pub trait ClientWithCreator:
-    CreateClient + TransactionClient + Cancelable + Send + Clone + 'static
-{
-}
-impl<T> ClientWithCreator for T where
-    T: CreateClient + TransactionClient + Cancelable + Send + Clone + 'static
-{
-}
diff --git a/send-transaction-service/src/tpu_info.rs b/send-transaction-service/src/tpu_info.rs
index faeae7bb041f60..2eddfc33891507 100644
--- a/send-transaction-service/src/tpu_info.rs
+++ b/send-transaction-service/src/tpu_info.rs
@@ -1,24 +1,17 @@
-use {solana_connection_cache::connection_cache::Protocol, std::net::SocketAddr};
+use {
+    solana_connection_cache::connection_cache::Protocol, solana_sdk::clock::Slot,
+    std::net::SocketAddr,
+};
 
-/// A trait to abstract out the leader estimation for the SendTransactionService.
 pub trait TpuInfo {
     fn refresh_recent_peers(&mut self);
-    /// Takes `max_count` which specifies how many leaders per
-    /// `NUM_CONSECUTIVE_LEADER_SLOTS` we want to receive and returns *unique*
-    /// TPU socket addresses for these leaders.
-    ///
-    /// For example, if leader schedule was `[L1, L1, L1, L1, L2, L2, L2, L2,
-    /// L1, ...]` it will return `[L1, L2]` (the last L1 will be not added to
-    /// the result).
-    fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
-
-    /// Takes `max_count` which specifies how many leaders per
-    /// `NUM_CONSECUTIVE_LEADER_SLOTS` we want to receive and returns TPU socket
-    /// addresses for these leaders.
-    ///
-    /// For example, if leader schedule was `[L1, L1, L1, L1, L2, L2, L2, L2,
-    /// L1, ...]` it will return `[L1, L2, L1]`.
     fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
+    /// In addition to the tpu address, also return the leader slot
+    fn get_leader_tpus_with_slots(
+        &self,
+        max_count: u64,
+        protocol: Protocol,
+    ) -> Vec<(&SocketAddr, Slot)>;
 }
 
 #[derive(Clone)]
@@ -26,10 +19,14 @@ pub struct NullTpuInfo;
 
 impl TpuInfo for NullTpuInfo {
     fn refresh_recent_peers(&mut self) {}
-    fn get_unique_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
+    fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
         vec![]
     }
-    fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
+    fn get_leader_tpus_with_slots(
+        &self,
+        _max_count: u64,
+        _protocol: Protocol,
+    ) -> Vec<(&SocketAddr, Slot)> {
         vec![]
     }
 }
diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs
index 29d1c3c3411944..d7910b2b8609c3 100644
--- a/send-transaction-service/src/transaction_client.rs
+++ b/send-transaction-service/src/transaction_client.rs
@@ -1,47 +1,25 @@
 use {
     crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo},
-    async_trait::async_trait,
     log::warn,
-    solana_client::connection_cache::{ConnectionCache, Protocol},
+    solana_client::connection_cache::ConnectionCache,
     solana_connection_cache::client_connection::ClientConnection as TpuConnection,
     solana_measure::measure::Measure,
-    solana_sdk::signature::Keypair,
-    solana_tpu_client_next::{
-        connection_workers_scheduler::{
-            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
-        },
-        leader_updater::LeaderUpdater,
-        transaction_batch::TransactionBatch,
-        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
-    },
     std::{
-        net::{Ipv4Addr, SocketAddr},
+        net::SocketAddr,
         sync::{atomic::Ordering, Arc, Mutex},
         time::{Duration, Instant},
     },
-    tokio::{
-        runtime::Handle,
-        sync::mpsc::{self},
-        task::JoinHandle,
-    },
-    tokio_util::sync::CancellationToken,
 };
 
-// Alias trait to shorten function definitions.
-pub trait TpuInfoWithSendStatic: TpuInfo + std::marker::Send + 'static {}
-impl<T> TpuInfoWithSendStatic for T where T: TpuInfo + std::marker::Send + 'static {}
-
 pub trait TransactionClient {
     fn send_transactions_in_batch(
         &self,
         wire_transactions: Vec<Vec<u8>>,
         stats: &SendTransactionServiceStats,
     );
-
-    fn protocol(&self) -> Protocol;
 }
 
-pub struct ConnectionCacheClient<T: TpuInfoWithSendStatic> {
+pub struct ConnectionCacheClient<T: TpuInfo + std::marker::Send + 'static> {
     connection_cache: Arc<ConnectionCache>,
     tpu_address: SocketAddr,
     tpu_peers: Option<Vec<SocketAddr>>,
@@ -49,10 +27,10 @@
     leader_forward_count: u64,
 }
 
-// Manual implementation of Clone to avoid requiring T to be Clone
+// Manual implementation of Clone without requiring T to be Clone
 impl<T> Clone for ConnectionCacheClient<T>
 where
-    T: TpuInfoWithSendStatic,
+    T: TpuInfo + std::marker::Send + 'static,
 {
     fn clone(&self) -> Self {
         Self {
@@ -67,7 +45,7 @@ where
 
 impl<T> ConnectionCacheClient<T>
 where
-    T: TpuInfoWithSendStatic,
+    T: TpuInfo + std::marker::Send + 'static,
 {
     pub fn new(
         connection_cache: Arc<ConnectionCache>,
@@ -86,13 +64,11 @@ where
         }
     }
 
-    fn get_unique_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> {
+    fn get_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> {
         leader_info
             .map(|leader_info| {
-                leader_info.get_unique_leader_tpus(
-                    self.leader_forward_count,
-                    self.connection_cache.protocol(),
-                )
+                leader_info
+                    .get_leader_tpus(self.leader_forward_count, self.connection_cache.protocol())
             })
             .filter(|addresses| !addresses.is_empty())
             .unwrap_or_else(|| vec![&self.tpu_address])
@@ -124,7 +100,7 @@ where
 
 impl<T> TransactionClient for ConnectionCacheClient<T>
 where
-    T: TpuInfoWithSendStatic,
+    T: TpuInfo + std::marker::Send + 'static,
 {
     fn send_transactions_in_batch(
         &self,
@@ -139,182 +115,13 @@ where
             .unwrap_or_default();
         let mut leader_info_provider = self.leader_info_provider.lock().unwrap();
         let leader_info = leader_info_provider.get_leader_info();
-        let leader_addresses = self.get_unique_tpu_addresses(leader_info);
+        let leader_addresses = self.get_tpu_addresses(leader_info);
         addresses.extend(leader_addresses);
 
         for address in &addresses {
             self.send_transactions(address, wire_transactions.clone(), stats);
         }
     }
-
-    fn protocol(&self) -> Protocol {
-        self.connection_cache.protocol()
-    }
-}
-
-#[derive(Clone)]
-pub struct SendTransactionServiceLeaderUpdater<T: TpuInfoWithSendStatic> {
-    leader_info_provider: CurrentLeaderInfo<T>,
-    my_tpu_address: SocketAddr,
-    tpu_peers: Option<Vec<SocketAddr>>,
-}
-
-#[async_trait]
-impl<T> LeaderUpdater for SendTransactionServiceLeaderUpdater<T>
-where
-    T: TpuInfoWithSendStatic,
-{
-    fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
-        let discovered_peers = self
-            .leader_info_provider
-            .get_leader_info()
-            .map(|leader_info| {
-                leader_info.get_leader_tpus(lookahead_leaders as u64, Protocol::QUIC)
-            })
-            .filter(|addresses| !addresses.is_empty())
-            .unwrap_or_else(|| vec![&self.my_tpu_address]);
-        let mut all_peers = self.tpu_peers.clone().unwrap_or_default();
-        all_peers.extend(discovered_peers.into_iter().cloned());
-        all_peers
-    }
-    async fn stop(&mut self) {}
-}
-
-type TpuClientJoinHandle =
-    JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>;
-
-/// `TpuClientNextClient` provides an interface for managing the
-/// [`ConnectionWorkersScheduler`].
-///
-/// It allows:
-/// * Create and initializes the scheduler with runtime configurations,
-/// * Send transactions to the connection scheduler,
-/// * Update the validator identity keypair and propagate the changes to the
-/// scheduler. Most of the complexity of this structure arises from this
-/// functionality.
-///
-#[allow(
-    dead_code,
-    reason = "Unused fields will be utilized soon,\
-              added in advance to avoid larger changes in the code."
-)]
-#[derive(Clone)]
-pub struct TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
-    runtime_handle: Handle,
-    sender: mpsc::Sender<TransactionBatch>,
-    // This handle is needed to implement `NotifyKeyUpdate` trait. It's only
-    // method takes &self and thus we need to wrap with Mutex.
-    join_and_cancel: Arc<Mutex<(Option<TpuClientJoinHandle>, CancellationToken)>>,
-    leader_updater: SendTransactionServiceLeaderUpdater<T>,
-    leader_forward_count: u64,
-}
-
-impl<T> TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
-    pub fn new(
-        runtime_handle: Handle,
-        my_tpu_address: SocketAddr,
-        tpu_peers: Option<Vec<SocketAddr>>,
-        leader_info: Option<T>,
-        leader_forward_count: u64,
-        identity: Option<Keypair>,
-    ) -> Self
-    where
-        T: TpuInfoWithSendStatic + Clone,
-    {
-        // The channel size represents 8s worth of transactions at a rate of
-        // 1000 tps, assuming batch size is 64.
-        let (sender, receiver) = mpsc::channel(128);
-
-        let cancel = CancellationToken::new();
-
-        let leader_info_provider = CurrentLeaderInfo::new(leader_info);
-        let leader_updater: SendTransactionServiceLeaderUpdater<T> =
-            SendTransactionServiceLeaderUpdater {
-                leader_info_provider,
-                my_tpu_address,
-                tpu_peers,
-            };
-        let config = Self::create_config(identity, leader_forward_count as usize);
-        let handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
-            config,
-            Box::new(leader_updater.clone()),
-            receiver,
-            cancel.clone(),
-        ));
-
-        Self {
-            runtime_handle,
-            join_and_cancel: Arc::new(Mutex::new((Some(handle), cancel))),
-            sender,
-            leader_updater,
-            leader_forward_count,
-        }
-    }
-
-    fn create_config(
-        identity: Option<Keypair>,
-        leader_forward_count: usize,
-    ) -> ConnectionWorkersSchedulerConfig {
-        ConnectionWorkersSchedulerConfig {
-            bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
-            identity,
-            // to match MAX_CONNECTIONS from ConnectionCache
-            num_connections: 1024,
-            skip_check_transaction_age: true,
-            // experimentally found parameter values
-            worker_channel_size: 64,
-            max_reconnect_attempts: 4,
-            leaders_fanout: Fanout {
-                connect: leader_forward_count,
-                send: leader_forward_count,
-            },
-        }
-    }
-
-    #[cfg(any(test, feature = "dev-context-only-utils"))]
-    pub fn cancel(&self) -> Result<(), Box<dyn std::error::Error>> {
-        let Ok(lock) = self.join_and_cancel.lock() else {
-            return Err("Failed to stop scheduler: TpuClientNext task panicked.".into());
-        };
-        lock.1.cancel();
-        Ok(())
-    }
-}
-
-impl<T> TransactionClient for TpuClientNextClient<T>
-where
-    T: TpuInfoWithSendStatic + Clone,
-{
-    fn send_transactions_in_batch(
-        &self,
-        wire_transactions: Vec<Vec<u8>>,
-        stats: &SendTransactionServiceStats,
-    ) {
-        let mut measure = Measure::start("send-us");
-        self.runtime_handle.spawn({
-            let sender = self.sender.clone();
-            async move {
-                let res = sender.send(TransactionBatch::new(wire_transactions)).await;
-                if res.is_err() {
-                    warn!("Failed to send transaction to channel: it is closed.");
-                }
-            }
-        });
-
-        measure.stop();
-        stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed);
-        stats.send_attempt_count.fetch_add(1, Ordering::Relaxed);
-    }
-
-    fn protocol(&self) -> Protocol {
-        Protocol::QUIC
-    }
-}
 
 /// The leader info refresh rate.
@@ -322,10 +129,9 @@ pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000;
 
 /// A struct responsible for holding up-to-date leader information
 /// used for sending transactions.
-#[derive(Clone)]
 pub(crate) struct CurrentLeaderInfo<T>
 where
-    T: TpuInfoWithSendStatic,
+    T: TpuInfo + std::marker::Send + 'static,
 {
     /// The last time the leader info was refreshed
     last_leader_refresh: Option<Instant>,
@@ -339,7 +145,7 @@ where
 
 impl<T> CurrentLeaderInfo<T>
 where
-    T: TpuInfoWithSendStatic,
+    T: TpuInfo + std::marker::Send + 'static,
 {
     /// Get the leader info, refresh if expired
     pub fn get_leader_info(&mut self) -> Option<&T> {
diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock
index 56c0ccc16b7a88..3fa2b305a21cdf 100644
--- a/svm/examples/Cargo.lock
+++ b/svm/examples/Cargo.lock
@@ -6917,7 +6917,6 @@
 name = "solana-send-transaction-service"
 version = "2.2.0"
 dependencies = [
- "async-trait",
  "crossbeam-channel",
  "itertools 0.12.1",
  "log",
@@ -6925,12 +6924,9 @@ dependencies = [
  "solana-connection-cache",
  "solana-measure",
  "solana-metrics",
- "solana-quic-client",
  "solana-runtime",
  "solana-sdk",
- "solana-tpu-client-next",
- "tokio",
- "tokio-util 0.7.12",
+ "solana-tpu-client",
 ]
 
 [[package]]
@@ -7430,31 +7426,6 @@ dependencies = [
  "tokio",
 ]
 
-[[package]]
-name = "solana-tpu-client-next"
-version = "2.2.0"
-dependencies = [
- "async-trait",
- "log",
- "lru",
- "quinn",
- "rustls 0.23.20",
- "solana-clock",
- "solana-connection-cache",
- "solana-keypair",
- "solana-logger",
- "solana-measure",
- "solana-quic-definitions",
- "solana-rpc-client",
- "solana-streamer",
- "solana-time-utils",
- "solana-tls-utils",
- "solana-tpu-client",
- "thiserror 2.0.9",
- "tokio",
- "tokio-util 0.7.12",
-]
-
 [[package]]
 name = "solana-transaction"
 version = "2.2.0"
diff --git a/tls-utils/src/quic_client_certificate.rs b/tls-utils/src/quic_client_certificate.rs
index 24eea9cbc26a13..e75c541ea3500d 100644
--- a/tls-utils/src/quic_client_certificate.rs
+++ b/tls-utils/src/quic_client_certificate.rs
@@ -10,13 +10,8 @@ pub struct QuicClientCertificate {
 }
 
 impl QuicClientCertificate {
-    pub fn new(keypair: Option<&Keypair>) -> Self {
-        if let Some(keypair) = keypair {
-            let (certificate, key) = new_dummy_x509_certificate(keypair);
-            Self { certificate, key }
-        } else {
-            let (certificate, key) = new_dummy_x509_certificate(&Keypair::new());
-            Self { certificate, key }
-        }
+    pub fn new(keypair: &Keypair) -> Self {
+        let (certificate, key) = new_dummy_x509_certificate(keypair);
+        Self { certificate, key }
     }
 }
diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs
index f74e7acfe0acbd..20bec8e58d78f8 100644
--- a/tpu-client-next/src/connection_workers_scheduler.rs
+++ b/tpu-client-next/src/connection_workers_scheduler.rs
@@ -70,7 +70,7 @@ pub struct ConnectionWorkersSchedulerConfig {
 
     /// Optional stake identity keypair used in the endpoint certificate for
     /// identifying the sender.
-    pub identity: Option<Keypair>,
+    pub stake_identity: Option<Keypair>,
 
     /// The number of connections to be maintained by the scheduler.
     pub num_connections: usize,
@@ -90,11 +90,6 @@ pub struct ConnectionWorkersSchedulerConfig {
     pub leaders_fanout: Fanout,
 }
 
-pub type TransactionStatsAndReceiver = (
-    SendTransactionStatsPerAddr,
-    mpsc::Receiver<TransactionBatch>,
-);
-
 impl ConnectionWorkersScheduler {
     /// Starts the scheduler, which manages the distribution of transactions to
     /// the network's upcoming leaders.
@@ -108,7 +103,7 @@ impl ConnectionWorkersScheduler {
     pub async fn run(
         ConnectionWorkersSchedulerConfig {
             bind,
-            identity,
+            stake_identity: validator_identity,
             num_connections,
             skip_check_transaction_age,
             worker_channel_size,
@@ -118,14 +113,14 @@ impl ConnectionWorkersScheduler {
         mut leader_updater: Box<dyn LeaderUpdater>,
         mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
         cancel: CancellationToken,
-    ) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
-        let endpoint = Self::setup_endpoint(bind, identity.as_ref())?;
+    ) -> Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError> {
+        let endpoint = Self::setup_endpoint(bind, validator_identity)?;
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
         let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();
 
         loop {
-            let transaction_batch: TransactionBatch = tokio::select! {
+            let transaction_batch = tokio::select! {
                 recv_res = transaction_receiver.recv() => match recv_res {
                     Some(txs) => txs,
                     None => {
@@ -189,15 +184,19 @@ impl ConnectionWorkersScheduler {
         endpoint.close(0u32.into(), b"Closing connection");
         leader_updater.stop().await;
 
-        Ok((send_stats_per_addr, transaction_receiver))
+        Ok(send_stats_per_addr)
     }
 
     /// Sets up the QUIC endpoint for the scheduler to handle connections.
     fn setup_endpoint(
         bind: SocketAddr,
-        identity: Option<&Keypair>,
+        validator_identity: Option<Keypair>,
     ) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
-        let client_certificate = QuicClientCertificate::new(identity);
+        let client_certificate = if let Some(validator_identity) = validator_identity {
+            Arc::new(QuicClientCertificate::new(&validator_identity))
+        } else {
+            Arc::new(QuicClientCertificate::new(&Keypair::new()))
+        };
         let client_config = create_client_config(client_certificate);
         let endpoint = create_client_endpoint(bind, client_config)?;
         Ok(endpoint)
diff --git a/tpu-client-next/src/quic_networking.rs b/tpu-client-next/src/quic_networking.rs
index 7aa65d969d8b11..d0e45c1a8a0b23 100644
--- a/tpu-client-next/src/quic_networking.rs
+++ b/tpu-client-next/src/quic_networking.rs
@@ -18,7 +18,7 @@ pub use {
     solana_tls_utils::QuicClientCertificate,
 };
 
-pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
+pub(crate) fn create_client_config(client_certificate: Arc<QuicClientCertificate>) -> ClientConfig {
     // adapted from QuicLazyInitializedEndpoint::create_endpoint
     let mut crypto = tls_client_config_builder()
         .with_client_auth_cert(
diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs
index aa2c5a9b84286a..bce18296d4a491 100644
--- a/tpu-client-next/tests/connection_workers_scheduler_test.rs
+++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -15,13 +15,11 @@ use {
         streamer::StakedNodes,
     },
     solana_tpu_client_next::{
-        connection_workers_scheduler::{
-            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
-        },
+        connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
         leader_updater::create_leader_updater,
         send_transaction_stats::SendTransactionStatsNonAtomic,
         transaction_batch::TransactionBatch,
-        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
+        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
     },
     std::{
         collections::HashMap,
@@ -42,10 +40,10 @@ use {
     tokio_util::sync::CancellationToken,
 };
 
-fn test_config(identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
+fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
     ConnectionWorkersSchedulerConfig {
         bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
-        identity,
+        stake_identity: validator_identity,
         num_connections: 1,
         skip_check_transaction_age: false,
         // At the moment we have only one strategy to send transactions: we try
@@ -65,9 +63,9 @@ fn test_config(identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
 async fn setup_connection_worker_scheduler(
     tpu_address: SocketAddr,
     transaction_receiver: Receiver<TransactionBatch>,
-    identity: Option<Keypair>,
+    validator_identity: Option<Keypair>,
 ) -> (
-    JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>,
+    JoinHandle<Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>>,
     CancellationToken,
 ) {
     let json_rpc_url = "http://127.0.0.1:8899";
@@ -84,7 +82,7 @@ async fn setup_connection_worker_scheduler(
         .expect("Leader updates was successfully created");
 
     let cancel = CancellationToken::new();
-    let config = test_config(identity);
+    let config = test_config(validator_identity);
     let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
         config,
         leader_updater,
@@ -97,10 +95,10 @@ async fn setup_connection_worker_scheduler(
 
 async fn join_scheduler(
     scheduler_handle: JoinHandle<
-        Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>,
+        Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
     >,
 ) -> SendTransactionStatsNonAtomic {
-    let (stats_per_ip, _) = scheduler_handle
+    let stats_per_ip = scheduler_handle
         .await
         .unwrap()
         .expect("Scheduler should stop successfully.");
@@ -402,8 +400,8 @@ async fn test_connection_pruned_and_reopened() {
 /// connection and verify that all the txs has been received.
 #[tokio::test]
 async fn test_staked_connection() {
-    let identity = Keypair::new();
-    let stakes = HashMap::from([(identity.pubkey(), 100_000)]);
+    let validator_identity = Keypair::new();
+    let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]);
     let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
 
     let SpawnTestServerResult {
@@ -434,7 +432,8 @@ async fn test_staked_connection() {
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
 
     let (scheduler_handle, _scheduler_cancel) =
-        setup_connection_worker_scheduler(server_address, tx_receiver, Some(identity)).await;
+        setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity))
+            .await;
 
     // Check results
     let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
@@ -534,7 +533,7 @@ async fn test_no_host() {
 
     // While attempting to establish a connection with a nonexistent host, we fill the worker's
    // channel.
-    let (stats, _) = scheduler_handle
+    let stats = scheduler_handle
        .await
        .expect("Scheduler should stop successfully")
        .expect("Scheduler execution was successful");
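
For reference, a minimal usage sketch of the constructor this revert restores, mirroring the
banks-server/src/banks_server.rs call site above. The socket address, channel wiring, and
tunables (5_000 ms retry rate, leader forward count 0) are illustrative assumptions, not a
prescribed configuration.

    use {
        crossbeam_channel::unbounded,
        solana_client::connection_cache::ConnectionCache,
        solana_runtime::bank_forks::BankForks,
        solana_send_transaction_service::{
            send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo,
        },
        std::{
            net::SocketAddr,
            sync::{atomic::AtomicBool, Arc, RwLock},
        },
    };

    fn spawn_send_transaction_service(
        tpu_addr: SocketAddr,
        bank_forks: &Arc<RwLock<BankForks>>,
    ) -> SendTransactionService {
        // Transactions submitted over RPC flow through this channel; the
        // service retries them until they land or expire.
        let (_sender, receiver) = unbounded();
        let connection_cache = Arc::new(ConnectionCache::new("sts-example"));

        // With the revert applied, the service builds its ConnectionCacheClient
        // internally; leader info, tpu_peers, and the forward count travel
        // through the constructor and Config instead of a caller-built client.
        SendTransactionService::new::<NullTpuInfo>(
            tpu_addr,
            bank_forks,
            None, // leader_info: no ClusterTpuInfo in this sketch
            receiver,
            connection_cache,
            5_000, // retry_rate_ms
            0,     // leader_forward_count
            Arc::new(AtomicBool::new(false)), // exit flag
        )
    }

The trade-off restored here: callers hand over a ConnectionCache and the service owns client
construction, whereas the reverted change let callers inject any TransactionClient
implementation (including TpuClientNextClient) up front.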