Skip to content
This repository has been archived by the owner on Oct 15, 2022. It is now read-only.

Commit

Permalink
socket/ReadWriter: move timeout logic to a poll(2) syscall
Browse files Browse the repository at this point in the history
Previously, we tried to use `set_read/write_timeout` methods on
`UnixStream`s to set a timeout on the sockets. This worked fine
everywhere, except on MacOS for some elusive reason (possibly a bug in
the rust stdlib).

There is an alternative way to achieve a read/write with timeouts,
implemented in the timeout-readwrite crate:
https://docs.rs/crate/timeout-readwrite/0.2.0/source/src/writer.rs

We don’t add it as dependency, instead we specialize the interesting
`poll(2)` trick to `UnixSocket` since it’s only a few lines of code.

The timeout-readwrite library is Apache-2.0, the same license as our
project.
  • Loading branch information
Profpatsch committed May 27, 2019
1 parent 32b8d21 commit dcc55ab
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 39 deletions.
64 changes: 45 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ bincode = "1.1.3"
tempfile = "3.0.7"
vec1 = "1.1.0"
proptest = "0.9.1"
nix = "0.14.0"
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pkgs.rustPlatform.buildRustPackage rec {
BUILD_REV_COUNT = src.revCount or 1;
RUN_TIME_CLOSURE = pkgs.callPackage ./nix/runtime.nix {};

cargoSha256 = "0rcjqv3xhxgcf21dnizhw2v0pb5q5grmii11kh07xcp1s87af7jv";
cargoSha256 = "1ix9rmpfhs06xzccvx8xpdl60849ygm25rmixa69wkk0vs3s1bz2";

NIX_PATH = "nixpkgs=${./nix/bogus-nixpkgs}";

Expand Down
99 changes: 80 additions & 19 deletions src/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,6 @@ impl Timeout {
pub fn from_millis(m: u16) -> Timeout {
Timeout::D(Millis(m))
}

/// Convert to a timeout understood by `UnixStream`s.
fn to_socket_timeout(&self) -> Option<Duration> {
match self {
Timeout::Infinite => None,
// Socket timeout must not be 0 (or it crashes).
// instead, use a very short duration
Timeout::D(m) => Some(if *m == Millis(0) {
Duration::from_millis(1)
} else {
Duration::from(*m)
}),
}
}
}

/// Reading from a `ReadWriter<'a, R, W>` failed.
Expand Down Expand Up @@ -204,12 +190,11 @@ impl<'a, R, W> ReadWriter<'a, R, W> {
where
R: serde::de::DeserializeOwned,
{
into_bincode_io_error(self.socket.set_read_timeout(timeout.to_socket_timeout()))
.map_err(ReadError::Deserialize)?;
let timeout_socket = timeout::TimeoutReadWriter::new(self.socket, timeout);

// XXX: “If this returns an Error, `reader` may be in an invalid state”.
// what the heck does that mean.
bincode::deserialize_from(self.socket).map_err(|e| {
bincode::deserialize_from(timeout_socket).map_err(|e| {
if Self::is_timed_out(&e) {
ReadError::Timeout
} else {
Expand All @@ -223,9 +208,9 @@ impl<'a, R, W> ReadWriter<'a, R, W> {
where
W: serde::Serialize,
{
into_bincode_io_error(self.socket.set_write_timeout(timeout.to_socket_timeout()))?;
let timeout_socket = timeout::TimeoutReadWriter::new(self.socket, timeout);

bincode::serialize_into(self.socket, mes).map_err(|e| {
bincode::serialize_into(timeout_socket, mes).map_err(|e| {
if Self::is_timed_out(&e) {
WriteError::Timeout
} else {
Expand All @@ -238,3 +223,79 @@ impl<'a, R, W> ReadWriter<'a, R, W> {
Ok(())
}
}

/// Wrap a socket with a timeout. Inspired by https://docs.rs/crate/timeout-readwrite/0.2.0/
mod timeout {
extern crate nix;

use self::nix::libc;
use self::nix::poll;
use super::{Millis, Timeout};
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixStream;

/// Wait until `to_fd` receives the poll event from `events`, up to `timeout` length
/// of time.
/// Copied from https://docs.rs/crate/timeout-readwrite/0.2.0/source/src/utils.rs
/// written by Jonathan Creekmore and published under Apache-2.0.
fn wait_until_ready<R: AsRawFd>(
timeout: libc::c_int,
to_fd: &R,
events: poll::PollFlags,
) -> std::io::Result<()> {
let mut pfd = poll::PollFd::new(to_fd.as_raw_fd(), events);
let mut s = unsafe { std::slice::from_raw_parts_mut(&mut pfd, 1) };

let retval = poll::poll(&mut s, timeout)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
if retval == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timed out waiting for fd to be ready",
));
}
Ok(())
}

pub struct TimeoutReadWriter<'a> {
socket: &'a UnixStream,
timeout: libc::c_int,
}

fn to_poll_2_timeout(t: &Timeout) -> libc::c_int {
match t {
// negative number is infinite timeout
Timeout::Infinite => -1,
// otherwise a duration in milliseconds
Timeout::D(Millis(u)) => libc::c_int::from(*u),
}
}

impl<'a> TimeoutReadWriter<'a> {
pub fn new(socket: &'a UnixStream, timeout: &Timeout) -> TimeoutReadWriter<'a> {
TimeoutReadWriter {
socket,
timeout: to_poll_2_timeout(timeout),
}
}
}

impl<'a> std::io::Read for TimeoutReadWriter<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
wait_until_ready(self.timeout, self.socket, poll::PollFlags::POLLIN)?;
self.socket.read(buf)
}
}

impl<'a> std::io::Write for TimeoutReadWriter<'a> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
wait_until_ready(self.timeout, self.socket, poll::PollFlags::POLLOUT)?;
self.socket.write(buf)
}

fn flush(&mut self) -> std::io::Result<()> {
wait_until_ready(self.timeout, self.socket, poll::PollFlags::POLLOUT)?;
self.socket.flush()
}
}
}

0 comments on commit dcc55ab

Please sign in to comment.