Skip to content

Commit

Permalink
Use sendmmsg to reduce system call overhead
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Nov 18, 2019
1 parent da08f06 commit 623c73d
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 84 deletions.
41 changes: 21 additions & 20 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,35 +252,36 @@ impl EndpointInner {
}

fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
let mut sent = 0;
while let Some(t) = self
.outgoing
.pop_front()
.or_else(|| self.inner.poll_transmit())
{
match self
.socket
.poll_send(cx, &t.destination, t.ecn, &t.contents)
{
Poll::Ready(Ok(_)) => {}
let mut calls = 0;
loop {
while self.outgoing.len() < crate::udp::BATCH_SIZE {
match self.inner.poll_transmit() {
Some(x) => self.outgoing.push_back(x),
None => break,
}
}
if self.outgoing.is_empty() {
return Ok(false);
}
match self.socket.poll_send(cx, self.outgoing.as_slices().0) {
Poll::Ready(Ok(n)) => {
self.outgoing.drain(..n);
calls += 1;
if calls == IO_LOOP_BOUND {
return Ok(true);
}
}
Poll::Pending => {
self.outgoing.push_front(t);
break;
return Ok(false);
}
Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::PermissionDenied => {
self.outgoing.push_front(t);
break;
return Ok(false);
}
Poll::Ready(Err(e)) => {
return Err(e);
}
}
sent += 1;
if sent == IO_LOOP_BOUND {
return Ok(true);
}
}
Ok(false)
}

fn handle_events(&mut self, cx: &mut Context) {
Expand Down
1 change: 1 addition & 0 deletions quinn/src/platform/cmsg.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{mem, ptr};

/// Transparent wrapper that raises the alignment of the wrapped value to 8 bytes.
///
/// The single public field gives direct access to the inner value; the wrapper
/// itself only exists to carry the `repr(align(8))` attribute.
#[derive(Copy, Clone)]
#[repr(align(8))] // Conservative bound for align_of<cmsghdr>
pub struct Aligned<T>(pub T);

Expand Down
28 changes: 20 additions & 8 deletions quinn/src/platform/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,32 @@ use std::{io, net::SocketAddr};

use mio::net::UdpSocket;

use proto::EcnCodepoint;
use proto::{EcnCodepoint, Transmit};

impl super::UdpExt for UdpSocket {
/// Fallback socket initialisation: performs no configuration and always succeeds.
fn init_ext(&self) -> io::Result<()> {
Ok(())
}

fn send_ext(
&self,
remote: &SocketAddr,
_: Option<EcnCodepoint>,
msg: &[u8],
) -> io::Result<usize> {
self.send_to(msg, remote)
/// Sends `transmits` one datagram at a time with `send_to`, in order.
///
/// Returns the number of datagrams handed to the socket. An error is only
/// returned when the very first datagram fails; a failure after at least one
/// successful send is reported as a short count instead.
fn send_ext(&self, transmits: &[Transmit]) -> io::Result<usize> {
    for (delivered, packet) in transmits.iter().enumerate() {
        if let Err(e) = self.send_to(&packet.contents, &packet.destination) {
            // The caller must learn about the packets that already went out, so a
            // failure part-way through is swallowed here. This relies on the error
            // being either harmlessly transient (e.g. WouldBlock) or showing up
            // again on the next call, where it will be the first send and surface.
            return if delivered == 0 { Err(e) } else { Ok(delivered) };
        }
    }
    Ok(transmits.len())
}

fn recv_ext(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr, Option<EcnCodepoint>)> {
Expand Down
9 changes: 2 additions & 7 deletions quinn/src/platform/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Uniform interface to send/recv UDP packets with ECN information.
use proto::EcnCodepoint;
use proto::{EcnCodepoint, Transmit};
use std::{io, net::SocketAddr};

#[cfg(unix)]
Expand All @@ -13,11 +13,6 @@ mod fallback;

pub trait UdpExt {
fn init_ext(&self) -> io::Result<()>;
fn send_ext(
&self,
remote: &SocketAddr,
ecn: Option<EcnCodepoint>,
msg: &[u8],
) -> io::Result<usize>;
fn send_ext(&self, transmits: &[Transmit]) -> io::Result<usize>;
fn recv_ext(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr, Option<EcnCodepoint>)>;
}
119 changes: 75 additions & 44 deletions quinn/src/platform/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use mio::net::UdpSocket;
use proto::EcnCodepoint;
use proto::{EcnCodepoint, Transmit};

use super::cmsg;

Expand Down Expand Up @@ -71,51 +71,27 @@ impl super::UdpExt for UdpSocket {
Ok(())
}

fn send_ext(
&self,
remote: &SocketAddr,
ecn: Option<EcnCodepoint>,
msg: &[u8],
) -> io::Result<usize> {
let (name, namelen) = match *remote {
SocketAddr::V4(ref addr) => {
(addr as *const _ as _, mem::size_of::<libc::sockaddr_in>())
}
SocketAddr::V6(ref addr) => {
(addr as *const _ as _, mem::size_of::<libc::sockaddr_in6>())
}
};
let ecn = ecn.map_or(0, |x| x as libc::c_int);
let mut iov = libc::iovec {
iov_base: msg.as_ptr() as *const _ as *mut _,
iov_len: msg.len(),
};
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
hdr.msg_name = name;
hdr.msg_namelen = namelen as _;
hdr.msg_iov = &mut iov;
hdr.msg_iovlen = 1;
hdr.msg_control = ptr::null_mut();
hdr.msg_controllen = 0;
hdr.msg_flags = 0;
// We may never fully initialize this, and it's only written/read via `ptr::write`/syscalls,
// so no `assume_init` call can or should be made.
let mut ctrl = cmsg::Aligned(MaybeUninit::<[u8; CMSG_LEN]>::uninit());
hdr.msg_control = ctrl.0.as_mut_ptr() as _;
hdr.msg_controllen = CMSG_LEN as _;
let is_ipv4 = match remote {
SocketAddr::V4(_) => true,
SocketAddr::V6(ref addr) => addr.ip().segments().starts_with(&[0, 0, 0, 0, 0, 0xffff]),
};
let mut encoder = unsafe { cmsg::Encoder::new(&mut hdr) };
if is_ipv4 {
encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
} else {
encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
#[cfg(not(target_os = "macos"))]
fn send_ext(&self, transmits: &[Transmit]) -> io::Result<usize> {
use crate::udp::BATCH_SIZE;
let mut msgs: [libc::mmsghdr; BATCH_SIZE] = unsafe { mem::zeroed() };
let mut iovecs: [libc::iovec; BATCH_SIZE] = unsafe { mem::zeroed() };
let mut cmsgs = [cmsg::Aligned(MaybeUninit::uninit()); BATCH_SIZE];
for (hdr, (iov, (ctrl, transmit))) in msgs
.iter_mut()
.zip(iovecs.iter_mut().zip(cmsgs.iter_mut().zip(transmits)))
{
prepare_msg(transmit, &mut hdr.msg_hdr, iov, ctrl);
}
encoder.finish();
loop {
let n = unsafe { libc::sendmsg(self.as_raw_fd(), &hdr, 0) };
let n = unsafe {
libc::sendmmsg(
self.as_raw_fd(),
msgs.as_mut_ptr(),
transmits.len().min(crate::udp::BATCH_SIZE) as _,
0,
)
};
if n == -1 {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::Interrupted {
Expand All @@ -127,6 +103,31 @@ impl super::UdpExt for UdpSocket {
}
}

/// macOS variant of `send_ext`: issues one `sendmsg` call per transmit, in order.
///
/// Returns the number of transmits sent. A call interrupted by a signal is
/// retried. Any other failure is returned as an error only if nothing has been
/// sent yet; otherwise the count of transmits already sent is returned.
#[cfg(target_os = "macos")]
fn send_ext(&self, transmits: &[Transmit]) -> io::Result<usize> {
    // Scratch header, I/O vector and control buffer, reused for every transmit.
    let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
    let mut iov: libc::iovec = unsafe { mem::zeroed() };
    let mut ctrl = cmsg::Aligned(MaybeUninit::uninit());
    for (delivered, transmit) in transmits.iter().enumerate() {
        // Retry loop: only left once this transmit was sent or a real error occurred.
        loop {
            prepare_msg(transmit, &mut hdr, &mut iov, &mut ctrl);
            let rc = unsafe { libc::sendmsg(self.as_raw_fd(), &hdr, 0) };
            if rc != -1 {
                break;
            }
            let err = io::Error::last_os_error();
            match err.kind() {
                io::ErrorKind::Interrupted => continue,
                // Nothing went out yet: surface the error itself.
                _ if delivered == 0 => return Err(err),
                // Report the partial progress instead of the error.
                _ => return Ok(delivered),
            }
        }
    }
    Ok(transmits.len())
}

fn recv_ext(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr, Option<EcnCodepoint>)> {
let mut name = MaybeUninit::<libc::sockaddr_storage>::uninit();
let mut iov = libc::iovec {
Expand Down Expand Up @@ -186,3 +187,33 @@ impl super::UdpExt for UdpSocket {
}

/// Size in bytes of the per-message control (ancillary data) buffer.
///
/// `prepare_msg` takes a buffer of exactly this many bytes and reports this
/// value to the kernel as `msg_controllen`.
// NOTE(review): presumably sized to hold a single cmsghdr plus one c_int payload — confirm per target.
const CMSG_LEN: usize = 24;

/// Populates `hdr` so that a `sendmsg`/`sendmmsg` call will transmit `transmit`.
///
/// * `transmit` - packet to send; supplies the payload, destination and ECN codepoint.
/// * `hdr` - message header to fill in.
/// * `iov` - scratch I/O vector, pointed at the payload bytes.
/// * `ctrl` - scratch control buffer that receives the ECN ancillary data.
///
/// On return `hdr` holds raw pointers into `transmit`, `iov` and `ctrl`, so all
/// three must stay alive and unmoved until the send call has been made.
fn prepare_msg(
    transmit: &Transmit,
    hdr: &mut libc::msghdr,
    iov: &mut libc::iovec,
    ctrl: &mut cmsg::Aligned<MaybeUninit<[u8; CMSG_LEN]>>,
) {
    // The kernel only reads the payload; the `*mut` cast is required by the C type.
    iov.iov_base = transmit.contents.as_ptr() as *const _ as *mut _;
    iov.iov_len = transmit.contents.len();

    // NOTE(review): these casts assume std's SocketAddrV4/SocketAddrV6 are
    // layout-compatible with sockaddr_in/sockaddr_in6 — confirm for the toolchain in use.
    let (name, namelen) = match transmit.destination {
        SocketAddr::V4(ref addr) => (addr as *const _ as _, mem::size_of::<libc::sockaddr_in>()),
        SocketAddr::V6(ref addr) => (addr as *const _ as _, mem::size_of::<libc::sockaddr_in6>()),
    };
    hdr.msg_name = name;
    hdr.msg_namelen = namelen as _;
    hdr.msg_iov = iov;
    hdr.msg_iovlen = 1;

    hdr.msg_control = ctrl.0.as_mut_ptr() as _;
    hdr.msg_controllen = CMSG_LEN as _;

    // An IPv4-mapped IPv6 destination (::ffff:a.b.c.d) is sent on the wire as an
    // IPv4 packet, so its ECN bits have to be set via IP_TOS rather than
    // IPV6_TCLASS, exactly like a plain IPv4 destination.
    let is_ipv4 = match transmit.destination {
        SocketAddr::V4(_) => true,
        SocketAddr::V6(ref addr) => addr.ip().segments().starts_with(&[0, 0, 0, 0, 0, 0xffff]),
    };

    // SAFETY: msg_control/msg_controllen were set just above to describe `ctrl`;
    // presumably that is what `cmsg::Encoder::new` requires — confirm against its contract.
    let mut encoder = unsafe { cmsg::Encoder::new(hdr) };
    let ecn = transmit.ecn.map_or(0, |x| x as libc::c_int);
    if is_ipv4 {
        encoder.push(libc::IPPROTO_IP, libc::IP_TOS, ecn as IpTosTy);
    } else {
        encoder.push(libc::IPPROTO_IPV6, libc::IPV6_TCLASS, ecn);
    }
    encoder.finish();
}
13 changes: 8 additions & 5 deletions quinn/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use mio;
use tokio_net::driver::Handle;
use tokio_net::util::PollEvented;

use proto::EcnCodepoint;
use proto::{EcnCodepoint, Transmit};

use crate::platform::UdpExt;

Expand All @@ -32,12 +32,10 @@ impl UdpSocket {
pub fn poll_send(
&self,
cx: &mut Context,
remote: &SocketAddr,
ecn: Option<EcnCodepoint>,
msg: &[u8],
transmits: &[Transmit],
) -> Poll<Result<usize, io::Error>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send_ext(remote, ecn, msg) {
match self.io.get_ref().send_ext(transmits) {
Ok(n) => Poll::Ready(Ok(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?;
Expand Down Expand Up @@ -67,3 +65,8 @@ impl UdpSocket {
self.io.get_ref().local_addr()
}
}

/// Number of UDP packets to send at a time
///
/// The endpoint's send loop queues outgoing transmits up to this count before
/// calling into the socket, and the `sendmmsg`-based sender sizes its per-call
/// header arrays with it.
///
/// Chosen somewhat arbitrarily; might benefit from additional tuning.
pub const BATCH_SIZE: usize = 32;

0 comments on commit 623c73d

Please sign in to comment.