From be89907f381c267c8fd1db60e49d9b7909b29050 Mon Sep 17 00:00:00 2001 From: Stuart Hunt Date: Tue, 14 Nov 2023 16:35:59 -0500 Subject: [PATCH] Fixed lack of guarentee on SCTP -> DTLS packet ordering by removing tokio::spawn. The ordering mixup triggered SCTPs congestion control, severely limitting throughput in practice. --- sctp/src/association/mod.rs | 65 +++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/sctp/src/association/mod.rs b/sctp/src/association/mod.rs index fdf2cda4f..08baca447 100644 --- a/sctp/src/association/mod.rs +++ b/sctp/src/association/mod.rs @@ -14,7 +14,7 @@ use association_internal::*; use association_stats::*; use bytes::{Bytes, BytesMut}; use rand::random; -use tokio::sync::{broadcast, mpsc, Mutex, Semaphore}; +use tokio::sync::{broadcast, mpsc, Mutex}; use util::Conn; use crate::chunk::chunk_abort::ChunkAbort; @@ -487,18 +487,6 @@ impl Association { let done = Arc::new(AtomicBool::new(false)); let name = Arc::new(name); - let limit = { - #[cfg(test)] - { - 1 - } - #[cfg(not(test))] - { - 8 - } - }; - - let sem = Arc::new(Semaphore::new(limit)); while !done.load(Ordering::Relaxed) { //log::debug!("[{}] gather_outbound begin", name); let (packets, continue_loop) = { @@ -507,35 +495,40 @@ impl Association { }; //log::debug!("[{}] gather_outbound done with {}", name, packets.len()); - // We schedule a new task here for a reason: - // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread - // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio - // Doing it this way, tokio schedules this to a new thread, this future is suspended, and the read_loop can make progress let net_conn = Arc::clone(&net_conn); let bytes_sent = Arc::clone(&bytes_sent); let name2 = Arc::clone(&name); let done2 = Arc::clone(&done); - let sem = Arc::clone(&sem); - sem.acquire().await.unwrap().forget(); - tokio::task::spawn(async move { - let mut buf = BytesMut::with_capacity(16 * 1024); - for raw in packets { - buf.clear(); - if let Err(err) = raw.marshal_to(&mut buf) { - log::warn!("[{}] failed to serialize a packet: {:?}", name2, err); - } else { - let raw = buf.as_ref(); - if let Err(err) = net_conn.send(raw.as_ref()).await { - log::warn!("[{}] failed to write packets on net_conn: {}", name2, err); - done2.store(true, Ordering::Relaxed) - } else { - bytes_sent.fetch_add(raw.len(), Ordering::SeqCst); + let mut buffer = None; + for raw in packets { + let mut buf = buffer.take().unwrap_or_else(|| BytesMut::with_capacity(16 * 1024)); + + // We do the marshalling work in a blocking task here for a reason: + // If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread + // This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio + // Doing it this way, tokio schedules this work on a dedicated blocking thread, this future is suspended, and the read_loop can make progress + match tokio::task::spawn_blocking(move || { + raw.marshal_to(&mut buf).map(|_| buf) + }).await.unwrap() { + Ok(mut buf) => { + let raw = buf.as_ref(); + if let Err(err) = net_conn.send(raw.as_ref()).await { + log::warn!("[{}] failed to write packets on net_conn: {}", name2, err); + done2.store(true, Ordering::Relaxed) + } else { + bytes_sent.fetch_add(raw.len(), Ordering::SeqCst); + } + + // Reuse allocation. Have to use options, since spawn blocking can't borrow, has to take owernship. + buf.clear(); + buffer = Some(buf); + }, + Err(err) => { + log::warn!("[{}] failed to serialize a packet: {:?}", name2, err); } } - //log::debug!("[{}] sending {} bytes done", name, raw.len()); - } - sem.add_permits(1); - }); + //log::debug!("[{}] sending {} bytes done", name, raw.len()); + } if !continue_loop { break;