diff --git a/tls-utils/src/quic_client_certificate.rs b/tls-utils/src/quic_client_certificate.rs
index e75c541ea3500d..4870e3493a7075 100644
--- a/tls-utils/src/quic_client_certificate.rs
+++ b/tls-utils/src/quic_client_certificate.rs
@@ -10,7 +10,12 @@ pub struct QuicClientCertificate {
 }
 
 impl QuicClientCertificate {
-    pub fn new(keypair: &Keypair) -> Self {
+    pub fn new(keypair: Option<&Keypair>) -> Self {
+        let keypair = if let Some(keypair) = keypair {
+            keypair
+        } else {
+            &Keypair::new()
+        };
         let (certificate, key) = new_dummy_x509_certificate(keypair);
         Self { certificate, key }
     }
diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs
index 20bec8e58d78f8..9e2f4a0f91d83e 100644
--- a/tpu-client-next/src/connection_workers_scheduler.rs
+++ b/tpu-client-next/src/connection_workers_scheduler.rs
@@ -90,20 +90,28 @@ pub struct ConnectionWorkersSchedulerConfig {
     pub leaders_fanout: Fanout,
 }
 
+pub type TransactionStatsAndReceiver = (
+    SendTransactionStatsPerAddr,
+    mpsc::Receiver<TransactionBatch>,
+);
+
 impl ConnectionWorkersScheduler {
     /// Starts the scheduler, which manages the distribution of transactions to
     /// the network's upcoming leaders.
     ///
     /// Runs the main loop that handles worker scheduling and management for
     /// connections. Returns the error quic statistics per connection address or
-    /// an error.
+    /// an error, along with the transaction receiver. The receiver is handed
+    /// back to the user because in some cases we need to reuse the same
+    /// receiver for a new scheduler. For example, this happens when the
+    /// validator identity is updated.
     ///
     /// Importantly, if some transactions were not delivered due to network
     /// problems, they will not be retried when the problem is resolved.
     pub async fn run(
         ConnectionWorkersSchedulerConfig {
             bind,
-            stake_identity: validator_identity,
+            stake_identity,
             num_connections,
             skip_check_transaction_age,
             worker_channel_size,
@@ -113,14 +121,14 @@ impl ConnectionWorkersScheduler {
         mut leader_updater: Box<dyn LeaderUpdater>,
         mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
         cancel: CancellationToken,
-    ) -> Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError> {
-        let endpoint = Self::setup_endpoint(bind, validator_identity)?;
+    ) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
+        let endpoint = Self::setup_endpoint(bind, stake_identity.as_ref())?;
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
         let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();
 
         loop {
-            let transaction_batch = tokio::select! {
+            let transaction_batch: TransactionBatch = tokio::select! {
                 recv_res = transaction_receiver.recv() => match recv_res {
                     Some(txs) => txs,
                     None => {
@@ -184,19 +192,15 @@ impl ConnectionWorkersScheduler {
 
         endpoint.close(0u32.into(), b"Closing connection");
         leader_updater.stop().await;
-        Ok(send_stats_per_addr)
+        Ok((send_stats_per_addr, transaction_receiver))
     }
 
     /// Sets up the QUIC endpoint for the scheduler to handle connections.
     fn setup_endpoint(
         bind: SocketAddr,
-        validator_identity: Option<Keypair>,
+        stake_identity: Option<&Keypair>,
     ) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
-        let client_certificate = if let Some(validator_identity) = validator_identity {
-            Arc::new(QuicClientCertificate::new(&validator_identity))
-        } else {
-            Arc::new(QuicClientCertificate::new(&Keypair::new()))
-        };
+        let client_certificate = QuicClientCertificate::new(stake_identity);
         let client_config = create_client_config(client_certificate);
         let endpoint = create_client_endpoint(bind, client_config)?;
         Ok(endpoint)
diff --git a/tpu-client-next/src/quic_networking.rs b/tpu-client-next/src/quic_networking.rs
index d0e45c1a8a0b23..7aa65d969d8b11 100644
--- a/tpu-client-next/src/quic_networking.rs
+++ b/tpu-client-next/src/quic_networking.rs
@@ -18,7 +18,7 @@ pub use {
     solana_tls_utils::QuicClientCertificate,
 };
 
-pub(crate) fn create_client_config(client_certificate: Arc<QuicClientCertificate>) -> ClientConfig {
+pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
     // adapted from QuicLazyInitializedEndpoint::create_endpoint
     let mut crypto = tls_client_config_builder()
         .with_client_auth_cert(
diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs
index bce18296d4a491..2b83d0f088a132 100644
--- a/tpu-client-next/tests/connection_workers_scheduler_test.rs
+++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -15,11 +15,13 @@ use {
         streamer::StakedNodes,
     },
     solana_tpu_client_next::{
-        connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
+        connection_workers_scheduler::{
+            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
+        },
         leader_updater::create_leader_updater,
         send_transaction_stats::SendTransactionStatsNonAtomic,
         transaction_batch::TransactionBatch,
-        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
+        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
     },
     std::{
         collections::HashMap,
@@ -40,10 +42,10 @@ use {
     tokio_util::sync::CancellationToken,
 };
 
-fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
+fn test_config(stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
     ConnectionWorkersSchedulerConfig {
         bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
-        stake_identity: validator_identity,
+        stake_identity,
         num_connections: 1,
         skip_check_transaction_age: false,
         // At the moment we have only one strategy to send transactions: we try
@@ -63,9 +65,9 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedule
 async fn setup_connection_worker_scheduler(
     tpu_address: SocketAddr,
     transaction_receiver: Receiver<TransactionBatch>,
-    validator_identity: Option<Keypair>,
+    stake_identity: Option<Keypair>,
 ) -> (
-    JoinHandle<Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>>,
+    JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>,
     CancellationToken,
 ) {
     let json_rpc_url = "http://127.0.0.1:8899";
@@ -82,7 +84,7 @@ async fn setup_connection_worker_scheduler(
         .expect("Leader updates was successfully created");
 
     let cancel = CancellationToken::new();
-    let config = test_config(validator_identity);
+    let config = test_config(stake_identity);
     let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
         config,
         leader_updater,
@@ -95,10 +97,10 @@ async fn setup_connection_worker_scheduler(
 
 async fn join_scheduler(
     scheduler_handle: JoinHandle<
-        Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
+        Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>,
     >,
 ) -> SendTransactionStatsNonAtomic {
-    let stats_per_ip = scheduler_handle
+    let (stats_per_ip, _) = scheduler_handle
         .await
         .unwrap()
         .expect("Scheduler should stop successfully.");
@@ -400,8 +402,8 @@ async fn test_connection_pruned_and_reopened() {
 /// connection and verify that all the txs has been received.
 #[tokio::test]
 async fn test_staked_connection() {
-    let validator_identity = Keypair::new();
-    let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]);
+    let stake_identity = Keypair::new();
+    let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
     let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
 
     let SpawnTestServerResult {
@@ -432,8 +434,7 @@ async fn test_staked_connection() {
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
 
     let (scheduler_handle, _scheduler_cancel) =
-        setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity))
-            .await;
+        setup_connection_worker_scheduler(server_address, tx_receiver, Some(stake_identity)).await;
 
     // Check results
     let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
@@ -533,7 +534,7 @@ async fn test_no_host() {
 
     // While attempting to establish a connection with a nonexistent host, we fill the worker's
     // channel.
-    let stats = scheduler_handle
+    let (stats, _) = scheduler_handle
        .await
        .expect("Scheduler should stop successfully")
        .expect("Scheduler execution was successful");
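
For reviewers, a minimal sketch (not part of the patch) of the call pattern this change enables: because `ConnectionWorkersScheduler::run` now returns the transaction receiver alongside the stats, a caller can cancel a run and feed the same unconsumed transaction stream into a fresh scheduler after an identity rotation. Here `make_config`, `make_leader_updater`, and the `identity_updates` channel are hypothetical stand-ins for application-specific setup, and import paths assume the crate layout of this era.

```rust
use {
    solana_sdk::signature::Keypair,
    solana_tpu_client_next::{
        connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
        leader_updater::LeaderUpdater, transaction_batch::TransactionBatch,
        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
    },
    tokio::sync::mpsc,
    tokio_util::sync::CancellationToken,
};

// Hypothetical helpers standing in for application-specific setup.
fn make_config(stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
    unimplemented!("build a ConnectionWorkersSchedulerConfig for `stake_identity`")
}
async fn make_leader_updater() -> Box<dyn LeaderUpdater> {
    unimplemented!("e.g. via leader_updater::create_leader_updater")
}

async fn run_until_shutdown(
    mut stake_identity: Option<Keypair>,
    mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
    mut identity_updates: mpsc::Receiver<Keypair>,
) -> Result<(), ConnectionWorkersSchedulerError> {
    loop {
        let cancel = CancellationToken::new();
        let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
            make_config(stake_identity.take()),
            make_leader_updater().await,
            transaction_receiver,
            cancel.clone(),
        ));

        // Park until the validator identity is rotated; `None` means shutdown.
        let new_identity = identity_updates.recv().await;
        cancel.cancel();

        // `run` hands the receiver back together with the stats, so the
        // unconsumed transaction stream survives the scheduler restart.
        let (_stats, receiver) = scheduler.await.expect("scheduler task panicked")?;

        match new_identity {
            Some(identity) => {
                transaction_receiver = receiver;
                stake_identity = Some(identity);
            }
            None => return Ok(()),
        }
    }
}
```

Returning the receiver rather than dropping it inside `run` is what makes this loop possible: sender handles held by transaction producers stay valid across the restart, so no in-flight batches are lost to a closed channel.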