diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index 88cd3730e9b74f..3fc2f19319b355 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -4,7 +4,7 @@ use { crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS}, bytes::Bytes, - crossbeam_channel::{Receiver, RecvTimeoutError}, + crossbeam_channel::{Receiver, RecvError}, lru::LruCache, rand::Rng, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, @@ -193,10 +193,12 @@ fn retransmit( max_slots: &MaxSlots, rpc_subscriptions: Option<&RpcSubscriptions>, slot_status_notifier: Option<&SlotStatusNotifier>, -) -> Result<(), RecvTimeoutError> { - const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; +) -> Result<(), RecvError> { + // wait for something on the channel + let mut shreds = shreds_receiver.recv()?; + // now the batch has started let mut timer_start = Measure::start("retransmit"); + // drain the channel until it is empty to form a batch shreds.extend(shreds_receiver.try_iter().flatten()); stats.num_shreds += shreds.len(); stats.total_batches += 1; @@ -392,7 +394,9 @@ pub fn retransmitter( ); let mut rng = rand::thread_rng(); let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS); + let mut stats = RetransmitStats::new(Instant::now()); + #[allow(clippy::manual_clamp)] let num_threads = get_thread_count().min(8).max(sockets.len()); let thread_pool = ThreadPoolBuilder::new() @@ -402,8 +406,8 @@ pub fn retransmitter( .unwrap(); Builder::new() .name("solRetransmittr".to_string()) - .spawn(move || loop { - match retransmit( + .spawn(move || { + while retransmit( &thread_pool, &bank_forks, &leader_schedule_cache, @@ -417,11 +421,9 @@ pub fn retransmitter( &max_slots, rpc_subscriptions.as_deref(), slot_status_notifier.as_ref(), - ) { - Ok(()) => (), - Err(RecvTimeoutError::Timeout) => (), - Err(RecvTimeoutError::Disconnected) => break, - } + ) + .is_ok() + {} }) .unwrap() }