diff --git a/Cargo.toml b/Cargo.toml index 7dffe0002..1983a221c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,12 @@ publish = false default = [] # Include implementations of I/O source readiness polling. os-poll = [] +# Enable epoll(7) backend for I/O source polling. +os-epoll = ["os-poll"] +# Enable kqueue(7) backend for I/O source polling. +os-kqueue = ["os-poll"] +# Enable epoll(7) or kqueue(7) backend for I/O source polling. +os-poll-accel = ["os-epoll", "os-kqueue"] # Include adapters for underlying OS I/O sources. # Note: This is currently only supported on Unix and provides `SourceFd` os-util = [] @@ -42,6 +48,9 @@ guide = [] [dependencies] log = "0.4.8" +[target.'cfg(target_os = "macos")'.dependencies] +cvt = "0.1" + [target.'cfg(unix)'.dependencies] libc = "0.2.62" @@ -61,8 +70,8 @@ rustdoc-args = ["--cfg", "docsrs"] [[example]] name = "tcp_server" -required-features = ["os-poll", "tcp"] +required-features = ["os-poll-accel", "tcp"] [[example]] name = "udp_server" -required-features = ["os-poll", "udp"] +required-features = ["os-poll-accel", "udp"] diff --git a/azure-pipelines.yml b/azure-pipelines.yml index e673e28a9..af088caff 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -21,6 +21,14 @@ jobs: displayName: Test --release cmd: test --release + # Stable --release (POSIX poll(2)) + - template: ci/azure-test-stable.yml + parameters: + name: stable_release_posix_poll + displayName: Test --release (using POSIX poll(2) I/O Selector) + cmd: test --release + posix_poll: true + # Nightly - template: ci/azure-test-stable.yml parameters: diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index 31b92064a..14afb57b4 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -32,10 +32,17 @@ jobs: cargo hack check --feature-powerset --skip guide displayName: Check feature powerset - - script: cargo ${{ parameters.cmd }} --all-features - displayName: cargo ${{ parameters.cmd }} --all-features - env: - CI: "True" + - ${{ if eq(parameters.posix_poll, false) }}: + - script: cargo ${{ parameters.cmd }} --all-features + displayName: cargo ${{ parameters.cmd }} --all-features + env: + CI: "True" + + - ${{ if eq(parameters.posix_poll, true) }}: + - script: cargo ${{ parameters.cmd }} --features "os-poll os-util tcp udp uds" + displayName: cargo ${{ parameters.cmd }} --features "os-poll os-util tcp udp uds" + env: + CI: "True" - ${{ if eq(parameters.cmd, 'test') }}: - script: cargo doc --no-deps diff --git a/src/io_source.rs b/src/io_source.rs index 6939c0d03..a8bf6f098 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -7,7 +7,7 @@ use std::os::windows::io::AsRawSocket; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, io}; -#[cfg(any(unix, debug_assertions))] +#[cfg(debug_assertions)] use crate::poll; use crate::sys::IoSourceState; use crate::{event, Interest, Registry, Token}; @@ -129,72 +129,97 @@ impl DerefMut for IoSource { } } -#[cfg(unix)] -impl event::Source for IoSource -where - T: AsRawFd, -{ - fn register( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.associate(registry)?; - poll::selector(registry).register(self.inner.as_raw_fd(), token, interests) - } +cfg_epoll_or_kqueue! { + #[cfg(not(debug_assertions))] + use crate::poll; + impl event::Source for IoSource + where + T: AsRawFd, + { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.associate(registry)?; + poll::selector(registry).register(self.inner.as_raw_fd(), token, + interests) + } - fn reregister( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.check_association(registry)?; - poll::selector(registry).reregister(self.inner.as_raw_fd(), token, interests) - } + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.check_association(registry)?; + poll::selector(registry).reregister(self.inner.as_raw_fd(), token, + interests) + } - fn deregister(&mut self, registry: &Registry) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.remove_association(registry)?; - poll::selector(registry).deregister(self.inner.as_raw_fd()) + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.remove_association(registry)?; + poll::selector(registry).deregister(self.inner.as_raw_fd()) + } } } -#[cfg(windows)] -impl event::Source for IoSource -where - T: AsRawSocket, -{ - fn register( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.associate(registry)?; - self.state - .register(registry, token, interests, self.inner.as_raw_socket()) - } +cfg_neither_epoll_nor_kqueue! { + #[cfg(not(windows))] + pub trait AsRawFdOrSocket: AsRawFd {} + #[cfg(not(windows))] + impl AsRawFdOrSocket for T {} + #[cfg(windows)] + pub trait AsRawFdOrSocket: AsRawSocket {} + #[cfg(windows)] + impl AsRawFdOrSocket for T {} - fn reregister( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.check_association(registry)?; - self.state.reregister(registry, token, interests) - } + impl event::Source for IoSource + where + T: AsRawFdOrSocket, + { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.associate(registry)?; + #[cfg(windows)] + { + self.state + .register(registry, token, interests, + self.inner.as_raw_socket()) + } + #[cfg(not(windows))] + { + self.state + .register(registry, token, interests, + self.inner.as_raw_fd()) + } + } - fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { - #[cfg(debug_assertions)] - self.selector_id.remove_association(_registry)?; - self.state.deregister() + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.check_association(registry)?; + self.state.reregister(registry, token, interests) + } + + fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.remove_association(_registry)?; + self.state.deregister() + } } } diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 7db25795d..3a16db31f 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -105,3 +105,144 @@ macro_rules! cfg_any_os_util { )* } } + +/// OS supports epoll(7) interface +macro_rules! cfg_epoll { + ($($item:item)*) => { + $( + #[cfg(all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + ), + feature = "os-epoll", + ))] + $item + )* + } +} + +/// OS supports kqueue(2) interface +macro_rules! cfg_kqueue { + ($($item:item)*) => { + $( + #[cfg(all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ))] + $item + )* + } +} + +/// OS supports either epoll(7) or kqueue(2) interface +macro_rules! cfg_epoll_or_kqueue { + ($($item:item)*) => { + $( + #[cfg(any( + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + ), + feature = "os-epoll", + ), + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ) + ))] + $item + )* + } +} + +/// OS neither supports epoll(7) nor kqueue(2) interfaces +macro_rules! cfg_neither_epoll_nor_kqueue { + ($($item:item)*) => { + $( + #[cfg(not(any( + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + ), + feature = "os-epoll", + ), + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ) + )))] + $item + )* + } +} + +/// Enable Waker backed by kqueue(2) interface +macro_rules! cfg_kqueue_waker { + ($($item:item)*) => { + $( + #[cfg(all( + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + ), + feature = "os-kqueue" + ))] + $item + )* + } +} + +/// Enable Waker backed by pipe(2) interface +macro_rules! cfg_pipe_waker { + ($($item:item)*) => { + $( + #[cfg(not(any( + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos", + ), + feature = "os-epoll", + ), + all( + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + ), + feature = "os-kqueue" + ) + )))] + $item + )* + } +} diff --git a/src/net/uds/listener.rs b/src/net/uds/listener.rs index 37e8106d8..d9b515384 100644 --- a/src/net/uds/listener.rs +++ b/src/net/uds/listener.rs @@ -35,7 +35,7 @@ impl UnixListener { /// The call is responsible for ensuring that the listening socket is in /// non-blocking mode. pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { - sys::uds::listener::accept(&self.inner) + self.inner.do_io(|inner| sys::uds::listener::accept(inner)) } /// Returns the local socket address of this listener. diff --git a/src/poll.rs b/src/poll.rs index 16528bd7b..0f052d488 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -154,19 +154,22 @@ use std::{fmt, io}; /// # Implementation notes /// /// `Poll` is backed by the selector provided by the operating system. -/// -/// | OS | Selector | -/// |---------------|-----------| -/// | Android | [epoll] | -/// | DragonFly BSD | [kqueue] | -/// | FreeBSD | [kqueue] | -/// | Linux | [epoll] | -/// | NetBSD | [kqueue] | -/// | OpenBSD | [kqueue] | -/// | Solaris | [epoll] | -/// | Windows | [IOCP] | -/// | iOS | [kqueue] | -/// | macOS | [kqueue] | +/// On Unix like systems it's either [epoll](4) or [kqueue](2) if the system +/// supports it and falls back to use [poll](2) system call otherwise. +/// On Windows the backend is I/O completion ports. +/// +/// | OS | Selector | +/// |---------------|--------------| +/// | Android | [epoll] | +/// | DragonFly BSD | [kqueue] | +/// | FreeBSD | [kqueue] | +/// | Linux | [epoll] | +/// | NetBSD | [kqueue] | +/// | OpenBSD | [kqueue] | +/// | Windows | [IOCP] | +/// | iOS | [kqueue] | +/// | macOS | [kqueue] | +/// | other Unix | [poll] | /// /// On all supported platforms, socket operations are handled by using the /// system selector. Platform specific extensions (e.g. [`SourceFd`]) allow @@ -174,16 +177,22 @@ use std::{fmt, io}; /// example, Linux's [`signalfd`] feature can be used by registering the FD with /// `Poll` via [`SourceFd`]. /// -/// On all platforms except windows, a call to [`Poll::poll`] is mostly just a -/// direct call to the system selector. However, [IOCP] uses a completion model -/// instead of a readiness model. In this case, `Poll` must adapt the completion -/// model Mio's API. While non-trivial, the bridge layer is still quite +/// On all platforms using [epoll](4) and [kqueue](2) interfaces, a call to +/// [`Poll::poll`] is mostly just a direct call to the system selector. +/// However, [IOCP] uses a completion model instead of a readiness model. +/// In this case, `Poll` must adapt the completion +/// model Mio's of API. While non-trivial, the bridge layer is still quite /// efficient. The most expensive part being calls to `read` and `write` require /// data to be copied into an intermediate buffer before it is passed to the /// kernel. /// +/// In case of [poll](2) backend, `Poll` has to emulate edge triggered model of +/// Mio's API using level triggered model of [poll](2) system call. The work, +/// that done by [epoll](4) in the kernel is shifted to user space. +/// /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html /// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 +/// [poll]: https://docs.oracle.com/cd/E36784_01/html/E36872/poll-2.html /// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx /// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html /// [`SourceFd`]: unix/struct.SourceFd.html diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs index 830379705..c34eb049d 100644 --- a/src/sys/shell/mod.rs +++ b/src/sys/shell/mod.rs @@ -25,11 +25,6 @@ cfg_uds! { cfg_net! { use std::io; - #[cfg(windows)] - use std::os::windows::io::RawSocket; - - #[cfg(windows)] - use crate::{Registry, Token, Interest}; pub(crate) struct IoSourceState; @@ -48,9 +43,18 @@ cfg_net! { } } +cfg_neither_epoll_nor_kqueue! { + use crate::{Registry, Token, Interest}; + #[cfg(windows)] + use std::os::windows::io::RawSocket; + + #[cfg(not(windows))] + use std::os::unix::io::RawFd; + impl IoSourceState { - pub fn register( + #[cfg(windows)] + pub fn register( &mut self, _: &Registry, _: Token, @@ -60,6 +64,17 @@ cfg_net! { os_required!() } + #[cfg(not(windows))] + pub fn register( + &mut self, + _: &Registry, + _: Token, + _: Interest, + _: RawFd, + ) -> io::Result<()> { + os_required!() + } + pub fn reregister( &mut self, _: &Registry, @@ -73,4 +88,5 @@ cfg_net! { os_required!() } } +} // cfg_neither_epoll_nor_kqueue! } diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 96d7f4dc2..dfb8e415d 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -38,24 +38,116 @@ cfg_os_poll! { pub use self::uds::SocketAddr; } - cfg_net! { - use std::io; + cfg_epoll_or_kqueue! { + cfg_net! { + use std::io; - // Both `kqueue` and `epoll` don't need to hold any user space state. - pub(crate) struct IoSourceState; + // Both `kqueue` and `epoll` don't need to hold any user space state. + pub(crate) struct IoSourceState; - impl IoSourceState { - pub fn new() -> IoSourceState { - IoSourceState + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } + } + } + } + + cfg_neither_epoll_nor_kqueue! { + cfg_net! { + use std::io; + use crate::{poll, Interest, Registry, Token}; + use std::os::unix::io::RawFd; + + #[derive(Debug)] + struct InternalState { + selector: Selector, + token: Token, + interests: Interest, + socket: RawFd, } - pub fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - // We don't hold state, so we can just call the function and - // return. - f(io) + pub struct IoSourceState { + inner: Option>, + } + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState { inner: None } + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + let result = f(io); + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .rearm(state.socket, state.interests) + })?; + result + } + + pub fn register(&mut self, + registry: &Registry, + token: Token, + interests: Interest, + socket: RawFd + ) -> io::Result<()> { + if self.inner.is_some() { + Err(io::ErrorKind::AlreadyExists.into()) + } else { + let selector = poll::selector(registry); + selector + .register(socket, token, interests) + .map(|_state| { + self.inner = Some(Box::new(InternalState { + selector: selector.try_clone().unwrap(), + socket, token, interests + })); + }) + } + } + + pub fn reregister( + &mut self, + _registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => { + state.selector + .reregister(state.socket, token, interests) + .map(|()| { + state.token = token; + state.interests = interests; + }) + } + None => Err(io::ErrorKind::NotFound.into()), + } + } + + pub fn deregister(&mut self) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => { + state.selector.deregister(state.socket).unwrap(); + self.inner = None; + Ok(()) + } + None => Err(io::ErrorKind::NotFound.into()), + } + } } } } diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 82475c897..afcce572d 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,25 +1,14 @@ -#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))] -mod epoll; +cfg_epoll! { + mod epoll; + pub(crate) use self::epoll::{event, Event, Events, Selector}; +} -#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))] -pub(crate) use self::epoll::{event, Event, Events, Selector}; +cfg_kqueue! { + mod kqueue; + pub(crate) use self::kqueue::{event, Event, Events, Selector}; +} -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" -))] -mod kqueue; - -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" -))] -pub(crate) use self::kqueue::{event, Event, Events, Selector}; +cfg_neither_epoll_nor_kqueue! { + mod poll; + pub(crate) use self::poll::{event, Event, Events, Selector}; +} diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs new file mode 100644 index 000000000..24634afc3 --- /dev/null +++ b/src/sys/unix/selector/poll.rs @@ -0,0 +1,424 @@ +use std::fmt; +use std::os::raw::c_void; +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(debug_assertions)] +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use std::{cmp, io}; + +use crate::interest::Interest; +use crate::sys::unix::waker::pipe::Waker; +use crate::token::Token; + +use libc::{c_int, c_short, nfds_t}; +use libc::{POLLIN, POLLNVAL, POLLOUT, POLLPRI, POLLRDBAND, POLLRDNORM, POLLWRBAND, POLLWRNORM}; + +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug)] +enum Task { + Add { + fd: RawFd, + token: Token, + interests: Interest, + is_waker: bool, + }, + Update { + fd: RawFd, + token: Token, + interests: Interest, + }, + #[allow(dead_code)] + Rearm { + fd: RawFd, + interests: Interest, + }, + Del { + fd: RawFd, + }, +} + +impl Task { + fn apply(&self, poll: &mut PollImpl) { + match self { + Task::Add { + fd, + token, + interests, + is_waker, + } => { + poll.fdarr.push(libc::pollfd { + fd: *fd, + events: interests_to_poll(*interests), + revents: 0, + }); + poll.fdmeta.push(FdMetadata { + token: usize::from(*token), + is_waker: *is_waker, + }); + // Store index for quick lookup. + if (poll.fdhash.len() as i32) <= *fd { + poll.fdhash.resize((*fd as usize) + 1, None); + } + poll.fdhash[*fd as usize] = Some((poll.fdarr.len() as usize) - 1); + } + Task::Del { fd } => { + let pos = poll.fdhash[*fd as usize].unwrap(); + poll.fdarr.remove(pos); + poll.fdmeta.remove(pos); + // Re-calculate hashed indexes after arrays shrunk down by 1. + poll.fdhash.iter_mut().for_each(|opt| match opt.as_mut() { + Some(ref mut x) if **x > pos => **x -= 1, + Some(ref mut x) if **x == pos => *opt = None, + _ => (), + }); + } + Task::Update { + fd, + token, + interests, + } => { + let pos = poll.fdhash[*fd as usize].unwrap(); + poll.fdarr[pos].events = interests_to_poll(*interests); + poll.fdmeta[pos].token = usize::from(*token); + } + #[allow(dead_code)] + Task::Rearm { fd, interests } => { + let pos = poll.fdhash[*fd as usize].unwrap(); + poll.fdarr[pos].events = interests_to_poll(*interests); + } + } + } +} + +struct FdMetadata { + token: usize, + is_waker: bool, +} + +struct PollImpl { + fdarr: Box>, + fdmeta: Box>, + fdhash: Box>>, +} + +impl fmt::Debug for PollImpl { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "PollImpl {{\n")?; + for (fd, meta) in self.fdarr.iter().zip(self.fdmeta.iter()) { + write!( + f, + "fd={:?} token={:?} events={:?} revents={:?}\n", + fd.fd, meta.token, fd.events, fd.revents + )?; + } + write!(f, "fdhash {{\n")?; + for (i, h) in self.fdhash.iter().enumerate() { + if let Some(x) = h { + write!(f, "{} => {}\n", i, x)? + } + } + write!(f, "}}\n")?; + write!(f, "}}\n")?; + Ok(()) + } +} + +#[derive(Debug)] +struct SelectorImpl { + tasks: Mutex>, + poll: Mutex, + waker: Mutex>>, + as_fd: Mutex, +} + +#[derive(Debug)] +pub struct Selector { + #[cfg(debug_assertions)] + id: usize, + state: Arc, +} + +impl Selector { + pub fn new() -> io::Result { + let state = Arc::new(SelectorImpl { + tasks: Mutex::new(vec![]), + poll: Mutex::new(PollImpl { + fdarr: Box::new(vec![]), + fdmeta: Box::new(vec![]), + fdhash: Box::new(vec![]), + }), + waker: Mutex::new(None), + as_fd: Mutex::new(-1), + }); + #[cfg(debug_assertions)] + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + let sel = Selector { + #[cfg(debug_assertions)] + id, + state, + }; + sel.add_waker()?; + Ok(sel) + } + + fn add_waker(&self) -> io::Result<()> { + let poll = self.get_impl(); + let mut guard = poll.waker.lock().unwrap(); + *guard = Some(Box::new(Waker::new(self, Token(0))?)); + Ok(()) + } + + fn get_impl(&self) -> &SelectorImpl { + &*self.state + } + + pub fn try_clone(&self) -> io::Result { + let state = Arc::clone(&self.state); + Ok(Selector { + #[cfg(debug_assertions)] + id: self.id, + state, + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + let me = self.get_impl(); + let mut poll = me.poll.lock().unwrap(); + loop { + { + // process pending tasks to (re|de)-register IO sources + let mut tasks = me.tasks.lock().unwrap(); + for task in tasks.iter() { + task.apply(&mut *poll); + } + tasks.clear(); + } + + events.clear(); + let timeout = timeout + .map(|to| cmp::min(to.as_millis(), c_int::max_value() as u128) as c_int) + .unwrap_or(-1); + let poll_rv = unsafe { + let rv = libc::poll(poll.fdarr.as_mut_ptr(), poll.fdarr.len() as nfds_t, timeout); + if rv < 0 { + return Err(std::io::Error::last_os_error()); + } + rv + }; + + for i in 0..poll.fdarr.len() { + let revents = poll.fdarr[i].revents; + let token = poll.fdmeta[i].token; + + // Skip over internal waker at index 0 + if i > 0 && revents != 0 && revents & !POLLNVAL != 0 { + events.push(Event { + data: token, + events: revents, + }); + } + if revents & POLLNVAL != 0 { + // This FD died. Someone closed this FD before + // deregistering first or there was a race condition + // between poll and deregister. Set it to -1 to let + // poll know it should be ignored. + poll.fdarr[i].fd = -1; + } + // Emulate edge triggered events + if !poll.fdmeta[i].is_waker { + poll.fdarr[i].events &= !(revents | POLLNVAL); + } + // Empty waker's queue + if revents != 0 && poll.fdmeta[i].is_waker { + let mut buf: [u8; 8] = [0; 8]; + unsafe { + libc::read(poll.fdarr[i].fd, buf.as_mut_ptr() as *mut c_void, 8); + } + } + } + if events.len() > 0 || poll_rv == 0 { + // Something is ready or poll(2) timed out + break; + } + } + return Ok(()); + } + + fn add_task(&self, task: Task) { + let me = self.get_impl(); + if let Ok(mut poll) = me.poll.try_lock() { + // poll not running, apply task directly if nothing sits in queue + let mut tasks = self.get_impl().tasks.lock().unwrap(); + match tasks.len() { + 0 => task.apply(&mut poll), + _ => tasks.push(task), + } + } else { + // poll is running, queue task and wake up poll thread + if let Some(ref waker) = me.waker.lock().unwrap().as_ref() { + self.get_impl().tasks.lock().unwrap().push(task); + waker.wake().unwrap(); + } + } + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.add_task(Task::Add { + fd, + token, + interests, + is_waker: false, + }); + Ok(()) + } + + pub(crate) fn register_pipe_waker( + &self, + fd: RawFd, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.add_task(Task::Add { + fd, + token, + interests, + is_waker: true, + }); + let mut as_fd = self.get_impl().as_fd.lock().unwrap(); + if *as_fd == -1 { + *as_fd = fd + } + Ok(()) + } + + #[allow(dead_code)] + pub fn rearm(&self, fd: RawFd, interests: Interest) -> io::Result<()> { + self.add_task(Task::Rearm { fd, interests }); + Ok(()) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.add_task(Task::Update { + fd, + token, + interests, + }); + Ok(()) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.add_task(Task::Del { fd }); + Ok(()) + } +} + +cfg_net! { + impl Selector { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.id + } + } +} + +impl AsRawFd for Selector { + fn as_raw_fd(&self) -> RawFd { + *self.get_impl().as_fd.lock().unwrap() + } +} + +fn interests_to_poll(interests: Interest) -> c_short { + let mut kind = 0; + + if interests.is_readable() { + kind |= POLLIN | POLLPRI | POLLRDBAND | POLLRDNORM; + } + if interests.is_writable() { + kind |= POLLOUT | POLLWRNORM | POLLWRBAND; + } + kind +} + +pub struct Event { + data: usize, + events: c_short, +} + +pub type Events = Vec; + +pub mod event { + use std::fmt; + + use crate::sys::Event; + use crate::Token; + use libc::c_short; + use libc::{POLLERR, POLLHUP, POLLIN, POLLOUT, POLLPRI}; + + pub fn token(event: &Event) -> Token { + Token(event.data) + } + + pub fn is_readable(event: &Event) -> bool { + event.events & (POLLIN | POLLPRI) != 0 + } + + pub fn is_writable(event: &Event) -> bool { + event.events & POLLOUT != 0 + } + + pub fn is_error(event: &Event) -> bool { + event.events & POLLERR != 0 + } + + pub fn is_read_closed(_event: &Event) -> bool { + // Not supported. Use read(2) to detect EOF. + false + } + + pub fn is_write_closed(event: &Event) -> bool { + event.events & POLLHUP != 0 + } + + pub fn is_priority(event: &Event) -> bool { + event.events & POLLPRI != 0 + } + + pub fn is_aio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, _event: &Event) -> fmt::Result { + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_flag(got: &c_short, want: &c_short) -> bool { + (got & want) != 0 + } + + debug_detail!( + FlagsDetails(c_short), + check_flag, + libc::POLLIN, + libc::POLLOUT, + libc::POLLPRI, + libc::POLLRDBAND, + libc::POLLRDNORM, + libc::POLLWRBAND, + libc::POLLWRNORM, + libc::POLLNVAL, + libc::POLLERR + ); + + f.debug_struct("event") + .field("flags", &FlagsDetails(_event.events)) + .field("data", &_event.data) + .finish() + } +} diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 16707cd19..687c12157 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -1,4 +1,4 @@ -#[cfg(any(target_os = "linux", target_os = "android"))] +cfg_epoll! { mod eventfd { use crate::sys::Selector; use crate::{Interest, Token}; @@ -58,10 +58,10 @@ mod eventfd { } } -#[cfg(any(target_os = "linux", target_os = "android"))] pub use self::eventfd::Waker; +} // cfg_epoll! -#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] +cfg_kqueue_waker! { mod kqueue { use crate::sys::Selector; use crate::Token; @@ -95,22 +95,17 @@ mod kqueue { } } -#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] pub use self::kqueue::Waker; +} // cfg_kqueue! -#[cfg(any( - target_os = "dragonfly", - target_os = "netbsd", - target_os = "openbsd", - target_os = "solaris" -))] -mod pipe { +cfg_pipe_waker! { +pub(crate) mod pipe { use crate::sys::unix::Selector; use crate::{Interest, Token}; use std::fs::File; - use std::io::{self, Read, Write}; - use std::os::unix::io::FromRawFd; + use std::io::{self, Write}; + use std::os::unix::io::{FromRawFd, RawFd}; /// Waker backed by a unix pipe. /// @@ -125,14 +120,28 @@ mod pipe { impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { let mut fds = [-1; 2]; + #[cfg(not(target_os = "macos"))] syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; + + // MacOS is missing pipe2 system-call. + #[cfg(target_os = "macos")] + { + use cvt::cvt; + use libc::{fcntl, F_SETFL, F_SETFD, O_NONBLOCK, FD_CLOEXEC}; + syscall!(pipe(fds.as_mut_ptr()))?; + unsafe { + cvt(fcntl(fds[0], F_SETFL, O_NONBLOCK))?; + cvt(fcntl(fds[0], F_SETFD, FD_CLOEXEC))?; + cvt(fcntl(fds[1], F_SETFL, O_NONBLOCK))?; + cvt(fcntl(fds[1], F_SETFD, FD_CLOEXEC))?; + } + } + // Turn the file descriptors into files first so we're ensured // they're closed when dropped, e.g. when register below fails. let sender = unsafe { File::from_raw_fd(fds[1]) }; let receiver = unsafe { File::from_raw_fd(fds[0]) }; - selector - .register(fds[0], token, Interest::READABLE) - .map(|()| Waker { sender, receiver }) + Self::register_waker(fds[0], selector, token).map(|()| Waker { sender, receiver }) } pub fn wake(&self) -> io::Result<()> { @@ -148,25 +157,42 @@ mod pipe { Err(err) => Err(err), } } + } + + cfg_neither_epoll_nor_kqueue! { + impl Waker { + // Empty not needed in poll(2) case, emptying is done by selector. + fn empty(&self) {} + fn register_waker(fd: RawFd, selector: &Selector, token: Token) + -> io::Result<()> { + // Register fd using a custom down-call to ensure + // level-triggered semantic required by a waker. + selector.register_pipe_waker(fd, token, Interest::READABLE) + } + } + } - /// Empty the pipe's buffer, only need to call this if `wake` fails. - /// This ignores any errors. - fn empty(&self) { - let mut buf = [0; 4096]; - loop { - match (&self.receiver).read(&mut buf) { - Ok(n) if n > 0 => continue, - _ => return, + cfg_epoll_or_kqueue! { + use std::io::Read; + impl Waker { + fn register_waker(fd: RawFd, selector: &Selector, token: Token) + -> io::Result<()> { + selector.register(fd, token, Interest::READABLE) + } + /// Empty the pipe's buffer, only need to call this if `wake` fails. + /// This ignores any errors. + fn empty(&self) { + let mut buf = [0; 4096]; + loop { + match (&self.receiver).read(&mut buf) { + Ok(n) if n > 0 => continue, + _ => return, + } } } } } } -#[cfg(any( - target_os = "dragonfly", - target_os = "netbsd", - target_os = "openbsd", - target_os = "solaris" -))] pub use self::pipe::Waker; +} // cfg_pipe_waker! diff --git a/tests/registering.rs b/tests/registering.rs index 31b8006f0..706725a7c 100644 --- a/tests/registering.rs +++ b/tests/registering.rs @@ -10,7 +10,9 @@ use std::thread::sleep; use std::time::Duration; mod util; -use util::{any_local_address, assert_error, init}; +#[cfg(debug_assertions)] +use util::assert_error; +use util::{any_local_address, init}; const SERVER: Token = Token(0); const CLIENT: Token = Token(1); diff --git a/tests/size.rs b/tests/size.rs index d4fb9a03c..39ce320b2 100644 --- a/tests/size.rs +++ b/tests/size.rs @@ -1,13 +1,31 @@ #[test] +#[cfg(any( + all( + any(target_os = "linux", target_os = "android", target_os = "illumos",), + feature = "os-epoll", + ), + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ) +))] #[cfg(unix)] #[cfg(not(debug_assertions))] fn assert_size() { - use mio::net::*; - use std::mem::size_of; - // Without debug assertions enabled `TcpListener`, `TcpStream` and `UdpSocket` should have the - // same size as the system specific socket, i.e. just a file descriptor on Unix platforms. - assert_eq!(size_of::(), size_of::()); - assert_eq!(size_of::(), size_of::()); - assert_eq!(size_of::(), size_of::()); + // same size as the system specific socket, i.e. just a file descriptor on Unix platforms unless I/O selector backend uses POSIX poll(2). + { + use mio::net::*; + use std::mem::size_of; + assert_eq!(size_of::(), size_of::()); + assert_eq!(size_of::(), size_of::()); + assert_eq!(size_of::(), size_of::()); + } } diff --git a/tests/tcp.rs b/tests/tcp.rs index 5c1b7200c..e87c855da 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -374,30 +374,6 @@ fn connect_then_close() { } } -#[test] -fn listen_then_close() { - init(); - - let mut poll = Poll::new().unwrap(); - let mut l = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap(); - - poll.registry() - .register(&mut l, Token(1), Interest::READABLE) - .unwrap(); - drop(l); - - let mut events = Events::with_capacity(128); - - poll.poll(&mut events, Some(Duration::from_millis(100))) - .unwrap(); - - for event in &events { - if event.token() == Token(1) { - panic!("recieved ready() on a closed TcpListener") - } - } -} - #[test] fn bind_twice_bad() { init(); @@ -658,7 +634,7 @@ macro_rules! wait { }}; } -#[test] +test_shutdown_server! { fn write_shutdown() { init(); @@ -695,6 +671,7 @@ fn write_shutdown() { wait!(poll, is_readable, true); } +} // test_shutdown_server! struct MyHandler { listener: TcpListener, diff --git a/tests/tcp_close.rs b/tests/tcp_close.rs new file mode 100644 index 000000000..000a98def --- /dev/null +++ b/tests/tcp_close.rs @@ -0,0 +1,41 @@ +#![cfg(all(feature = "os-poll", feature = "tcp"))] + +use mio::net::TcpListener; +use mio::{Events, Interest, Poll, Token}; +use std::time::Duration; +mod util; +use util::init; + +// +// There's a race condition if this test run in parallel with other tests. +// Right after fd is closed below, anyone can open a new file/socket and system +// might allocate the same fd for him. In such case test fails because it +// receives an event generated by another test running at the same time since +// it monitors fd owned by someone else. +// +// The test is alone in this source file on purpose to run single threaded and +// avoid above race condition. +// +#[test] +fn listen_then_close() { + init(); + + let mut poll = Poll::new().unwrap(); + let mut l = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap(); + + poll.registry() + .register(&mut l, Token(1), Interest::READABLE) + .unwrap(); + drop(l); + + let mut events = Events::with_capacity(128); + + poll.poll(&mut events, Some(Duration::from_millis(100))) + .unwrap(); + + for event in &events { + if event.token() == Token(1) { + panic!("received ready() on a closed TcpListener") + } + } +} diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 11d8d841d..c60363d12 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -293,7 +293,8 @@ fn shutdown_read() { target_os = "ios", target_os = "macos", target_os = "netbsd", - target_os = "openbsd" + target_os = "openbsd", + target_os = "solaris" ))] { let mut buf = [0; 20]; @@ -381,7 +382,8 @@ fn shutdown_both() { target_os = "ios", target_os = "macos", target_os = "netbsd", - target_os = "openbsd" + target_os = "openbsd", + target_os = "solaris" ))] { let mut buf = [0; 20]; @@ -502,11 +504,7 @@ fn no_events_after_deregister() { thread_handle.join().expect("unable to join thread"); } -#[test] -#[cfg_attr( - windows, - ignore = "fails on Windows; client read closed events are not triggered" -)] +test_shutdown_client! { fn tcp_shutdown_client_read_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -536,13 +534,9 @@ fn tcp_shutdown_client_read_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } +} // test_shutdown_client! -#[test] -#[cfg_attr(windows, ignore = "fails; client write_closed events are not found")] -#[cfg_attr( - any(target_os = "linux", target_os = "android", target_os = "solaris"), - ignore = "fails; client write_closed events are not found" -)] +test_shutdown_client_write! { fn tcp_shutdown_client_write_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -572,8 +566,9 @@ fn tcp_shutdown_client_write_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } +} // test_shutdown_client_write! -#[test] +test_shutdown_server! { fn tcp_shutdown_server_write_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -602,12 +597,9 @@ fn tcp_shutdown_server_write_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } +} // test_shutdown_server! -#[test] -#[cfg_attr( - windows, - ignore = "fails on Windows; client close events are not found" -)] +test_shutdown_client! { fn tcp_shutdown_client_both_close_event() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -635,6 +627,7 @@ fn tcp_shutdown_client_both_close_event() { barrier.wait(); handle.join().expect("failed to join thread"); } +} // test_shutdown_client! /// Start a listener that accepts `n_connections` connections on the returned /// address. It echos back any data it reads from the connection before diff --git a/tests/unix_datagram.rs b/tests/unix_datagram.rs index 24c05c7ec..b5b58f795 100644 --- a/tests/unix_datagram.rs +++ b/tests/unix_datagram.rs @@ -165,7 +165,7 @@ fn unix_datagram_pair() { assert!(datagram2.take_error().unwrap().is_none()); } -#[test] +test_shutdown_client! { fn unix_datagram_shutdown() { let (mut poll, mut events) = init_with_poll(); let path1 = temp_file("unix_datagram_shutdown1"); @@ -222,6 +222,7 @@ fn unix_datagram_shutdown() { assert!(datagram1.take_error().unwrap().is_none()); } +} // test_shutdown_client! #[test] fn unix_datagram_register() { diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 7159ec828..0f0689d38 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -143,7 +143,7 @@ fn unix_stream_peer_addr() { handle.join().unwrap(); } -#[test] +test_shutdown_client! { fn unix_stream_shutdown_read() { let (mut poll, mut events) = init_with_poll(); let (handle, remote_addr) = new_echo_listener(1, "unix_stream_shutdown_read"); @@ -196,6 +196,7 @@ fn unix_stream_shutdown_read() { drop(stream); handle.join().unwrap(); } +} // test_shutdown_client #[test] fn unix_stream_shutdown_write() { @@ -252,7 +253,7 @@ fn unix_stream_shutdown_write() { handle.join().unwrap(); } -#[test] +test_shutdown_client! { fn unix_stream_shutdown_both() { let (mut poll, mut events) = init_with_poll(); let (handle, remote_addr) = new_echo_listener(1, "unix_stream_shutdown_both"); @@ -311,8 +312,9 @@ fn unix_stream_shutdown_both() { drop(stream); handle.join().unwrap(); } +} // test_shutdown_client! -#[test] +test_shutdown_server! { fn unix_stream_shutdown_listener_write() { let (mut poll, mut events) = init_with_poll(); let barrier = Arc::new(Barrier::new(2)); @@ -344,6 +346,7 @@ fn unix_stream_shutdown_listener_write() { barrier.wait(); handle.join().unwrap(); } +} // test_shutdown_server! #[test] fn unix_stream_register() { diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 75d866f02..372302d1c 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -196,14 +196,23 @@ pub fn assert_would_block(result: io::Result) { } } -/// Assert that `NONBLOCK` is set on `socket`. +/// Assert that socket is non-blocking. #[cfg(unix)] pub fn assert_socket_non_blocking(socket: &S) where S: AsRawFd, { + let expected = if cfg!(target_os = "solaris") { + /* + * On Solaris O_NDELAY and O_NONBLOCK behave the same for sockets. + * For more details, see read(2) man page. + */ + libc::O_NONBLOCK | libc::O_NDELAY + } else { + libc::O_NONBLOCK + }; let flags = unsafe { libc::fcntl(socket.as_raw_fd(), libc::F_GETFL) }; - assert!(flags & libc::O_NONBLOCK != 0, "socket not non-blocking"); + assert!(flags & expected != 0, "socket not non-blocking"); } #[cfg(windows)] @@ -311,3 +320,99 @@ macro_rules! expect_read { assert_eq!(address, source); }}; } + +/// tests READ_CLOSED event after shutdown on server side, +/// not supported using poll(2) +macro_rules! test_shutdown_server { + ($($item:item)*) => { + $( + #[test] + #[cfg_attr(not(any( + windows, + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ), + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos" + ), + feature = "os-epoll", + ) + )), ignore = "read closed and write closed events not supported")] + $item + )* + } +} + +/// tests READ_CLOSED events after shutdown read on client side, +/// or WRITE_CLOSED events after shutdown both on client side, +/// neither supported using poll(2) nor on Windows +macro_rules! test_shutdown_client { + ($($item:item)*) => { + $( + #[test] + #[cfg_attr(not(any( + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ), + all( + any( + target_os = "linux", + target_os = "android", + target_os = "illumos" + ), + feature = "os-epoll", + ) + )), ignore = "client close events are not found")] + $item + )* + } +} + +/// tests WRITE_CLOSED event after shutdown write on client side, +/// neither supported using poll(2) nor Windows nor Linux / Android +macro_rules! test_shutdown_client_write { + ($($item:item)*) => { + $( + #[test] + #[cfg_attr(not(any( + all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + ), + feature = "os-kqueue" + ), + all( + any( + target_os = "illumos" + ), + feature = "os-epoll", + ) + )), ignore = "client write closed events are not found")] + $item + )* + } +}