Skip to content

Commit

Permalink
std: Implement LineWriter::write_vectored
Browse files Browse the repository at this point in the history
This commit implements the `write_vectored` method of the `LineWriter`
type. First discovered in bytecodealliance/wasmtime#629 the
`write_vectored` method of `Stdout` bottoms out here but only ends up
writing the first buffer due to the default implementation of
`write_vectored`.

Like `BufWriter`, however, `LineWriter` can have a non-default
implementation of `write_vectored` which tries to preserve the
vectored-ness as much as possible. Namely we can have a vectored write
for everything before the newline and everything after the newline if
all the stars align well.

Also like `BufWriter`, though, special care is taken to ensure that
whenever bytes are written we're sure to signal success since that
represents a "commit" of writing bytes.
  • Loading branch information
alexcrichton committed Dec 13, 2019
1 parent e9469a6 commit 2fee28e
Showing 1 changed file with 171 additions and 1 deletion.
172 changes: 171 additions & 1 deletion src/libstd/io/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,68 @@ impl<W: Write> Write for LineWriter<W> {
}
}

// Vectored writes are very similar to the writes above, but adjusted for
// the list of buffers that we have to write.
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
if self.need_flush {
self.flush()?;
}

// Find the last newline, and failing that write the whole buffer
let last_newline = bufs
.iter()
.enumerate()
.rev()
.filter_map(|(i, buf)| {
let pos = memchr::memrchr(b'\n', buf)?;
Some((i, pos))
})
.next();
let (i, j) = match last_newline {
Some(pair) => pair,
None => return self.inner.write_vectored(bufs),
};
let (prefix, suffix) = bufs.split_at(i);
let (buf, suffix) = suffix.split_at(1);
let buf = &buf[0];

// Write everything up to the last newline, flushing afterwards. Note
// that only if we finished our entire `write_vectored` do we try the
// subsequent
// `write`
let mut n = 0;
let prefix_amt = prefix.iter().map(|i| i.len()).sum();
if prefix_amt > 0 {
n += self.inner.write_vectored(prefix)?;
self.need_flush = true;
}
if n == prefix_amt {
match self.inner.write(&buf[..=j]) {
Ok(m) => n += m,
Err(e) if n == 0 => return Err(e),
Err(_) => return Ok(n),
}
self.need_flush = true;
}
if self.flush().is_err() || n != j + 1 + prefix_amt {
return Ok(n);
}

// ... and now write out everything remaining
match self.inner.write(&buf[j + 1..]) {
Ok(i) => n += i,
Err(_) => return Ok(n),
}

if suffix.iter().map(|s| s.len()).sum::<usize>() == 0 {
return Ok(n)
}
match self.inner.write_vectored(suffix) {
Ok(i) => Ok(n + i),
Err(_) => Ok(n),
}
}

fn flush(&mut self) -> io::Result<()> {
self.inner.flush()?;
self.need_flush = false;
Expand All @@ -1015,7 +1077,7 @@ where
#[cfg(test)]
mod tests {
use crate::io::prelude::*;
use crate::io::{self, BufReader, BufWriter, LineWriter, SeekFrom};
use crate::io::{self, BufReader, BufWriter, LineWriter, SeekFrom, IoSlice};
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::thread;

Expand Down Expand Up @@ -1483,4 +1545,112 @@ mod tests {

assert_eq!(l.write(b"a").unwrap_err().kind(), io::ErrorKind::Other)
}

#[test]
fn line_vectored() {
let mut a = LineWriter::new(Vec::new());
assert_eq!(
a.write_vectored(&[
IoSlice::new(&[]),
IoSlice::new(b"\n"),
IoSlice::new(&[]),
IoSlice::new(b"a"),
])
.unwrap(),
2,
);
assert_eq!(a.get_ref(), b"\n");

assert_eq!(
a.write_vectored(&[
IoSlice::new(&[]),
IoSlice::new(b"b"),
IoSlice::new(&[]),
IoSlice::new(b"a"),
IoSlice::new(&[]),
IoSlice::new(b"c"),
])
.unwrap(),
3,
);
assert_eq!(a.get_ref(), b"\n");
a.flush().unwrap();
assert_eq!(a.get_ref(), b"\nabac");
assert_eq!(a.write_vectored(&[]).unwrap(), 0);
assert_eq!(
a.write_vectored(&[
IoSlice::new(&[]),
IoSlice::new(&[]),
IoSlice::new(&[]),
IoSlice::new(&[]),
])
.unwrap(),
0,
);
assert_eq!(a.write_vectored(&[IoSlice::new(b"a\nb"),]).unwrap(), 3);
assert_eq!(a.get_ref(), b"\nabaca\n");
}

#[test]
fn line_vectored_partial_and_errors() {
enum Call {
Write { inputs: Vec<&'static [u8]>, output: io::Result<usize> },
Flush { output: io::Result<()> },
}
struct Writer {
calls: Vec<Call>,
}

impl Write for Writer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_vectored(&[IoSlice::new(buf)])
}

fn write_vectored(&mut self, buf: &[IoSlice<'_>]) -> io::Result<usize> {
match self.calls.pop().unwrap() {
Call::Write { inputs, output } => {
assert_eq!(inputs, buf.iter().map(|b| &**b).collect::<Vec<_>>());
output
}
_ => panic!("unexpected call to write"),
}
}

fn flush(&mut self) -> io::Result<()> {
match self.calls.pop().unwrap() {
Call::Flush { output } => output,
_ => panic!("unexpected call to flush"),
}
}
}

impl Drop for Writer {
fn drop(&mut self) {
if !thread::panicking() {
assert_eq!(self.calls.len(), 0);
}
}
}

// partial writes keep going
let mut a = LineWriter::new(Writer { calls: Vec::new() });
a.write_vectored(&[IoSlice::new(&[]), IoSlice::new(b"abc")]).unwrap();
a.get_mut().calls.push(Call::Flush { output: Ok(()) });
a.get_mut().calls.push(Call::Write { inputs: vec![b"bcx\n"], output: Ok(4) });
a.get_mut().calls.push(Call::Write { inputs: vec![b"abcx\n"], output: Ok(1) });
a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\n")]).unwrap();
a.get_mut().calls.push(Call::Flush { output: Ok(()) });
a.flush().unwrap();

// erroneous writes stop and don't write more
a.get_mut().calls.push(Call::Write { inputs: vec![b"x\n"], output: Err(err()) });
assert_eq!(a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\na")]).unwrap(), 2);
a.get_mut().calls.push(Call::Flush { output: Ok(()) });
a.get_mut().calls.push(Call::Write { inputs: vec![b"x\n"], output: Ok(2) });
a.flush().unwrap();

fn err() -> io::Error {
io::Error::new(io::ErrorKind::Other, "x")
}
}
}

0 comments on commit 2fee28e

Please sign in to comment.