Skip to content

Commit

Permalink
Rollup merge of #118222 - the8472:copy-use-vec-write, r=m-ou-se
Browse files Browse the repository at this point in the history
unify read_to_end and io::copy impls for reading into a Vec

This ports over the initial probe (to avoid allocation) and the dynamic read sizing from the io::copy specialization to the `default_read_to_end` implementation which already had its own optimizations for different cases.

I think it should be a best-of-both now.

suggested by `@a1phyr` in #117576 (comment)
  • Loading branch information
matthiaskrgr authored Nov 28, 2023
2 parents b8e1194 + bc7dd5f commit 97ef5a3
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 107 deletions.
69 changes: 3 additions & 66 deletions library/std/src/io/copy.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::{BorrowedBuf, BufReader, BufWriter, Read, Result, Write, DEFAULT_BUF_SIZE};
use crate::alloc::Allocator;
use crate::cmp;
use crate::cmp::min;
use crate::collections::VecDeque;
use crate::io::IoSlice;
use crate::mem::MaybeUninit;
Expand Down Expand Up @@ -256,79 +255,17 @@ impl<I: Write + ?Sized> BufferedWriterSpec for BufWriter<I> {
}
}

impl<A: Allocator> BufferedWriterSpec for Vec<u8, A> {
impl BufferedWriterSpec for Vec<u8> {
fn buffer_size(&self) -> usize {
cmp::max(DEFAULT_BUF_SIZE, self.capacity() - self.len())
}

fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
let mut bytes = 0;

// avoid inflating empty/small vecs before we have determined that there's anything to read
if self.capacity() < DEFAULT_BUF_SIZE {
let stack_read_limit = DEFAULT_BUF_SIZE as u64;
bytes = stack_buffer_copy(&mut reader.take(stack_read_limit), self)?;
// fewer bytes than requested -> EOF reached
if bytes < stack_read_limit {
return Ok(bytes);
}
}

// don't immediately offer the vec's whole spare capacity, otherwise
// we might have to fully initialize it if the reader doesn't have a custom read_buf() impl
let mut max_read_size = DEFAULT_BUF_SIZE;

loop {
self.reserve(DEFAULT_BUF_SIZE);
let mut initialized_spare_capacity = 0;

loop {
let buf = self.spare_capacity_mut();
let read_size = min(max_read_size, buf.len());
let mut buf = BorrowedBuf::from(&mut buf[..read_size]);
// SAFETY: init is either 0 or the init_len from the previous iteration.
unsafe {
buf.set_init(initialized_spare_capacity);
}
match reader.read_buf(buf.unfilled()) {
Ok(()) => {
let bytes_read = buf.len();

// EOF
if bytes_read == 0 {
return Ok(bytes);
}

// the reader is returning short reads but it doesn't call ensure_init()
if buf.init_len() < buf.capacity() {
max_read_size = usize::MAX;
}
// the reader hasn't returned short reads so far
if bytes_read == buf.capacity() {
max_read_size *= 2;
}

initialized_spare_capacity = buf.init_len() - bytes_read;
bytes += bytes_read as u64;
// SAFETY: BorrowedBuf guarantees all of its filled bytes are init
// and the number of read bytes can't exceed the spare capacity since
// that's what the buffer is borrowing from.
unsafe { self.set_len(self.len() + bytes_read) };

// spare capacity full, reserve more
if self.len() == self.capacity() {
break;
}
}
Err(e) if e.is_interrupted() => continue,
Err(e) => return Err(e),
}
}
}
reader.read_to_end(self).map(|bytes| u64::try_from(bytes).expect("usize overflowed u64"))
}
}

fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
pub fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
reader: &mut R,
writer: &mut W,
) -> Result<u64> {
Expand Down
11 changes: 7 additions & 4 deletions library/std/src/io/copy/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ fn copy_specializes_bufreader() {

#[test]
fn copy_specializes_to_vec() {
let cap = 123456;
let mut source = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let cap = DEFAULT_BUF_SIZE * 10;
let mut source = ShortReader { cap, observed_buffer: 0, read_size: DEFAULT_BUF_SIZE };
let mut sink = Vec::new();
assert_eq!(cap as u64, io::copy(&mut source, &mut sink).unwrap());
let copied = io::copy(&mut source, &mut sink).unwrap();
assert_eq!(cap as u64, copied);
assert_eq!(sink.len() as u64, copied);
assert!(
source.observed_buffer > DEFAULT_BUF_SIZE,
"expected a large buffer to be provided to the reader"
"expected a large buffer to be provided to the reader, got {}",
source.observed_buffer
);
}

Expand Down
118 changes: 81 additions & 37 deletions library/std/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,16 @@ where
}
}

// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than a default reservation size of 32 if the
// reader has a very small amount of data to return.
// Here we must serve many masters with conflicting goals:
//
// - avoid allocating unless necessary
// - avoid overallocating if we know the exact size (#89165)
// - avoid passing large buffers to readers that always initialize the free capacity if they perform short reads (#23815, #23820)
// - pass large buffers to readers that do not initialize the spare capacity. this can amortize per-call overheads
// - and finally pass not-too-small and not-too-large buffers to Windows read APIs because they manage to suffer from both problems
// at the same time, i.e. small reads suffer from syscall overhead, all reads incur initialization cost
// proportional to buffer size (#110650)
//
pub(crate) fn default_read_to_end<R: Read + ?Sized>(
r: &mut R,
buf: &mut Vec<u8>,
Expand All @@ -412,20 +416,58 @@ pub(crate) fn default_read_to_end<R: Read + ?Sized>(
let start_cap = buf.capacity();
// Optionally limit the maximum bytes read on each iteration.
// This adds an arbitrary fiddle factor to allow for more data than we expect.
let max_read_size =
size_hint.and_then(|s| s.checked_add(1024)?.checked_next_multiple_of(DEFAULT_BUF_SIZE));
let mut max_read_size = size_hint
.and_then(|s| s.checked_add(1024)?.checked_next_multiple_of(DEFAULT_BUF_SIZE))
.unwrap_or(DEFAULT_BUF_SIZE);

let mut initialized = 0; // Extra initialized bytes from previous loop iteration

const PROBE_SIZE: usize = 32;

fn small_probe_read<R: Read + ?Sized>(r: &mut R, buf: &mut Vec<u8>) -> Result<usize> {
let mut probe = [0u8; PROBE_SIZE];

loop {
match r.read(&mut probe) {
Ok(n) => {
buf.extend_from_slice(&probe[..n]);
return Ok(n);
}
Err(ref e) if e.is_interrupted() => continue,
Err(e) => return Err(e),
}
}
}

// avoid inflating empty/small vecs before we have determined that there's anything to read
if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
let read = small_probe_read(r, buf)?;

if read == 0 {
return Ok(0);
}
}

loop {
if buf.len() == buf.capacity() && buf.capacity() == start_cap {
// The buffer might be an exact fit. Let's read into a probe buffer
// and see if it returns `Ok(0)`. If so, we've avoided an
// unnecessary doubling of the capacity. But if not, append the
// probe buffer to the primary buffer and let its capacity grow.
let read = small_probe_read(r, buf)?;

if read == 0 {
return Ok(buf.len() - start_len);
}
}

if buf.len() == buf.capacity() {
buf.reserve(32); // buf is full, need more space
buf.reserve(PROBE_SIZE); // buf is full, need more space
}

let mut spare = buf.spare_capacity_mut();
if let Some(size) = max_read_size {
let len = cmp::min(spare.len(), size);
spare = &mut spare[..len]
}
let buf_len = cmp::min(spare.len(), max_read_size);
spare = &mut spare[..buf_len];
let mut read_buf: BorrowedBuf<'_> = spare.into();

// SAFETY: These bytes were initialized but not filled in the previous loop
Expand All @@ -434,42 +476,44 @@ pub(crate) fn default_read_to_end<R: Read + ?Sized>(
}

let mut cursor = read_buf.unfilled();
match r.read_buf(cursor.reborrow()) {
Ok(()) => {}
Err(e) if e.is_interrupted() => continue,
Err(e) => return Err(e),
loop {
match r.read_buf(cursor.reborrow()) {
Ok(()) => break,
Err(e) if e.is_interrupted() => continue,
Err(e) => return Err(e),
}
}

if cursor.written() == 0 {
let unfilled_but_initialized = cursor.init_ref().len();
let bytes_read = cursor.written();
let was_fully_initialized = read_buf.init_len() == buf_len;

if bytes_read == 0 {
return Ok(buf.len() - start_len);
}

// store how much was initialized but not filled
initialized = cursor.init_ref().len();
initialized = unfilled_but_initialized;

// SAFETY: BorrowedBuf's invariants mean this much memory is initialized.
unsafe {
let new_len = read_buf.filled().len() + buf.len();
let new_len = bytes_read + buf.len();
buf.set_len(new_len);
}

if buf.len() == buf.capacity() && buf.capacity() == start_cap {
// The buffer might be an exact fit. Let's read into a probe buffer
// and see if it returns `Ok(0)`. If so, we've avoided an
// unnecessary doubling of the capacity. But if not, append the
// probe buffer to the primary buffer and let its capacity grow.
let mut probe = [0u8; 32];

loop {
match r.read(&mut probe) {
Ok(0) => return Ok(buf.len() - start_len),
Ok(n) => {
buf.extend_from_slice(&probe[..n]);
break;
}
Err(ref e) if e.is_interrupted() => continue,
Err(e) => return Err(e),
}
// Use heuristics to determine the max read size if no initial size hint was provided
if size_hint.is_none() {
// The reader is returning short reads but it doesn't call ensure_init().
// In that case we no longer need to restrict read sizes to avoid
// initialization costs.
if !was_fully_initialized {
max_read_size = usize::MAX;
}

// we have passed a larger buffer than previously and the
// reader still hasn't returned a short read
if buf_len >= max_read_size && bytes_read == buf_len {
max_read_size = max_read_size.saturating_mul(2);
}
}
}
Expand Down

0 comments on commit 97ef5a3

Please sign in to comment.