From c17ab2c00922d1e17fe2f625608390d1fb972f36 Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Tue, 21 Jan 2020 19:17:43 +0100 Subject: [PATCH 01/10] Port MIO library to Solaris OS Solaris OS (unlike its fork Illumos) has no epoll(7) interface. The port of MIO library to Solaris OS adds support of standard POSIX poll(2) interface and implements edge-triggered semantic in a user-space. --- src/io_source.rs | 135 ++++++----- src/lib.rs | 1 + src/macros/mod.rs | 71 ++++++ src/net/uds/listener.rs | 2 +- src/poll.rs | 43 ++-- src/sys/unix/mod.rs | 120 ++++++++-- src/sys/unix/selector/mod.rs | 35 +-- src/sys/unix/selector/poll.rs | 410 ++++++++++++++++++++++++++++++++++ src/sys/unix/waker.rs | 49 ++-- tests/tcp.rs | 28 +-- tests/tcp_close.rs | 41 ++++ tests/tcp_stream.rs | 35 ++- tests/unix_datagram.rs | 4 +- tests/unix_stream.rs | 12 +- tests/util/mod.rs | 58 ++++- 15 files changed, 867 insertions(+), 177 deletions(-) create mode 100644 src/sys/unix/selector/poll.rs create mode 100644 tests/tcp_close.rs diff --git a/src/io_source.rs b/src/io_source.rs index 6939c0d03..f69883dec 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -129,72 +129,91 @@ impl DerefMut for IoSource { } } -#[cfg(unix)] -impl event::Source for IoSource -where - T: AsRawFd, -{ - fn register( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.associate(registry)?; - poll::selector(registry).register(self.inner.as_raw_fd(), token, interests) - } +cfg_epoll_or_kqueue! { + impl event::Source for IoSource + where + T: AsRawFd, + { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.associate(registry)?; + poll::selector(registry).register(self.inner.as_raw_fd(), token, + interests) + } - fn reregister( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.check_association(registry)?; - poll::selector(registry).reregister(self.inner.as_raw_fd(), token, interests) - } + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.check_association(registry)?; + poll::selector(registry).reregister(self.inner.as_raw_fd(), token, + interests) + } - fn deregister(&mut self, registry: &Registry) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.remove_association(registry)?; - poll::selector(registry).deregister(self.inner.as_raw_fd()) + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.remove_association(registry)?; + poll::selector(registry).deregister(self.inner.as_raw_fd()) + } } } +cfg_neither_epoll_nor_kqueue! { + #[cfg(not(windows))] + pub trait IoSourceTrait = AsRawFd; + #[cfg(windows)] + pub trait IoSourceTrait = AsRawSocket; + + impl event::Source for IoSource + where + T: IoSourceTrait, + { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.associate(registry)?; #[cfg(windows)] -impl event::Source for IoSource -where - T: AsRawSocket, -{ - fn register( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.associate(registry)?; - self.state - .register(registry, token, interests, self.inner.as_raw_socket()) - } + { + self.state + .register(registry, token, interests, + self.inner.as_raw_socket()) + } +#[cfg(not(windows))] + { + self.state + .register(registry, token, interests, + self.inner.as_raw_fd()) + } + } - fn reregister( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.check_association(registry)?; - self.state.reregister(registry, token, interests) - } + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.check_association(registry)?; + self.state.reregister(registry, token, interests) + } - fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.remove_association(_registry)?; - self.state.deregister() + fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.remove_association(_registry)?; + self.state.deregister() + } } } diff --git a/src/lib.rs b/src/lib.rs index 1cc198f2e..6297d71b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(trait_alias)] #![doc(html_root_url = "https://docs.rs/mio/0.7.0-alpha.1")] #![deny( missing_docs, diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 7db25795d..44989e358 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -105,3 +105,74 @@ macro_rules! cfg_any_os_util { )* } } + +/// OS supports epoll(7) interface +macro_rules! cfg_epoll { + ($($item:item)*) => { + $( + #[cfg(any( + target_os = "linux", + target_os = "android", + target_os = "illumos" + ))] + $item + )* + } +} + +/// OS supports kqueue(2) interface +macro_rules! cfg_kqueue { + ($($item:item)*) => { + $( + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ))] + $item + )* + } +} + +/// OS supports either epoll(7) or kqueue(2) interface +macro_rules! cfg_epoll_or_kqueue { + ($($item:item)*) => { + $( + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "linux", + target_os = "android", + target_os = "illumos" + ))] + $item + )* + } +} + +/// OS neither supports epoll(7) nor kqueue(2) interfaces +macro_rules! cfg_neither_epoll_nor_kqueue { + ($($item:item)*) => { + $( + #[cfg(not(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "linux", + target_os = "android", + target_os = "illumos" + )))] + $item + )* + } +} diff --git a/src/net/uds/listener.rs b/src/net/uds/listener.rs index 37e8106d8..d9b515384 100644 --- a/src/net/uds/listener.rs +++ b/src/net/uds/listener.rs @@ -35,7 +35,7 @@ impl UnixListener { /// The call is responsible for ensuring that the listening socket is in /// non-blocking mode. pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { - sys::uds::listener::accept(&self.inner) + self.inner.do_io(|inner| sys::uds::listener::accept(inner)) } /// Returns the local socket address of this listener. diff --git a/src/poll.rs b/src/poll.rs index 16528bd7b..0f052d488 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -154,19 +154,22 @@ use std::{fmt, io}; /// # Implementation notes /// /// `Poll` is backed by the selector provided by the operating system. -/// -/// | OS | Selector | -/// |---------------|-----------| -/// | Android | [epoll] | -/// | DragonFly BSD | [kqueue] | -/// | FreeBSD | [kqueue] | -/// | Linux | [epoll] | -/// | NetBSD | [kqueue] | -/// | OpenBSD | [kqueue] | -/// | Solaris | [epoll] | -/// | Windows | [IOCP] | -/// | iOS | [kqueue] | -/// | macOS | [kqueue] | +/// On Unix like systems it's either [epoll](4) or [kqueue](2) if the system +/// supports it and falls back to use [poll](2) system call otherwise. +/// On Windows the backend is I/O completion ports. +/// +/// | OS | Selector | +/// |---------------|--------------| +/// | Android | [epoll] | +/// | DragonFly BSD | [kqueue] | +/// | FreeBSD | [kqueue] | +/// | Linux | [epoll] | +/// | NetBSD | [kqueue] | +/// | OpenBSD | [kqueue] | +/// | Windows | [IOCP] | +/// | iOS | [kqueue] | +/// | macOS | [kqueue] | +/// | other Unix | [poll] | /// /// On all supported platforms, socket operations are handled by using the /// system selector. Platform specific extensions (e.g. [`SourceFd`]) allow @@ -174,16 +177,22 @@ use std::{fmt, io}; /// example, Linux's [`signalfd`] feature can be used by registering the FD with /// `Poll` via [`SourceFd`]. /// -/// On all platforms except windows, a call to [`Poll::poll`] is mostly just a -/// direct call to the system selector. However, [IOCP] uses a completion model -/// instead of a readiness model. In this case, `Poll` must adapt the completion -/// model Mio's API. While non-trivial, the bridge layer is still quite +/// On all platforms using [epoll](4) and [kqueue](2) interfaces, a call to +/// [`Poll::poll`] is mostly just a direct call to the system selector. +/// However, [IOCP] uses a completion model instead of a readiness model. +/// In this case, `Poll` must adapt the completion +/// model Mio's of API. While non-trivial, the bridge layer is still quite /// efficient. The most expensive part being calls to `read` and `write` require /// data to be copied into an intermediate buffer before it is passed to the /// kernel. /// +/// In case of [poll](2) backend, `Poll` has to emulate edge triggered model of +/// Mio's API using level triggered model of [poll](2) system call. The work, +/// that done by [epoll](4) in the kernel is shifted to user space. +/// /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html /// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 +/// [poll]: https://docs.oracle.com/cd/E36784_01/html/E36872/poll-2.html /// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx /// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html /// [`SourceFd`]: unix/struct.SourceFd.html diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 96d7f4dc2..dfb8e415d 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -38,24 +38,116 @@ cfg_os_poll! { pub use self::uds::SocketAddr; } - cfg_net! { - use std::io; + cfg_epoll_or_kqueue! { + cfg_net! { + use std::io; - // Both `kqueue` and `epoll` don't need to hold any user space state. - pub(crate) struct IoSourceState; + // Both `kqueue` and `epoll` don't need to hold any user space state. + pub(crate) struct IoSourceState; - impl IoSourceState { - pub fn new() -> IoSourceState { - IoSourceState + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } + } + } + } + + cfg_neither_epoll_nor_kqueue! { + cfg_net! { + use std::io; + use crate::{poll, Interest, Registry, Token}; + use std::os::unix::io::RawFd; + + #[derive(Debug)] + struct InternalState { + selector: Selector, + token: Token, + interests: Interest, + socket: RawFd, } - pub fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - // We don't hold state, so we can just call the function and - // return. - f(io) + pub struct IoSourceState { + inner: Option>, + } + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState { inner: None } + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + let result = f(io); + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .rearm(state.socket, state.interests) + })?; + result + } + + pub fn register(&mut self, + registry: &Registry, + token: Token, + interests: Interest, + socket: RawFd + ) -> io::Result<()> { + if self.inner.is_some() { + Err(io::ErrorKind::AlreadyExists.into()) + } else { + let selector = poll::selector(registry); + selector + .register(socket, token, interests) + .map(|_state| { + self.inner = Some(Box::new(InternalState { + selector: selector.try_clone().unwrap(), + socket, token, interests + })); + }) + } + } + + pub fn reregister( + &mut self, + _registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => { + state.selector + .reregister(state.socket, token, interests) + .map(|()| { + state.token = token; + state.interests = interests; + }) + } + None => Err(io::ErrorKind::NotFound.into()), + } + } + + pub fn deregister(&mut self) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => { + state.selector.deregister(state.socket).unwrap(); + self.inner = None; + Ok(()) + } + None => Err(io::ErrorKind::NotFound.into()), + } + } } } } diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 82475c897..afcce572d 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,25 +1,14 @@ -#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))] -mod epoll; +cfg_epoll! { + mod epoll; + pub(crate) use self::epoll::{event, Event, Events, Selector}; +} -#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))] -pub(crate) use self::epoll::{event, Event, Events, Selector}; +cfg_kqueue! { + mod kqueue; + pub(crate) use self::kqueue::{event, Event, Events, Selector}; +} -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" -))] -mod kqueue; - -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" -))] -pub(crate) use self::kqueue::{event, Event, Events, Selector}; +cfg_neither_epoll_nor_kqueue! { + mod poll; + pub(crate) use self::poll::{event, Event, Events, Selector}; +} diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs new file mode 100644 index 000000000..28d3a913d --- /dev/null +++ b/src/sys/unix/selector/poll.rs @@ -0,0 +1,410 @@ +use std::fmt; +use std::os::raw::c_void; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use std::{cmp, io}; + +use crate::interest::Interest; +use crate::sys::unix::waker::pipe::Waker; +use crate::token::Token; + +use libc::{c_int, c_short, nfds_t}; +use libc::{POLLIN, POLLNVAL, POLLOUT, POLLPRI, POLLRDBAND, POLLRDNORM, POLLWRBAND, POLLWRNORM}; + +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug)] +enum Task { + Add { + fd: RawFd, + token: Token, + interests: Interest, + is_waker: bool, + }, + Update { + fd: RawFd, + token: Token, + interests: Interest, + }, + Rearm { + fd: RawFd, + interests: Interest, + }, + Del { + fd: RawFd, + }, +} + +impl Task { + fn apply(&self, poll: &mut PollImpl) { + match self { + Task::Add { + fd, + token, + interests, + is_waker, + } => { + poll.fdarr.push(libc::pollfd { + fd: *fd, + events: interests_to_poll(*interests), + revents: 0, + }); + poll.fdmeta.push(FdMetadata { + token: usize::from(*token), + is_waker: *is_waker, + }); + // Store index for quick lookup. + if (poll.fdhash.len() as i32) <= *fd { + poll.fdhash.resize((*fd as usize) + 1, None); + } + poll.fdhash[*fd as usize] = Some((poll.fdarr.len() as usize) - 1); + } + Task::Del { fd } => { + let pos = poll.fdhash[*fd as usize].unwrap(); + poll.fdarr.remove(pos); + poll.fdmeta.remove(pos); + // Re-calculate hashed indexes after arrays shrunk down by 1. + poll.fdhash.iter_mut().for_each(|opt| match opt.as_mut() { + Some(ref mut x) if **x > pos => **x -= 1, + Some(ref mut x) if **x == pos => *opt = None, + _ => (), + }); + } + Task::Update { + fd, + token, + interests, + } => { + let pos = poll.fdhash[*fd as usize].unwrap(); + poll.fdarr[pos].events = interests_to_poll(*interests); + poll.fdmeta[pos].token = usize::from(*token); + } + Task::Rearm { fd, interests } => { + let pos = poll.fdhash[*fd as usize].unwrap(); + poll.fdarr[pos].events = interests_to_poll(*interests); + } + } + } +} + +struct FdMetadata { + token: usize, + is_waker: bool, +} + +struct PollImpl { + fdarr: Box>, + fdmeta: Box>, + fdhash: Box>>, +} + +impl fmt::Debug for PollImpl { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "PollImpl {{\n")?; + for (fd, meta) in self.fdarr.iter().zip(self.fdmeta.iter()) { + write!( + f, + "fd={:?} token={:?} events={:?} revents={:?}\n", + fd.fd, meta.token, fd.events, fd.revents + )?; + } + write!(f, "fdhash {{\n")?; + for (i, h) in self.fdhash.iter().enumerate() { + if let Some(x) = h { + write!(f, "{} => {}\n", i, x)? + } + } + write!(f, "}}\n")?; + write!(f, "}}\n")?; + Ok(()) + } +} + +#[derive(Debug)] +struct SelectorImpl { + tasks: Mutex>, + poll: Mutex, + waker: Mutex>>, + as_fd: Mutex, +} + +#[derive(Debug)] +pub struct Selector { + #[cfg(debug_assertions)] + id: usize, + state: Arc, +} + +impl Selector { + pub fn new() -> io::Result { + let state = Arc::new(SelectorImpl { + tasks: Mutex::new(vec![]), + poll: Mutex::new(PollImpl { + fdarr: Box::new(vec![]), + fdmeta: Box::new(vec![]), + fdhash: Box::new(vec![]), + }), + waker: Mutex::new(None), + as_fd: Mutex::new(-1), + }); + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + let sel = Selector { id, state }; + sel.add_waker()?; + Ok(sel) + } + + fn add_waker(&self) -> io::Result<()> { + let poll = self.get_impl(); + let mut guard = poll.waker.lock().unwrap(); + *guard = Some(Box::new(Waker::new(self, Token(0))?)); + Ok(()) + } + + fn get_impl(&self) -> &SelectorImpl { + &*self.state + } + + pub fn try_clone(&self) -> io::Result { + let state = Arc::clone(&self.state); + Ok(Selector { id: self.id, state }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + let me = self.get_impl(); + let mut poll = me.poll.lock().unwrap(); + loop { + { + // process pending tasks to (re|de)-register IO sources + let mut tasks = me.tasks.lock().unwrap(); + for task in tasks.iter() { + task.apply(&mut *poll); + } + tasks.clear(); + } + + events.clear(); + let timeout = timeout + .map(|to| cmp::min(to.as_millis(), c_int::max_value() as u128) as c_int) + .unwrap_or(-1); + let poll_rv = unsafe { + let rv = libc::poll(poll.fdarr.as_mut_ptr(), poll.fdarr.len() as nfds_t, timeout); + if rv < 0 { + return Err(std::io::Error::last_os_error()); + } + rv + }; + + for i in 0..poll.fdarr.len() { + let revents = poll.fdarr[i].revents; + let token = poll.fdmeta[i].token; + + // Skip over internal waker at index 0 + if i > 0 && revents != 0 && revents & !POLLNVAL != 0 { + events.push(Event { + data: token, + events: revents, + }); + } + if revents & POLLNVAL != 0 { + // This FD died. Someone closed this FD before + // deregistering first or there was a race condition + // between poll and deregister. Set it to -1 to let + // poll know it should be ignored. + poll.fdarr[i].fd = -1; + } + // Emulate edge triggered events + if !poll.fdmeta[i].is_waker { + poll.fdarr[i].events &= !(revents | POLLNVAL); + } + // Empty waker's queue + if revents != 0 && poll.fdmeta[i].is_waker { + let mut buf: [u8; 8] = [0; 8]; + unsafe { + libc::read(poll.fdarr[i].fd, buf.as_mut_ptr() as *mut c_void, 8); + } + } + } + if events.len() > 0 || poll_rv == 0 { + // Something is ready or poll(2) timed out + break; + } + } + return Ok(()); + } + + fn add_task(&self, task: Task) { + let me = self.get_impl(); + if let Ok(mut poll) = me.poll.try_lock() { + // poll not running, apply task directly if nothing sits in queue + let mut tasks = self.get_impl().tasks.lock().unwrap(); + match tasks.len() { + 0 => task.apply(&mut poll), + _ => tasks.push(task), + } + } else { + // poll is running, queue task and wake up poll thread + if let Some(ref waker) = me.waker.lock().unwrap().as_ref() { + self.get_impl().tasks.lock().unwrap().push(task); + waker.wake().unwrap(); + } + } + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.add_task(Task::Add { + fd, + token, + interests, + is_waker: false, + }); + Ok(()) + } + + pub(crate) fn register_pipe_waker( + &self, + fd: RawFd, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.add_task(Task::Add { + fd, + token, + interests, + is_waker: true, + }); + let mut as_fd = self.get_impl().as_fd.lock().unwrap(); + if *as_fd == -1 { + *as_fd = fd + } + Ok(()) + } + + pub fn rearm(&self, fd: RawFd, interests: Interest) -> io::Result<()> { + self.add_task(Task::Rearm { fd, interests }); + Ok(()) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.add_task(Task::Update { + fd, + token, + interests, + }); + Ok(()) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.add_task(Task::Del { fd }); + Ok(()) + } +} + +cfg_net! { + impl Selector { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.id + } + } +} + +impl AsRawFd for Selector { + fn as_raw_fd(&self) -> RawFd { + *self.get_impl().as_fd.lock().unwrap() + } +} + +fn interests_to_poll(interests: Interest) -> c_short { + let mut kind = 0; + + if interests.is_readable() { + kind |= POLLIN | POLLPRI | POLLRDBAND | POLLRDNORM; + } + if interests.is_writable() { + kind |= POLLOUT | POLLWRNORM | POLLWRBAND; + } + kind +} + +pub struct Event { + data: usize, + events: c_short, +} + +pub type Events = Vec; + +pub mod event { + use std::fmt; + + use crate::sys::Event; + use crate::Token; + use libc::c_short; + use libc::{POLLERR, POLLHUP, POLLIN, POLLOUT, POLLPRI}; + + pub fn token(event: &Event) -> Token { + Token(event.data) + } + + pub fn is_readable(event: &Event) -> bool { + event.events & (POLLIN | POLLPRI) != 0 + } + + pub fn is_writable(event: &Event) -> bool { + event.events & POLLOUT != 0 + } + + pub fn is_error(event: &Event) -> bool { + event.events & POLLERR != 0 + } + + pub fn is_read_closed(_event: &Event) -> bool { + // Not supported. Use read(2) to detect EOF. + false + } + + pub fn is_write_closed(event: &Event) -> bool { + event.events & POLLHUP != 0 + } + + pub fn is_priority(event: &Event) -> bool { + event.events & POLLPRI != 0 + } + + pub fn is_aio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, _event: &Event) -> fmt::Result { + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_flag(got: &c_short, want: &c_short) -> bool { + (got & want) != 0 + } + + debug_detail!( + FlagsDetails(c_short), + check_flag, + libc::POLLIN, + libc::POLLOUT, + libc::POLLPRI, + libc::POLLRDBAND, + libc::POLLRDNORM, + libc::POLLWRBAND, + libc::POLLWRNORM, + libc::POLLNVAL, + libc::POLLERR + ); + + f.debug_struct("event") + .field("flags", &FlagsDetails(_event.events)) + .field("data", &_event.data) + .finish() + } +} diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 16707cd19..ff65c9ff5 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -104,13 +104,14 @@ pub use self::kqueue::Waker; target_os = "openbsd", target_os = "solaris" ))] -mod pipe { +pub(crate) mod pipe { + use crate::sys::unix::Selector; use crate::{Interest, Token}; use std::fs::File; - use std::io::{self, Read, Write}; - use std::os::unix::io::FromRawFd; + use std::io::{self, Write}; + use std::os::unix::io::{FromRawFd, RawFd}; /// Waker backed by a unix pipe. /// @@ -130,9 +131,7 @@ mod pipe { // they're closed when dropped, e.g. when register below fails. let sender = unsafe { File::from_raw_fd(fds[1]) }; let receiver = unsafe { File::from_raw_fd(fds[0]) }; - selector - .register(fds[0], token, Interest::READABLE) - .map(|()| Waker { sender, receiver }) + Self::register_waker(fds[0], selector, token).map(|()| Waker { sender, receiver }) } pub fn wake(&self) -> io::Result<()> { @@ -148,15 +147,37 @@ mod pipe { Err(err) => Err(err), } } + } - /// Empty the pipe's buffer, only need to call this if `wake` fails. - /// This ignores any errors. - fn empty(&self) { - let mut buf = [0; 4096]; - loop { - match (&self.receiver).read(&mut buf) { - Ok(n) if n > 0 => continue, - _ => return, + cfg_neither_epoll_nor_kqueue! { + impl Waker { + // Empty not needed in poll(2) case, emptying is done by selector. + fn empty(&self) {} + fn register_waker(fd: RawFd, selector: &Selector, token: Token) + -> io::Result<()> { + // Register fd using a custom down-call to ensure + // level-triggered semantic required by a waker. + selector.register_pipe_waker(fd, token, Interest::READABLE) + } + } + } + + cfg_epoll_or_kqueue! { + use std::io::Read; + impl Waker { + fn register_waker(fd: RawFd, selector: &Selector, token: Token) + -> io::Result<()> { + selector.register(fd, token, Interest::READABLE) + } + /// Empty the pipe's buffer, only need to call this if `wake` fails. + /// This ignores any errors. + fn empty(&self) { + let mut buf = [0; 4096]; + loop { + match (&self.receiver).read(&mut buf) { + Ok(n) if n > 0 => continue, + _ => return, + } } } } diff --git a/tests/tcp.rs b/tests/tcp.rs index 5c1b7200c..5e68a5225 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -374,30 +374,6 @@ fn connect_then_close() { } } -#[test] -fn listen_then_close() { - init(); - - let mut poll = Poll::new().unwrap(); - let mut l = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap(); - - poll.registry() - .register(&mut l, Token(1), Interest::READABLE) - .unwrap(); - drop(l); - - let mut events = Events::with_capacity(128); - - poll.poll(&mut events, Some(Duration::from_millis(100))) - .unwrap(); - - for event in &events { - if event.token() == Token(1) { - panic!("recieved ready() on a closed TcpListener") - } - } -} - #[test] fn bind_twice_bad() { init(); @@ -658,7 +634,8 @@ macro_rules! wait { }}; } -#[test] +test_read_write_closed! { +["event.is_write_closed() not supported"] fn write_shutdown() { init(); @@ -695,6 +672,7 @@ fn write_shutdown() { wait!(poll, is_readable, true); } +} // test_read_write_closed! struct MyHandler { listener: TcpListener, diff --git a/tests/tcp_close.rs b/tests/tcp_close.rs new file mode 100644 index 000000000..000a98def --- /dev/null +++ b/tests/tcp_close.rs @@ -0,0 +1,41 @@ +#![cfg(all(feature = "os-poll", feature = "tcp"))] + +use mio::net::TcpListener; +use mio::{Events, Interest, Poll, Token}; +use std::time::Duration; +mod util; +use util::init; + +// +// There's a race condition if this test run in parallel with other tests. +// Right after fd is closed below, anyone can open a new file/socket and system +// might allocate the same fd for him. In such case test fails because it +// receives an event generated by another test running at the same time since +// it monitors fd owned by someone else. +// +// The test is alone in this source file on purpose to run single threaded and +// avoid above race condition. +// +#[test] +fn listen_then_close() { + init(); + + let mut poll = Poll::new().unwrap(); + let mut l = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap(); + + poll.registry() + .register(&mut l, Token(1), Interest::READABLE) + .unwrap(); + drop(l); + + let mut events = Events::with_capacity(128); + + poll.poll(&mut events, Some(Duration::from_millis(100))) + .unwrap(); + + for event in &events { + if event.token() == Token(1) { + panic!("received ready() on a closed TcpListener") + } + } +} diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 11d8d841d..b8579f194 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -293,7 +293,8 @@ fn shutdown_read() { target_os = "ios", target_os = "macos", target_os = "netbsd", - target_os = "openbsd" + target_os = "openbsd", + target_os = "solaris" ))] { let mut buf = [0; 20]; @@ -381,7 +382,8 @@ fn shutdown_both() { target_os = "ios", target_os = "macos", target_os = "netbsd", - target_os = "openbsd" + target_os = "openbsd", + target_os = "solaris" ))] { let mut buf = [0; 20]; @@ -502,11 +504,8 @@ fn no_events_after_deregister() { thread_handle.join().expect("unable to join thread"); } -#[test] -#[cfg_attr( - windows, - ignore = "fails on Windows; client read closed events are not triggered" -)] +test_shutdown_client! { +["client close events are not found"] fn tcp_shutdown_client_read_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -536,13 +535,10 @@ fn tcp_shutdown_client_read_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } +} // test_shutdown_client! -#[test] -#[cfg_attr(windows, ignore = "fails; client write_closed events are not found")] -#[cfg_attr( - any(target_os = "linux", target_os = "android", target_os = "solaris"), - ignore = "fails; client write_closed events are not found" -)] +test_shutdown_client! { +["client write closed events are not found"] fn tcp_shutdown_client_write_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -572,8 +568,10 @@ fn tcp_shutdown_client_write_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } +} // test_shutdown_client! -#[test] +test_read_write_closed! { +["event.is_read_closed() not supported"] fn tcp_shutdown_server_write_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -602,12 +600,10 @@ fn tcp_shutdown_server_write_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } +} // test_read_write_closed! -#[test] -#[cfg_attr( - windows, - ignore = "fails on Windows; client close events are not found" -)] +test_shutdown_client! { +["client close events are not found"] fn tcp_shutdown_client_both_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -635,6 +631,7 @@ fn tcp_shutdown_client_both_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } +} // test_shutdown_client! /// Start a listener that accepts `n_connections` connections on the returned /// address. It echos back any data it reads from the connection before diff --git a/tests/unix_datagram.rs b/tests/unix_datagram.rs index 24c05c7ec..61416fcc9 100644 --- a/tests/unix_datagram.rs +++ b/tests/unix_datagram.rs @@ -165,7 +165,8 @@ fn unix_datagram_pair() { assert!(datagram2.take_error().unwrap().is_none()); } -#[test] +test_read_write_closed! { +["event.is_read_closed() not supported"] fn unix_datagram_shutdown() { let (mut poll, mut events) = init_with_poll(); let path1 = temp_file("unix_datagram_shutdown1"); @@ -222,6 +223,7 @@ fn unix_datagram_shutdown() { assert!(datagram1.take_error().unwrap().is_none()); } +} // test_read_write_closed! #[test] fn unix_datagram_register() { diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 7159ec828..50d6ce83a 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -143,7 +143,8 @@ fn unix_stream_peer_addr() { handle.join().unwrap(); } -#[test] +test_read_write_closed! { +["event.is_read_closed() not supported"] fn unix_stream_shutdown_read() { let (mut poll, mut events) = init_with_poll(); let (handle, remote_addr) = new_echo_listener(1, "unix_stream_shutdown_read"); @@ -196,6 +197,7 @@ fn unix_stream_shutdown_read() { drop(stream); handle.join().unwrap(); } +} // test_read_write_closed! #[test] fn unix_stream_shutdown_write() { @@ -252,7 +254,8 @@ fn unix_stream_shutdown_write() { handle.join().unwrap(); } -#[test] +test_read_write_closed! { +["event.is_write_closed() not supported"] fn unix_stream_shutdown_both() { let (mut poll, mut events) = init_with_poll(); let (handle, remote_addr) = new_echo_listener(1, "unix_stream_shutdown_both"); @@ -311,8 +314,10 @@ fn unix_stream_shutdown_both() { drop(stream); handle.join().unwrap(); } +} // test_read_write_closed! -#[test] +test_read_write_closed! { +["event.is_read_closed() not supported"] fn unix_stream_shutdown_listener_write() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -344,6 +349,7 @@ fn unix_stream_shutdown_listener_write() { barrier.wait(); handle.join().unwrap(); } +} //test_read_write_closed! #[test] fn unix_stream_register() { diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 75d866f02..4a37a8f3c 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -196,14 +196,23 @@ pub fn assert_would_block(result: io::Result) { } } -/// Assert that `NONBLOCK` is set on `socket`. +/// Assert that socket is non-blocking. #[cfg(unix)] pub fn assert_socket_non_blocking(socket: &S) where S: AsRawFd, { + let expected = if cfg!(target_os = "solaris") { + /* + * On Solaris O_NDELAY and O_NONBLOCK behave the same for sockets. + * For more details, see read(2) man page. + */ + libc::O_NONBLOCK | libc::O_NDELAY + } else { + libc::O_NONBLOCK + }; let flags = unsafe { libc::fcntl(socket.as_raw_fd(), libc::F_GETFL) }; - assert!(flags & libc::O_NONBLOCK != 0, "socket not non-blocking"); + assert!(flags & expected != 0, "socket not non-blocking"); } #[cfg(windows)] @@ -311,3 +320,48 @@ macro_rules! expect_read { assert_eq!(address, source); }}; } + +/// tests READ_CLOSED or WRITE_CLOSED events, not supported using poll(2), +/// but expected to work on Windows +macro_rules! test_read_write_closed { + ([$msg:expr] $($item:item)*) => { + $( + #[test] + #[cfg_attr(not(any( + windows, + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "linux", + target_os = "android", + target_os = "illumos" + )), ignore = $msg)] + $item + )* + } +} + +/// tests READ_CLOSED or WRITE_CLOSED events, after shutdown on client side, +/// neither supported using poll(2) nor Windows +macro_rules! test_shutdown_client { + ([$msg:expr] $($item:item)*) => { + $( + #[test] + #[cfg_attr(not(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "linux", + target_os = "android", + target_os = "illumos" + )), ignore = $msg)] + $item + )* + } +} From b2a1ceb181b6ddd33160973ec50ddf619e2277c4 Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Wed, 22 Jan 2020 19:25:10 +0100 Subject: [PATCH 02/10] feature(trait_alias) breaks cargo check --- src/io_source.rs | 14 +++++++++----- src/lib.rs | 1 - 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/io_source.rs b/src/io_source.rs index f69883dec..097197e64 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -168,13 +168,17 @@ cfg_epoll_or_kqueue! { cfg_neither_epoll_nor_kqueue! { #[cfg(not(windows))] - pub trait IoSourceTrait = AsRawFd; + pub trait AsRawFdOrSocket: AsRawFd {} + #[cfg(not(windows))] + impl AsRawFdOrSocket for T {} + #[cfg(windows)] + pub trait AsRawFdOrSocket: AsRawSocket {} #[cfg(windows)] - pub trait IoSourceTrait = AsRawSocket; + impl AsRawFdOrSocket for T {} impl event::Source for IoSource where - T: IoSourceTrait, + T: AsRawFdOrSocket, { fn register( &mut self, @@ -184,13 +188,13 @@ cfg_neither_epoll_nor_kqueue! { ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; -#[cfg(windows)] + #[cfg(windows)] { self.state .register(registry, token, interests, self.inner.as_raw_socket()) } -#[cfg(not(windows))] + #[cfg(not(windows))] { self.state .register(registry, token, interests, diff --git a/src/lib.rs b/src/lib.rs index 6297d71b6..1cc198f2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(trait_alias)] #![doc(html_root_url = "https://docs.rs/mio/0.7.0-alpha.1")] #![deny( missing_docs, From e0201af128e7016c878a55f7c021c68d9fc9b3b0 Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Wed, 22 Jan 2020 23:04:45 +0100 Subject: [PATCH 03/10] tcp_shutdown_client_write_close_event was accidentaly enabled on Linux --- tests/tcp.rs | 5 ++--- tests/tcp_stream.rs | 12 ++++-------- tests/unix_datagram.rs | 5 ++--- tests/unix_stream.rs | 15 ++++++--------- tests/util/mod.rs | 39 ++++++++++++++++++++++++++++++--------- 5 files changed, 44 insertions(+), 32 deletions(-) diff --git a/tests/tcp.rs b/tests/tcp.rs index 5e68a5225..e87c855da 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -634,8 +634,7 @@ macro_rules! wait { }}; } -test_read_write_closed! { -["event.is_write_closed() not supported"] +test_shutdown_server! { fn write_shutdown() { init(); @@ -672,7 +671,7 @@ fn write_shutdown() { wait!(poll, is_readable, true); } -} // test_read_write_closed! +} // test_shutdown_server! struct MyHandler { listener: TcpListener, diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index b8579f194..c60363d12 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -505,7 +505,6 @@ fn no_events_after_deregister() { } test_shutdown_client! { -["client close events are not found"] fn tcp_shutdown_client_read_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -537,8 +536,7 @@ fn tcp_shutdown_client_read_close_event() { } } // test_shutdown_client! -test_shutdown_client! { -["client write closed events are not found"] +test_shutdown_client_write! { fn tcp_shutdown_client_write_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -568,10 +566,9 @@ fn tcp_shutdown_client_write_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } -} // test_shutdown_client! +} // test_shutdown_client_write! -test_read_write_closed! { -["event.is_read_closed() not supported"] +test_shutdown_server! { fn tcp_shutdown_server_write_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -600,10 +597,9 @@ fn tcp_shutdown_server_write_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } -} // test_read_write_closed! +} // test_shutdown_server! test_shutdown_client! { -["client close events are not found"] fn tcp_shutdown_client_both_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); diff --git a/tests/unix_datagram.rs b/tests/unix_datagram.rs index 61416fcc9..b5b58f795 100644 --- a/tests/unix_datagram.rs +++ b/tests/unix_datagram.rs @@ -165,8 +165,7 @@ fn unix_datagram_pair() { assert!(datagram2.take_error().unwrap().is_none()); } -test_read_write_closed! { -["event.is_read_closed() not supported"] +test_shutdown_client! { fn unix_datagram_shutdown() { let (mut poll, mut events) = init_with_poll(); let path1 = temp_file("unix_datagram_shutdown1"); @@ -223,7 +222,7 @@ fn unix_datagram_shutdown() { assert!(datagram1.take_error().unwrap().is_none()); } -} // test_read_write_closed! +} // test_shutdown_client! #[test] fn unix_datagram_register() { diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 50d6ce83a..0f0689d38 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -143,8 +143,7 @@ fn unix_stream_peer_addr() { handle.join().unwrap(); } -test_read_write_closed! { -["event.is_read_closed() not supported"] +test_shutdown_client! { fn unix_stream_shutdown_read() { let (mut poll, mut events) = init_with_poll(); let (handle, remote_addr) = new_echo_listener(1, "unix_stream_shutdown_read"); @@ -197,7 +196,7 @@ fn unix_stream_shutdown_read() { drop(stream); handle.join().unwrap(); } -} // test_read_write_closed! +} // test_shutdown_client #[test] fn unix_stream_shutdown_write() { @@ -254,8 +253,7 @@ fn unix_stream_shutdown_write() { handle.join().unwrap(); } -test_read_write_closed! { -["event.is_write_closed() not supported"] +test_shutdown_client! { fn unix_stream_shutdown_both() { let (mut poll, mut events) = init_with_poll(); let (handle, remote_addr) = new_echo_listener(1, "unix_stream_shutdown_both"); @@ -314,10 +312,9 @@ fn unix_stream_shutdown_both() { drop(stream); handle.join().unwrap(); } -} // test_read_write_closed! +} // test_shutdown_client! -test_read_write_closed! { -["event.is_read_closed() not supported"] +test_shutdown_server! { fn unix_stream_shutdown_listener_write() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -349,7 +346,7 @@ fn unix_stream_shutdown_listener_write() { barrier.wait(); handle.join().unwrap(); } -} //test_read_write_closed! +} // test_shutdown_server! #[test] fn unix_stream_register() { diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 4a37a8f3c..12810b526 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -321,10 +321,10 @@ macro_rules! expect_read { }}; } -/// tests READ_CLOSED or WRITE_CLOSED events, not supported using poll(2), -/// but expected to work on Windows -macro_rules! test_read_write_closed { - ([$msg:expr] $($item:item)*) => { +/// tests READ_CLOSED event after shutdown on server side, +/// not supported using poll(2) +macro_rules! test_shutdown_server { + ($($item:item)*) => { $( #[test] #[cfg_attr(not(any( @@ -338,16 +338,17 @@ macro_rules! test_read_write_closed { target_os = "linux", target_os = "android", target_os = "illumos" - )), ignore = $msg)] + )), ignore = "read closed and write closed events not supported")] $item )* } } -/// tests READ_CLOSED or WRITE_CLOSED events, after shutdown on client side, -/// neither supported using poll(2) nor Windows +/// tests READ_CLOSED events after shutdown read on client side, +/// or WRITE_CLOSED events after shutdown both on client side, +/// neither supported using poll(2) nor on Windows macro_rules! test_shutdown_client { - ([$msg:expr] $($item:item)*) => { + ($($item:item)*) => { $( #[test] #[cfg_attr(not(any( @@ -360,7 +361,27 @@ macro_rules! test_shutdown_client { target_os = "linux", target_os = "android", target_os = "illumos" - )), ignore = $msg)] + )), ignore = "client close events are not found")] + $item + )* + } +} + +/// tests WRITE_CLOSED event after shutdown write on client side, +/// neither supported using poll(2) nor Windows nor Linux / Android +macro_rules! test_shutdown_client_write { + ($($item:item)*) => { + $( + #[test] + #[cfg_attr(not(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "illumos" + )), ignore = "client write closed events are not found")] $item )* } From aab91e47ec6cd798ba62591bf3b9aea27c89b270 Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Wed, 22 Jan 2020 23:43:02 +0100 Subject: [PATCH 04/10] tcp_shutdown_client_write_close_event was accidentaly enabled on Linux (fix rustfmt) --- tests/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 12810b526..395f79a93 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -368,7 +368,7 @@ macro_rules! test_shutdown_client { } /// tests WRITE_CLOSED event after shutdown write on client side, -/// neither supported using poll(2) nor Windows nor Linux / Android +/// neither supported using poll(2) nor Windows nor Linux / Android macro_rules! test_shutdown_client_write { ($($item:item)*) => { $( From b4554e67a1469a1650b10d3b888636939b3ac940 Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Thu, 23 Jan 2020 11:03:35 +0100 Subject: [PATCH 05/10] test posix poll(2) I/O selector in Azure pipeline --- Cargo.toml | 10 ++- azure-pipelines.yml | 8 +++ ci/azure-test-stable.yml | 15 +++-- src/macros/mod.rs | 128 ++++++++++++++++++++++++++++++--------- src/sys/unix/waker.rs | 23 ++----- tests/util/mod.rs | 80 ++++++++++++++++-------- 6 files changed, 187 insertions(+), 77 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7dffe0002..d0b1608dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,12 @@ publish = false default = [] # Include implementations of I/O source readiness polling. os-poll = [] +# Enable epoll(7) backend for I/O source polling. +os-epoll = ["os-poll"] +# Enable kqueue(7) backend for I/O source polling. +os-kqueue = ["os-poll"] +# Enable epoll(7) or kqueue(7) backend for I/O source polling. +os-poll-accel = ["os-epoll", "os-kqueue"] # Include adapters for underlying OS I/O sources. # Note: This is currently only supported on Unix and provides `SourceFd` os-util = [] @@ -61,8 +67,8 @@ rustdoc-args = ["--cfg", "docsrs"] [[example]] name = "tcp_server" -required-features = ["os-poll", "tcp"] +required-features = ["os-poll-accel", "tcp"] [[example]] name = "udp_server" -required-features = ["os-poll", "udp"] +required-features = ["os-poll-accel", "udp"] diff --git a/azure-pipelines.yml b/azure-pipelines.yml index e673e28a9..9d4733560 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -21,6 +21,14 @@ jobs: displayName: Test --release cmd: test --release + # Stable --release (posix poll(2)) + - template: ci/azure-test-stable.yml + parameters: + name: stable_release + displayName: Test --release + cmd: test --release + posix_poll: true + # Nightly - template: ci/azure-test-stable.yml parameters: diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index 31b92064a..14afb57b4 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -32,10 +32,17 @@ jobs: cargo hack check --feature-powerset --skip guide displayName: Check feature powerset - - script: cargo ${{ parameters.cmd }} --all-features - displayName: cargo ${{ parameters.cmd }} --all-features - env: - CI: "True" + - ${{ if eq(parameters.posix_poll, false) }}: + - script: cargo ${{ parameters.cmd }} --all-features + displayName: cargo ${{ parameters.cmd }} --all-features + env: + CI: "True" + + - ${{ if eq(parameters.posix_poll, true) }}: + - script: cargo ${{ parameters.cmd }} --features "os-poll os-util tcp udp uds" + displayName: cargo ${{ parameters.cmd }} --features "os-poll os-util tcp udp uds" + env: + CI: "True" - ${{ if eq(parameters.cmd, 'test') }}: - script: cargo doc --no-deps diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 44989e358..3a16db31f 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -110,10 +110,13 @@ macro_rules! cfg_any_os_util { macro_rules! cfg_epoll { ($($item:item)*) => { $( - #[cfg(any( - target_os = "linux", - target_os = "android", - target_os = "illumos" + #[cfg(all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + ), + feature = "os-epoll", ))] $item )* @@ -124,13 +127,16 @@ macro_rules! cfg_epoll { macro_rules! cfg_kqueue { ($($item:item)*) => { $( - #[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" + #[cfg(all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" ))] $item )* @@ -142,15 +148,25 @@ macro_rules! cfg_epoll_or_kqueue { ($($item:item)*) => { $( #[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "linux", - target_os = "android", - target_os = "illumos" + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + ), + feature = "os-epoll", + ), + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ) ))] $item )* @@ -162,15 +178,69 @@ macro_rules! cfg_neither_epoll_nor_kqueue { ($($item:item)*) => { $( #[cfg(not(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "linux", - target_os = "android", - target_os = "illumos" + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + ), + feature = "os-epoll", + ), + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ) + )))] + $item + )* + } +} + +/// Enable Waker backed by kqueue(2) interface +macro_rules! cfg_kqueue_waker { + ($($item:item)*) => { + $( + #[cfg(all( + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + ), + feature = "os-kqueue" + ))] + $item + )* + } +} + +/// Enable Waker backed by pipe(2) interface +macro_rules! cfg_pipe_waker { + ($($item:item)*) => { + $( + #[cfg(not(any( + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + ), + feature = "os-epoll", + ), + all( + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + ), + feature = "os-kqueue" + ) )))] $item )* diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index ff65c9ff5..72ed346d9 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -1,4 +1,4 @@ -#[cfg(any(target_os = "linux", target_os = "android"))] +cfg_epoll! { mod eventfd { use crate::sys::Selector; use crate::{Interest, Token}; @@ -58,10 +58,10 @@ mod eventfd { } } -#[cfg(any(target_os = "linux", target_os = "android"))] pub use self::eventfd::Waker; +} // cfg_epoll! -#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] +cfg_kqueue_waker! { mod kqueue { use crate::sys::Selector; use crate::Token; @@ -95,17 +95,11 @@ mod kqueue { } } -#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] pub use self::kqueue::Waker; +} // cfg_kqueue! -#[cfg(any( - target_os = "dragonfly", - target_os = "netbsd", - target_os = "openbsd", - target_os = "solaris" -))] +cfg_pipe_waker! { pub(crate) mod pipe { - use crate::sys::unix::Selector; use crate::{Interest, Token}; @@ -184,10 +178,5 @@ pub(crate) mod pipe { } } -#[cfg(any( - target_os = "dragonfly", - target_os = "netbsd", - target_os = "openbsd", - target_os = "solaris" -))] pub use self::pipe::Waker; +} // cfg_pipe_waker! diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 395f79a93..372302d1c 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -329,15 +329,25 @@ macro_rules! test_shutdown_server { #[test] #[cfg_attr(not(any( windows, - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "linux", - target_os = "android", - target_os = "illumos" + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ), + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos" + ), + feature = "os-epoll", + ) )), ignore = "read closed and write closed events not supported")] $item )* @@ -352,15 +362,25 @@ macro_rules! test_shutdown_client { $( #[test] #[cfg_attr(not(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "linux", - target_os = "android", - target_os = "illumos" + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ), + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos" + ), + feature = "os-epoll", + ) )), ignore = "client close events are not found")] $item )* @@ -374,13 +394,23 @@ macro_rules! test_shutdown_client_write { $( #[test] #[cfg_attr(not(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "illumos" + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ), + all( + any( + target_os = "illumos" + ), + feature = "os-epoll", + ) )), ignore = "client write closed events are not found")] $item )* From 9de98f0e4d92efd0876c3fddd965db65fdb14202 Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Thu, 23 Jan 2020 12:34:01 +0100 Subject: [PATCH 06/10] test posix poll(2) I/O selector in Azure pipeline (fix duplicate job name) --- azure-pipelines.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 9d4733560..af088caff 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -21,11 +21,11 @@ jobs: displayName: Test --release cmd: test --release - # Stable --release (posix poll(2)) + # Stable --release (POSIX poll(2)) - template: ci/azure-test-stable.yml parameters: - name: stable_release - displayName: Test --release + name: stable_release_posix_poll + displayName: Test --release (using POSIX poll(2) I/O Selector) cmd: test --release posix_poll: true From 03e282a96753e05841adad52cafb6311a9c2c193 Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Thu, 23 Jan 2020 13:59:42 +0100 Subject: [PATCH 07/10] fix testing individual features and missing pipe2 on MacOS --- Cargo.toml | 3 +++ src/io_source.rs | 4 +++- src/sys/unix/selector/poll.rs | 18 ++++++++++++++++-- src/sys/unix/waker.rs | 16 ++++++++++++++++ tests/registering.rs | 4 +++- tests/size.rs | 32 +++++++++++++++++++++++++------- 6 files changed, 66 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d0b1608dc..6a5287202 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,9 @@ guide = [] [dependencies] log = "0.4.8" +[target.'cfg(macos)'.dependencies] +cvt = "0.1" + [target.'cfg(unix)'.dependencies] libc = "0.2.62" diff --git a/src/io_source.rs b/src/io_source.rs index 097197e64..a8bf6f098 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -7,7 +7,7 @@ use std::os::windows::io::AsRawSocket; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, io}; -#[cfg(any(unix, debug_assertions))] +#[cfg(debug_assertions)] use crate::poll; use crate::sys::IoSourceState; use crate::{event, Interest, Registry, Token}; @@ -130,6 +130,8 @@ impl DerefMut for IoSource { } cfg_epoll_or_kqueue! { + #[cfg(not(debug_assertions))] + use crate::poll; impl event::Source for IoSource where T: AsRawFd, diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 28d3a913d..24634afc3 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -1,6 +1,7 @@ use std::fmt; use std::os::raw::c_void; use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(debug_assertions)] use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -13,6 +14,7 @@ use crate::token::Token; use libc::{c_int, c_short, nfds_t}; use libc::{POLLIN, POLLNVAL, POLLOUT, POLLPRI, POLLRDBAND, POLLRDNORM, POLLWRBAND, POLLWRNORM}; +#[cfg(debug_assertions)] static NEXT_ID: AtomicUsize = AtomicUsize::new(1); #[derive(Debug)] @@ -28,6 +30,7 @@ enum Task { token: Token, interests: Interest, }, + #[allow(dead_code)] Rearm { fd: RawFd, interests: Interest, @@ -81,6 +84,7 @@ impl Task { poll.fdarr[pos].events = interests_to_poll(*interests); poll.fdmeta[pos].token = usize::from(*token); } + #[allow(dead_code)] Task::Rearm { fd, interests } => { let pos = poll.fdhash[*fd as usize].unwrap(); poll.fdarr[pos].events = interests_to_poll(*interests); @@ -149,8 +153,13 @@ impl Selector { waker: Mutex::new(None), as_fd: Mutex::new(-1), }); + #[cfg(debug_assertions)] let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); - let sel = Selector { id, state }; + let sel = Selector { + #[cfg(debug_assertions)] + id, + state, + }; sel.add_waker()?; Ok(sel) } @@ -168,7 +177,11 @@ impl Selector { pub fn try_clone(&self) -> io::Result { let state = Arc::clone(&self.state); - Ok(Selector { id: self.id, state }) + Ok(Selector { + #[cfg(debug_assertions)] + id: self.id, + state, + }) } pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { @@ -281,6 +294,7 @@ impl Selector { Ok(()) } + #[allow(dead_code)] pub fn rearm(&self, fd: RawFd, interests: Interest) -> io::Result<()> { self.add_task(Task::Rearm { fd, interests }); Ok(()) diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 72ed346d9..745862796 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -120,7 +120,23 @@ pub(crate) mod pipe { impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { let mut fds = [-1; 2]; + #[cfg(not(macos))] syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; + + // MacOS is missing pipe2 system-call. + #[cfg(macos)] + { + use cvt::cvt; + use libc::{fcntl, F_SETFL, F_SETFD, O_NONBLOCK, FD_CLOEXEC}; + syscall!(pipe(fds.as_mut_ptr()))?; + unsafe { + cvt(fcntl(fds[0], F_SETFL, O_NONBLOCK))?; + cvt(fcntl(fds[0], F_SETFD, FD_CLOEXEC))?; + cvt(fcntl(fds[1], F_SETFL, O_NONBLOCK))?; + cvt(fcntl(fds[1], F_SETFD, FD_CLOEXEC))?; + } + } + // Turn the file descriptors into files first so we're ensured // they're closed when dropped, e.g. when register below fails. let sender = unsafe { File::from_raw_fd(fds[1]) }; diff --git a/tests/registering.rs b/tests/registering.rs index 31b8006f0..706725a7c 100644 --- a/tests/registering.rs +++ b/tests/registering.rs @@ -10,7 +10,9 @@ use std::thread::sleep; use std::time::Duration; mod util; -use util::{any_local_address, assert_error, init}; +#[cfg(debug_assertions)] +use util::assert_error; +use util::{any_local_address, init}; const SERVER: Token = Token(0); const CLIENT: Token = Token(1); diff --git a/tests/size.rs b/tests/size.rs index d4fb9a03c..39ce320b2 100644 --- a/tests/size.rs +++ b/tests/size.rs @@ -1,13 +1,31 @@ #[test] +#[cfg(any( + all( + any(target_os = "linux", target_os = "android", target_os = "illumos",), + feature = "os-epoll", + ), + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ) +))] #[cfg(unix)] #[cfg(not(debug_assertions))] fn assert_size() { - use mio::net::*; - use std::mem::size_of; - // Without debug assertions enabled `TcpListener`, `TcpStream` and `UdpSocket` should have the - // same size as the system specific socket, i.e. just a file descriptor on Unix platforms. - assert_eq!(size_of::(), size_of::()); - assert_eq!(size_of::(), size_of::()); - assert_eq!(size_of::(), size_of::()); + // same size as the system specific socket, i.e. just a file descriptor on Unix platforms unless I/O selector backend uses POSIX poll(2). + { + use mio::net::*; + use std::mem::size_of; + assert_eq!(size_of::(), size_of::()); + assert_eq!(size_of::(), size_of::()); + assert_eq!(size_of::(), size_of::()); + } } From d067cfbff7f706136ac71f93392436b30f602962 Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Thu, 23 Jan 2020 14:59:47 +0100 Subject: [PATCH 08/10] fix cargo check --no-default-features --features --- src/sys/shell/mod.rs | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs index 830379705..c34eb049d 100644 --- a/src/sys/shell/mod.rs +++ b/src/sys/shell/mod.rs @@ -25,11 +25,6 @@ cfg_uds! { cfg_net! { use std::io; - #[cfg(windows)] - use std::os::windows::io::RawSocket; - - #[cfg(windows)] - use crate::{Registry, Token, Interest}; pub(crate) struct IoSourceState; @@ -48,9 +43,18 @@ cfg_net! { } } +cfg_neither_epoll_nor_kqueue! { + use crate::{Registry, Token, Interest}; + #[cfg(windows)] + use std::os::windows::io::RawSocket; + + #[cfg(not(windows))] + use std::os::unix::io::RawFd; + impl IoSourceState { - pub fn register( + #[cfg(windows)] + pub fn register( &mut self, _: &Registry, _: Token, @@ -60,6 +64,17 @@ cfg_net! { os_required!() } + #[cfg(not(windows))] + pub fn register( + &mut self, + _: &Registry, + _: Token, + _: Interest, + _: RawFd, + ) -> io::Result<()> { + os_required!() + } + pub fn reregister( &mut self, _: &Registry, @@ -73,4 +88,5 @@ cfg_net! { os_required!() } } +} // cfg_neither_epoll_nor_kqueue! } From 6555d2b4228df18a862c02365df93a4b05b9bf0d Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Thu, 23 Jan 2020 15:27:16 +0100 Subject: [PATCH 09/10] typo: cfg(macos) -> cfg(target_os = "macos") --- src/sys/unix/waker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 745862796..687c12157 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -120,11 +120,11 @@ pub(crate) mod pipe { impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { let mut fds = [-1; 2]; - #[cfg(not(macos))] + #[cfg(not(target_os = "macos"))] syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; // MacOS is missing pipe2 system-call. - #[cfg(macos)] + #[cfg(target_os = "macos")] { use cvt::cvt; use libc::{fcntl, F_SETFL, F_SETFD, O_NONBLOCK, FD_CLOEXEC}; From 18f402ea9b8cd08f688cee4053f7d573279924cc Mon Sep 17 00:00:00 2001 From: Vita Batrla Date: Thu, 23 Jan 2020 15:51:20 +0100 Subject: [PATCH 10/10] typo: cfg(target_os = "macos") also in Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6a5287202..1983a221c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ guide = [] [dependencies] log = "0.4.8" -[target.'cfg(macos)'.dependencies] +[target.'cfg(target_os = "macos")'.dependencies] cvt = "0.1" [target.'cfg(unix)'.dependencies]