Skip to content

Commit

Permalink
Fixed lack of guarantee on SCTP -> DTLS packet ordering by removing t…
Browse files Browse the repository at this point in the history
…okio::spawn. The ordering mix-up triggered SCTP's congestion control, severely limiting throughput in practice.
  • Loading branch information
onnoowl committed Nov 14, 2023
1 parent d751e94 commit be89907
Showing 1 changed file with 29 additions and 36 deletions.
65 changes: 29 additions & 36 deletions sctp/src/association/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use association_internal::*;
use association_stats::*;
use bytes::{Bytes, BytesMut};
use rand::random;
use tokio::sync::{broadcast, mpsc, Mutex, Semaphore};
use tokio::sync::{broadcast, mpsc, Mutex};
use util::Conn;

use crate::chunk::chunk_abort::ChunkAbort;
Expand Down Expand Up @@ -487,18 +487,6 @@ impl Association {
let done = Arc::new(AtomicBool::new(false));
let name = Arc::new(name);

let limit = {
#[cfg(test)]
{
1
}
#[cfg(not(test))]
{
8
}
};

let sem = Arc::new(Semaphore::new(limit));
while !done.load(Ordering::Relaxed) {
//log::debug!("[{}] gather_outbound begin", name);
let (packets, continue_loop) = {
Expand All @@ -507,35 +495,40 @@ impl Association {
};
//log::debug!("[{}] gather_outbound done with {}", name, packets.len());

// We schedule a new task here for a reason:
// If we don't tokio tends to run the write_loop and read_loop of one connection on the same OS thread
// This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio
// Doing it this way, tokio schedules this to a new thread, this future is suspended, and the read_loop can make progress
let net_conn = Arc::clone(&net_conn);
let bytes_sent = Arc::clone(&bytes_sent);
let name2 = Arc::clone(&name);
let done2 = Arc::clone(&done);
let sem = Arc::clone(&sem);
sem.acquire().await.unwrap().forget();
tokio::task::spawn(async move {
let mut buf = BytesMut::with_capacity(16 * 1024);
for raw in packets {
buf.clear();
if let Err(err) = raw.marshal_to(&mut buf) {
log::warn!("[{}] failed to serialize a packet: {:?}", name2, err);
} else {
let raw = buf.as_ref();
if let Err(err) = net_conn.send(raw.as_ref()).await {
log::warn!("[{}] failed to write packets on net_conn: {}", name2, err);
done2.store(true, Ordering::Relaxed)
} else {
bytes_sent.fetch_add(raw.len(), Ordering::SeqCst);
let mut buffer = None;
for raw in packets {
let mut buf = buffer.take().unwrap_or_else(|| BytesMut::with_capacity(16 * 1024));

// We do the marshalling work in a blocking task here for a reason:
// If we don't, tokio tends to run the write_loop and read_loop of one connection on the same OS thread.
// This means that even though we release the lock above, the read_loop isn't able to take it, simply because it is not being scheduled by tokio.
// Doing it this way, tokio schedules this work on a dedicated blocking thread, this future is suspended, and the read_loop can make progress.
match tokio::task::spawn_blocking(move || {
raw.marshal_to(&mut buf).map(|_| buf)
}).await.unwrap() {
Ok(mut buf) => {
let raw = buf.as_ref();
if let Err(err) = net_conn.send(raw.as_ref()).await {
log::warn!("[{}] failed to write packets on net_conn: {}", name2, err);
done2.store(true, Ordering::Relaxed)
} else {
bytes_sent.fetch_add(raw.len(), Ordering::SeqCst);
}

// Reuse the allocation. We have to use an Option, since spawn_blocking can't borrow and has to take ownership.
buf.clear();
buffer = Some(buf);
},
Err(err) => {
log::warn!("[{}] failed to serialize a packet: {:?}", name2, err);
}
}
//log::debug!("[{}] sending {} bytes done", name, raw.len());
}
sem.add_permits(1);
});
//log::debug!("[{}] sending {} bytes done", name, raw.len());
}

if !continue_loop {
break;
Expand Down

0 comments on commit be89907

Please sign in to comment.