From afb0415661903101ec40d7d412c6829a5a72fc6e Mon Sep 17 00:00:00 2001
From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com>
Date: Tue, 19 Mar 2024 03:05:00 +0800
Subject: [PATCH] Make the quic server connection table use an async lock,
 reducing thrashing (#293)

Make the quic server connection table use an async lock, reducing lock
contention

(cherry picked from commit 36e97654e3ea12beb37c8a7459b16e15cc94a3f0)
---
 streamer/src/nonblocking/quic.rs | 34 ++++++++++++++++++++++++++++--------
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs
index 04b2e66e8f319e..0c278ea818e814 100644
--- a/streamer/src/nonblocking/quic.rs
+++ b/streamer/src/nonblocking/quic.rs
@@ -30,13 +30,26 @@ use {
     std::{
         iter::repeat_with,
         net::{IpAddr, SocketAddr, UdpSocket},
+        // CAUTION: be careful not to introduce any awaits while holding an RwLock.
         sync::{
             atomic::{AtomicBool, AtomicU64, Ordering},
-            Arc, Mutex, MutexGuard, RwLock,
+            Arc, RwLock,
         },
         time::{Duration, Instant},
     },
-    tokio::{task::JoinHandle, time::timeout},
+    tokio::{
+        // CAUTION: It's kind of sketch that we're mixing async and sync locks (see the RwLock above).
+        // This is done so that sync code can also access the stake table.
+        // Make sure we don't hold a sync lock across an await - including the await to
+        // lock an async Mutex. This does not happen now and should not happen as long as we
+        // don't hold an async Mutex and sync RwLock at the same time (currently true)
+        // but if we do, the scope of the RwLock must always be a subset of the async Mutex
+        // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
+        // introduce any other awaits while holding the RwLock.
+        sync::{Mutex, MutexGuard},
+        task::JoinHandle,
+        time::timeout,
+    },
 };
 
 /// Limit to 500K PPS
@@ -383,7 +396,7 @@ fn handle_and_cache_new_connection(
     }
 }
 
-fn prune_unstaked_connections_and_add_new_connection(
+async fn prune_unstaked_connections_and_add_new_connection(
     connection: Connection,
     connection_table: Arc<Mutex<ConnectionTable>>,
     max_connections: usize,
@@ -393,7 +406,7 @@
     let stats = params.stats.clone();
     if max_connections > 0 {
         let connection_table_clone = connection_table.clone();
-        let mut connection_table = connection_table.lock().unwrap();
+        let mut connection_table = connection_table.lock().await;
         prune_unstaked_connection_table(&mut connection_table, max_connections, stats);
         handle_and_cache_new_connection(
             connection,
@@ -496,7 +509,8 @@ async fn setup_connection(
                );
 
                if params.stake > 0 {
-                    let mut connection_table_l = staked_connection_table.lock().unwrap();
+                    let mut connection_table_l = staked_connection_table.lock().await;
+
                     if connection_table_l.total_size >= max_staked_connections {
                         let num_pruned =
                             connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake);
@@ -525,7 +539,9 @@
                             max_unstaked_connections,
                             &params,
                             wait_for_chunk_timeout,
-                        ) {
+                        )
+                        .await
+                        {
                             stats
                                 .connection_added_from_staked_peer
                                 .fetch_add(1, Ordering::Relaxed);
@@ -544,7 +560,9 @@
                     max_unstaked_connections,
                     &params,
                     wait_for_chunk_timeout,
-                ) {
+                )
+                .await
+                {
                     stats
                         .connection_added_from_unstaked_peer
                         .fetch_add(1, Ordering::Relaxed);
@@ -807,7 +825,7 @@ async fn handle_connection(
         }
     }
 
-    let removed_connection_count = connection_table.lock().unwrap().remove_connection(
+    let removed_connection_count = connection_table.lock().await.remove_connection(
         ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
         remote_addr.port(),
         stable_id,
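
As an illustration of the lock discipline the CAUTION comments in the first hunk describe (take the async tokio Mutex first, and hold the sync RwLock only inside an await-free scope), here is a minimal standalone sketch. It is not code from quic.rs: the StakeTable and ConnectionTable structs and the add_connection function below are hypothetical stand-ins, and the example assumes a tokio dependency with its default runtime and macro features.

use std::sync::{Arc, RwLock};
use tokio::sync::Mutex;

// Hypothetical stand-ins for the real tables in quic.rs.
#[derive(Default)]
struct StakeTable {
    total_stake: u64,
}

#[derive(Default)]
struct ConnectionTable {
    total_size: usize,
}

async fn add_connection(
    connections: Arc<Mutex<ConnectionTable>>, // async lock: awaiting it is safe
    stakes: Arc<RwLock<StakeTable>>,          // sync lock: shared with non-async code
) {
    // Lock order: the async Mutex is taken first...
    let mut connections = connections.lock().await;

    // ...and the sync RwLock is only held inside a scope that contains no
    // `.await`, so its guard is dropped before the task can yield.
    let total_stake = {
        let stakes = stakes.read().unwrap();
        stakes.total_stake
    };

    connections.total_size += 1;
    println!(
        "connections={}, total_stake={}",
        connections.total_size, total_stake
    );

    // Awaiting here is fine: only the tokio Mutex guard is still held, and an
    // async guard held across an await does not block the runtime thread.
    tokio::task::yield_now().await;
}

#[tokio::main]
async fn main() {
    let connections = Arc::new(Mutex::new(ConnectionTable::default()));
    let stakes = Arc::new(RwLock::new(StakeTable::default()));
    add_connection(connections, stakes).await;
}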