Auto merge of #98748 - saethlin:optimize-bufreader, r=Mark-Simulacrum
Remove some redundant checks from BufReader

The implementation of `BufReader` contains a lot of redundant checks. No single check is particularly expensive to execute, but taken together they dramatically inhibit LLVM's ability to make subsequent optimizations, both by confusing data flow and by increasing the code size of anything that uses `BufReader`.

In particular, these changes yield a ~2x improvement on the benchmark that this PR adds a `black_box` to. I'm adding that `black_box` here just in case LLVM gets clever enough to remove the reads entirely. Right now it can't, but these optimizations are really setting it up to do so.
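As a rough illustration of why the `black_box` matters, the loop below mirrors the shape of the small-reads benchmark using only public `std` APIs. The function name `small_reads` and the in-memory `Cursor` input are assumptions made for this sketch; they are not part of the PR.

```rust
use std::hint::black_box;
use std::io::{BufReader, Cursor, Read};

/// Reads `data` in 4-byte chunks through a `BufReader` and returns the
/// number of complete chunks read. (Hypothetical helper for illustration.)
fn small_reads(data: &[u8]) -> usize {
    let mut reader = BufReader::new(Cursor::new(data));
    let mut buf = [0u8; 4];
    let mut chunks = 0;
    // `read_exact` fails with `UnexpectedEof` once fewer than 4 bytes remain.
    while reader.read_exact(&mut buf).is_ok() {
        // Make the optimizer treat `buf` as observed, so the copy out of the
        // internal buffer cannot be removed as dead code.
        black_box(&buf);
        chunks += 1;
    }
    chunks
}
```

Without the `black_box` call, nothing in the loop depends on the contents of `buf`, so a sufficiently aggressive optimizer would be free to drop the copies and the benchmark would stop measuring the reads.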

We get this optimization by factoring all the actual buffer management and bounds-checking logic into a new module inside `bufreader` with a new `Buffer` type. This makes it much easier to ensure that we have correctly encapsulated the management of the region of the buffer that we have read bytes into, and it lets us provide a new faster way to do small reads. `Buffer::consume_with` lets a caller do a read from the buffer with a single bounds check, instead of the double-check that's required to use `buffer` + `consume`.
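To make the single-check versus double-check distinction concrete, here is a minimal sketch. `SketchBuffer` is a hypothetical, simplified stand-in for the new `Buffer` type: it stores a fully initialized `Vec<u8>` rather than `Box<[MaybeUninit<u8>]>` and uses ordinary checked indexing, so it shows the shape of the two read paths rather than the actual generated code.

```rust
/// Hypothetical simplified buffer; invariant: `pos <= filled <= buf.len()`.
struct SketchBuffer {
    buf: Vec<u8>,
    pos: usize,
    filled: usize,
}

impl SketchBuffer {
    fn from_bytes(bytes: &[u8]) -> Self {
        Self { buf: bytes.to_vec(), pos: 0, filled: bytes.len() }
    }

    /// The unread region of the buffer.
    fn buffer(&self) -> &[u8] {
        &self.buf[self.pos..self.filled]
    }

    /// Advances `pos`, clamping to `filled`.
    fn consume(&mut self, amt: usize) {
        self.pos = std::cmp::min(self.pos + amt, self.filled);
    }

    /// Hands `amt` bytes to `visitor` if available; one length check via `get`.
    fn consume_with(&mut self, amt: usize, mut visitor: impl FnMut(&[u8])) -> bool {
        if let Some(claimed) = self.buffer().get(..amt) {
            visitor(claimed);
            // `get(..amt)` succeeded, so `pos + amt <= filled` holds.
            self.pos += amt;
            true
        } else {
            false
        }
    }
}

/// Old shape: explicit length check, then slicing, then a clamping `consume`.
fn read_exact_two_step(b: &mut SketchBuffer, out: &mut [u8]) -> bool {
    if b.buffer().len() >= out.len() {
        out.copy_from_slice(&b.buffer()[..out.len()]);
        b.consume(out.len());
        true
    } else {
        false
    }
}

/// New shape: a single `consume_with` call does the check and the advance.
fn read_exact_one_step(b: &mut SketchBuffer, out: &mut [u8]) -> bool {
    let n = out.len();
    b.consume_with(n, |claimed| out.copy_from_slice(claimed))
}
```

In the two-step version the same bound is established by the `len()` comparison, re-derived by the slice index, and then clamped again inside `consume`; the one-step version establishes it once through `get(..amt)`.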

Unfortunately, I'm not aware of much open-source usage of `BufReader` in perf-critical environments. Some time ago I tweaked this code because I saw `BufReader` in a profile at work, and I contributed some benchmarks to the `bincode` crate which exercise `BufReader::buffer`. These changes appear to help those benchmarks a little, but benchmarks of this sort are fragile, so I'm wary of quoting anything specific.
bors committed Jul 27, 2022
2 parents ff693dc + 5fa1926 commit 50166d5
Showing 3 changed files with 134 additions and 53 deletions.
81 changes: 28 additions & 53 deletions library/std/src/io/buffered/bufreader.rs
@@ -1,9 +1,10 @@
use crate::cmp;
mod buffer;

use crate::fmt;
use crate::io::{
self, BufRead, IoSliceMut, Read, ReadBuf, Seek, SeekFrom, SizeHint, DEFAULT_BUF_SIZE,
};
use crate::mem::MaybeUninit;
use buffer::Buffer;

/// The `BufReader<R>` struct adds buffering to any reader.
///
@@ -48,10 +49,7 @@ use crate::mem::MaybeUninit;
#[stable(feature = "rust1", since = "1.0.0")]
pub struct BufReader<R> {
inner: R,
buf: Box<[MaybeUninit<u8>]>,
pos: usize,
cap: usize,
init: usize,
buf: Buffer,
}

impl<R: Read> BufReader<R> {
@@ -93,8 +91,7 @@ impl<R: Read> BufReader<R> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
let buf = Box::new_uninit_slice(capacity);
BufReader { inner, buf, pos: 0, cap: 0, init: 0 }
BufReader { inner, buf: Buffer::with_capacity(capacity) }
}
}

@@ -170,8 +167,7 @@ impl<R> BufReader<R> {
/// ```
#[stable(feature = "bufreader_buffer", since = "1.37.0")]
pub fn buffer(&self) -> &[u8] {
// SAFETY: self.cap is always <= self.init, so self.buf[self.pos..self.cap] is always init
unsafe { MaybeUninit::slice_assume_init_ref(&self.buf[self.pos..self.cap]) }
self.buf.buffer()
}

/// Returns the number of bytes the internal buffer can hold at once.
@@ -194,7 +190,7 @@ impl<R> BufReader<R> {
/// ```
#[stable(feature = "buffered_io_capacity", since = "1.46.0")]
pub fn capacity(&self) -> usize {
self.buf.len()
self.buf.capacity()
}

/// Unwraps this `BufReader<R>`, returning the underlying reader.
@@ -224,8 +220,7 @@ impl<R> BufReader<R> {
/// Invalidates all data in the internal buffer.
#[inline]
fn discard_buffer(&mut self) {
self.pos = 0;
self.cap = 0;
self.buf.discard_buffer()
}
}

@@ -236,15 +231,15 @@ impl<R: Seek> BufReader<R> {
/// must track this information themselves if it is required.
#[stable(feature = "bufreader_seek_relative", since = "1.53.0")]
pub fn seek_relative(&mut self, offset: i64) -> io::Result<()> {
let pos = self.pos as u64;
let pos = self.buf.pos() as u64;
if offset < 0 {
if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
self.pos = new_pos as usize;
if let Some(_) = pos.checked_sub((-offset) as u64) {
self.buf.unconsume((-offset) as usize);
return Ok(());
}
} else if let Some(new_pos) = pos.checked_add(offset as u64) {
if new_pos <= self.cap as u64 {
self.pos = new_pos as usize;
if new_pos <= self.buf.filled() as u64 {
self.buf.consume(offset as usize);
return Ok(());
}
}
@@ -259,7 +254,7 @@ impl<R: Read> Read for BufReader<R> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.pos == self.cap && buf.len() >= self.buf.len() {
if self.buf.pos() == self.buf.filled() && buf.len() >= self.capacity() {
self.discard_buffer();
return self.inner.read(buf);
}
@@ -275,7 +270,7 @@ impl<R: Read> Read for BufReader<R> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.pos == self.cap && buf.remaining() >= self.buf.len() {
if self.buf.pos() == self.buf.filled() && buf.remaining() >= self.capacity() {
self.discard_buffer();
return self.inner.read_buf(buf);
}
@@ -295,9 +290,7 @@ impl<R: Read> Read for BufReader<R> {
// generation for the common path where the buffer has enough bytes to fill the passed-in
// buffer.
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
if self.buffer().len() >= buf.len() {
buf.copy_from_slice(&self.buffer()[..buf.len()]);
self.consume(buf.len());
if self.buf.consume_with(buf.len(), |claimed| buf.copy_from_slice(claimed)) {
return Ok(());
}

@@ -306,7 +299,7 @@ impl<R: Read> Read for BufReader<R> {

fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
if self.pos == self.cap && total_len >= self.buf.len() {
if self.buf.pos() == self.buf.filled() && total_len >= self.capacity() {
self.discard_buffer();
return self.inner.read_vectored(bufs);
}
@@ -325,8 +318,9 @@ impl<R: Read> Read for BufReader<R> {
// The inner reader might have an optimized `read_to_end`. Drain our buffer and then
// delegate to the inner implementation.
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
let nread = self.cap - self.pos;
buf.extend_from_slice(&self.buffer());
let inner_buf = self.buffer();
buf.extend_from_slice(inner_buf);
let nread = inner_buf.len();
self.discard_buffer();
Ok(nread + self.inner.read_to_end(buf)?)
}
@@ -371,33 +365,11 @@ impl<R: Read> Read for BufReader<R> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<R: Read> BufRead for BufReader<R> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..cap slice is always valid.
if self.pos >= self.cap {
debug_assert!(self.pos == self.cap);

let mut readbuf = ReadBuf::uninit(&mut self.buf);

// SAFETY: `self.init` is either 0 or set to `readbuf.initialized_len()`
// from the last time this function was called
unsafe {
readbuf.assume_init(self.init);
}

self.inner.read_buf(&mut readbuf)?;

self.cap = readbuf.filled_len();
self.init = readbuf.initialized_len();

self.pos = 0;
}
Ok(self.buffer())
self.buf.fill_buf(&mut self.inner)
}

fn consume(&mut self, amt: usize) {
self.pos = cmp::min(self.pos + amt, self.cap);
self.buf.consume(amt)
}
}

@@ -409,7 +381,10 @@ where
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BufReader")
.field("reader", &self.inner)
.field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len()))
.field(
"buffer",
&format_args!("{}/{}", self.buf.filled() - self.buf.pos(), self.capacity()),
)
.finish()
}
}
@@ -441,7 +416,7 @@ impl<R: Seek> Seek for BufReader<R> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
let result: u64;
if let SeekFrom::Current(n) = pos {
let remainder = (self.cap - self.pos) as i64;
let remainder = (self.buf.filled() - self.buf.pos()) as i64;
// it should be safe to assume that remainder fits within an i64 as the alternative
// means we managed to allocate 8 exbibytes and that's absurd.
// But it's not out of the realm of possibility for some weird underlying reader to
@@ -499,7 +474,7 @@ impl<R: Seek> Seek for BufReader<R> {
/// }
/// ```
fn stream_position(&mut self) -> io::Result<u64> {
let remainder = (self.cap - self.pos) as u64;
let remainder = (self.buf.filled() - self.buf.pos()) as u64;
self.inner.stream_position().map(|pos| {
pos.checked_sub(remainder).expect(
"overflow when subtracting remaining buffer size from inner stream position",
105 changes: 105 additions & 0 deletions library/std/src/io/buffered/bufreader/buffer.rs
@@ -0,0 +1,105 @@
//! An encapsulation of `BufReader`'s buffer management logic.
//!
//! This module factors out the basic functionality of `BufReader` in order to protect two core
//! invariants:
//! * `filled` bytes of `buf` are always initialized
//! * `pos` is always <= `filled`
//! Since this module encapsulates the buffer management logic, we can ensure that the range
//! `pos..filled` is always a valid index into the initialized region of the buffer. This means
//! that user code which wants to do reads from a `BufReader` via `buffer` + `consume` can do so
//! without encountering any runtime bounds checks.
use crate::cmp;
use crate::io::{self, Read, ReadBuf};
use crate::mem::MaybeUninit;

pub struct Buffer {
// The buffer.
buf: Box<[MaybeUninit<u8>]>,
// The current seek offset into `buf`, must always be <= `filled`.
pos: usize,
// Each call to `fill_buf` sets `filled` to indicate how many bytes at the start of `buf` are
// initialized with bytes from a read.
filled: usize,
}

impl Buffer {
#[inline]
pub fn with_capacity(capacity: usize) -> Self {
let buf = Box::new_uninit_slice(capacity);
Self { buf, pos: 0, filled: 0 }
}

#[inline]
pub fn buffer(&self) -> &[u8] {
// SAFETY: self.pos and self.filled are valid, and self.filled >= self.pos, and
// that region is initialized because those are all invariants of this type.
unsafe { MaybeUninit::slice_assume_init_ref(self.buf.get_unchecked(self.pos..self.filled)) }
}

#[inline]
pub fn capacity(&self) -> usize {
self.buf.len()
}

#[inline]
pub fn filled(&self) -> usize {
self.filled
}

#[inline]
pub fn pos(&self) -> usize {
self.pos
}

#[inline]
pub fn discard_buffer(&mut self) {
self.pos = 0;
self.filled = 0;
}

#[inline]
pub fn consume(&mut self, amt: usize) {
self.pos = cmp::min(self.pos + amt, self.filled);
}

/// If there are `amt` bytes available in the buffer, pass a slice containing those bytes to
/// `visitor` and return true. If there are not enough bytes available, return false.
#[inline]
pub fn consume_with<V>(&mut self, amt: usize, mut visitor: V) -> bool
where
V: FnMut(&[u8]),
{
if let Some(claimed) = self.buffer().get(..amt) {
visitor(claimed);
// If the indexing into self.buffer() succeeds, amt must be a valid increment.
self.pos += amt;
true
} else {
false
}
}

#[inline]
pub fn unconsume(&mut self, amt: usize) {
self.pos = self.pos.saturating_sub(amt);
}

#[inline]
pub fn fill_buf(&mut self, mut reader: impl Read) -> io::Result<&[u8]> {
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the reader.
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..filled slice is always valid.
if self.pos >= self.filled {
debug_assert!(self.pos == self.filled);

let mut readbuf = ReadBuf::uninit(&mut self.buf);

reader.read_buf(&mut readbuf)?;

self.filled = readbuf.filled_len();
self.pos = 0;
}
Ok(self.buffer())
}
}
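The module documentation above mentions user code that reads via `buffer` + `consume`. A minimal sketch of that pattern, written against the public `BufRead` trait (`fill_buf` + `consume`), might look like the following. The helper name `count_newlines` is an assumption for illustration and does not appear in this commit.

```rust
use std::io::{self, BufRead};

/// Counts `\n` bytes by walking the reader's internal buffer directly,
/// without copying into a caller-provided buffer. (Hypothetical helper.)
fn count_newlines<R: BufRead>(mut reader: R) -> io::Result<usize> {
    let mut count = 0;
    loop {
        // Borrow whatever is currently buffered, refilling if it is empty.
        let chunk = reader.fill_buf()?;
        if chunk.is_empty() {
            // An empty slice from `fill_buf` signals end of input.
            return Ok(count);
        }
        count += chunk.iter().filter(|&&b| b == b'\n').count();
        // Record the length first so the borrow of `chunk` ends before `consume`.
        let len = chunk.len();
        reader.consume(len);
    }
}
```

Every call to `consume` here passes exactly the length just returned by `fill_buf`, which is the case the `pos <= filled` invariant is meant to make cheap.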
1 change: 1 addition & 0 deletions library/std/src/io/buffered/tests.rs
@@ -523,6 +523,7 @@ fn bench_buffered_reader_small_reads(b: &mut test::Bencher) {
let mut buf = [0u8; 4];
for _ in 0..1024 {
reader.read_exact(&mut buf).unwrap();
core::hint::black_box(&buf);
}
});
}
