tpu-client-next: return receiver in scheduler::run (#4454)
Return the receiver from `scheduler::run` so that it can be reused when required. This is needed to use tpu-client-next in the validator: a user can make an RPC call to update the underlying authority, and due to the way this mechanism is implemented, we have to reuse the same receiver for the new scheduler.
KirillLykov authored Jan 17, 2025
1 parent e0d672b commit 85b6118
Showing 4 changed files with 38 additions and 28 deletions.
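The change in a nutshell: `ConnectionWorkersScheduler::run` now resolves to a `(SendTransactionStatsPerAddr, mpsc::Receiver<TransactionBatch>)` pair instead of the stats alone, so a caller that swaps the validator identity can hand the same receiver to a fresh scheduler. Below is a minimal sketch of that flow; `restart_scheduler` and all its plumbing (the two configs, updaters, and cancel tokens) are hypothetical caller code, not part of this commit, and the `LeaderUpdater` trait is assumed to be exported from the `leader_updater` module seen in the test imports further down.

```rust
use solana_tpu_client_next::{
    connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
    leader_updater::LeaderUpdater, // assumed export location
    transaction_batch::TransactionBatch,
    ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

/// Run one scheduler until `old_cancel` fires (e.g. because the validator
/// identity was updated over RPC), then start a replacement that drains
/// the very same transaction channel.
async fn restart_scheduler(
    old_config: ConnectionWorkersSchedulerConfig,
    new_config: ConnectionWorkersSchedulerConfig,
    old_updater: Box<dyn LeaderUpdater>,
    new_updater: Box<dyn LeaderUpdater>,
    transaction_receiver: mpsc::Receiver<TransactionBatch>,
    old_cancel: CancellationToken,
    new_cancel: CancellationToken,
) -> Result<(), ConnectionWorkersSchedulerError> {
    // Before this commit the receiver was consumed by `run`; now it comes
    // back alongside the per-address stats when the scheduler stops.
    let (_stats, receiver) =
        ConnectionWorkersScheduler::run(old_config, old_updater, transaction_receiver, old_cancel)
            .await?;

    // Reuse the receiver: batches still buffered in the channel are picked
    // up by the new scheduler instead of being dropped with the old one.
    let (_stats, _receiver) =
        ConnectionWorkersScheduler::run(new_config, new_updater, receiver, new_cancel).await?;
    Ok(())
}
```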
7 changes: 6 additions & 1 deletion tls-utils/src/quic_client_certificate.rs
@@ -10,7 +10,12 @@ pub struct QuicClientCertificate {
 }
 
 impl QuicClientCertificate {
-    pub fn new(keypair: &Keypair) -> Self {
+    pub fn new(keypair: Option<&Keypair>) -> Self {
+        let keypair = if let Some(keypair) = keypair {
+            keypair
+        } else {
+            &Keypair::new()
+        };
         let (certificate, key) = new_dummy_x509_certificate(keypair);
         Self { certificate, key }
     }
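With the new `Option<&Keypair>` signature, callers no longer special-case the unstaked path: passing `None` falls back to a throwaway keypair inside `new`. A quick sketch of both call styles (the `solana_keypair` import path is an assumption; `QuicClientCertificate` is re-exported by `solana_tls_utils`, as the quic_networking.rs hunk below shows):

```rust
use solana_keypair::Keypair; // assumed path; may live in solana_sdk instead
use solana_tls_utils::QuicClientCertificate;

fn main() {
    // Staked client: the certificate is derived from the supplied identity.
    let identity = Keypair::new();
    let _staked = QuicClientCertificate::new(Some(&identity));

    // Unstaked client: `new` generates an ephemeral keypair internally.
    let _unstaked = QuicClientCertificate::new(None);
}
```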
28 changes: 16 additions & 12 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -90,20 +90,28 @@ pub struct ConnectionWorkersSchedulerConfig {
     pub leaders_fanout: Fanout,
 }
 
+pub type TransactionStatsAndReceiver = (
+    SendTransactionStatsPerAddr,
+    mpsc::Receiver<TransactionBatch>,
+);
+
 impl ConnectionWorkersScheduler {
     /// Starts the scheduler, which manages the distribution of transactions to
     /// the network's upcoming leaders.
     ///
     /// Runs the main loop that handles worker scheduling and management for
     /// connections. Returns the error quic statistics per connection address or
-    /// an error.
+    /// an error along with receiver for transactions. The receiver returned
+    /// back to the user because in some cases we need to re-utilize the same
+    /// receiver for the new scheduler. For example, this happens when the
+    /// identity for the validator is updated.
     ///
     /// Importantly, if some transactions were not delivered due to network
     /// problems, they will not be retried when the problem is resolved.
     pub async fn run(
         ConnectionWorkersSchedulerConfig {
             bind,
-            stake_identity: validator_identity,
+            stake_identity,
             num_connections,
             skip_check_transaction_age,
             worker_channel_size,
@@ -113,14 +121,14 @@ impl ConnectionWorkersScheduler {
         mut leader_updater: Box<dyn LeaderUpdater>,
         mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
         cancel: CancellationToken,
-    ) -> Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError> {
-        let endpoint = Self::setup_endpoint(bind, validator_identity)?;
+    ) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
+        let endpoint = Self::setup_endpoint(bind, stake_identity.as_ref())?;
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
         let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();
 
         loop {
-            let transaction_batch = tokio::select! {
+            let transaction_batch: TransactionBatch = tokio::select! {
                 recv_res = transaction_receiver.recv() => match recv_res {
                     Some(txs) => txs,
                     None => {
@@ -184,19 +192,15 @@
 
         endpoint.close(0u32.into(), b"Closing connection");
         leader_updater.stop().await;
-        Ok(send_stats_per_addr)
+        Ok((send_stats_per_addr, transaction_receiver))
     }
 
     /// Sets up the QUIC endpoint for the scheduler to handle connections.
     fn setup_endpoint(
         bind: SocketAddr,
-        validator_identity: Option<Keypair>,
+        stake_identity: Option<&Keypair>,
     ) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
-        let client_certificate = if let Some(validator_identity) = validator_identity {
-            Arc::new(QuicClientCertificate::new(&validator_identity))
-        } else {
-            Arc::new(QuicClientCertificate::new(&Keypair::new()))
-        };
+        let client_certificate = QuicClientCertificate::new(stake_identity);
         let client_config = create_client_config(client_certificate);
         let endpoint = create_client_endpoint(bind, client_config)?;
         Ok(endpoint)
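Note the small ownership dance in the hunk above: the config still owns an `Option<Keypair>`, while `setup_endpoint` now borrows one. The bridge is plain `Option::as_ref`, shown here in isolation (again with the assumed `solana_keypair` path):

```rust
use solana_keypair::Keypair; // assumed path; may live in solana_sdk instead

fn main() {
    let stake_identity: Option<Keypair> = Some(Keypair::new());
    // Option<Keypair> -> Option<&Keypair>: borrows the keypair without
    // consuming it, matching the new `setup_endpoint` parameter type.
    let borrowed: Option<&Keypair> = stake_identity.as_ref();
    assert!(borrowed.is_some());
}
```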
2 changes: 1 addition & 1 deletion tpu-client-next/src/quic_networking.rs
@@ -18,7 +18,7 @@ pub use {
     solana_tls_utils::QuicClientCertificate,
 };
 
-pub(crate) fn create_client_config(client_certificate: Arc<QuicClientCertificate>) -> ClientConfig {
+pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
     // adapted from QuicLazyInitializedEndpoint::create_endpoint
     let mut crypto = tls_client_config_builder()
         .with_client_auth_cert(
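A likely rationale for dropping the `Arc` here: the certificate is consumed once while building the rustls client config, so shared ownership bought nothing, and passing `QuicClientCertificate` by value lines up with the new `QuicClientCertificate::new(stake_identity)` call in `setup_endpoint`.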
29 changes: 15 additions & 14 deletions tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -15,11 +15,13 @@ use {
         streamer::StakedNodes,
     },
     solana_tpu_client_next::{
-        connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
+        connection_workers_scheduler::{
+            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
+        },
         leader_updater::create_leader_updater,
         send_transaction_stats::SendTransactionStatsNonAtomic,
         transaction_batch::TransactionBatch,
-        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
+        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
     },
     std::{
         collections::HashMap,
@@ -40,10 +42,10 @@ use {
     tokio_util::sync::CancellationToken,
 };
 
-fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
+fn test_config(stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
     ConnectionWorkersSchedulerConfig {
         bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
-        stake_identity: validator_identity,
+        stake_identity,
         num_connections: 1,
         skip_check_transaction_age: false,
         // At the moment we have only one strategy to send transactions: we try
@@ -63,9 +65,9 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
 async fn setup_connection_worker_scheduler(
     tpu_address: SocketAddr,
     transaction_receiver: Receiver<TransactionBatch>,
-    validator_identity: Option<Keypair>,
+    stake_identity: Option<Keypair>,
 ) -> (
-    JoinHandle<Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>>,
+    JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>,
     CancellationToken,
 ) {
     let json_rpc_url = "http://127.0.0.1:8899";
@@ -82,7 +84,7 @@ async fn setup_connection_worker_scheduler(
         .expect("Leader updates was successfully created");
 
     let cancel = CancellationToken::new();
-    let config = test_config(validator_identity);
+    let config = test_config(stake_identity);
     let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
         config,
         leader_updater,
@@ -95,10 +97,10 @@
 
 async fn join_scheduler(
     scheduler_handle: JoinHandle<
-        Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
+        Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>,
     >,
 ) -> SendTransactionStatsNonAtomic {
-    let stats_per_ip = scheduler_handle
+    let (stats_per_ip, _) = scheduler_handle
         .await
         .unwrap()
         .expect("Scheduler should stop successfully.");
@@ -400,8 +402,8 @@ async fn test_connection_pruned_and_reopened() {
 /// connection and verify that all the txs has been received.
 #[tokio::test]
 async fn test_staked_connection() {
-    let validator_identity = Keypair::new();
-    let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]);
+    let stake_identity = Keypair::new();
+    let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
     let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
 
     let SpawnTestServerResult {
@@ -432,8 +434,7 @@
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
 
     let (scheduler_handle, _scheduler_cancel) =
-        setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity))
-            .await;
+        setup_connection_worker_scheduler(server_address, tx_receiver, Some(stake_identity)).await;
 
     // Check results
     let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
@@ -533,7 +534,7 @@ async fn test_no_host() {
 
     // While attempting to establish a connection with a nonexistent host, we fill the worker's
     // channel.
-    let stats = scheduler_handle
+    let (stats, _) = scheduler_handle
         .await
         .expect("Scheduler should stop successfully")
         .expect("Scheduler execution was successful");
