From 30ef24c8947b43832492a92828bf9b62f3b55c76 Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Tue, 14 May 2024 07:14:40 -0600 Subject: [PATCH] feat(s2n-quic-core): add buffer::Deque implemention (#2207) --- quic/s2n-quic-core/src/buffer.rs | 2 + quic/s2n-quic-core/src/buffer/deque.rs | 237 ++++++++++++++ .../s2n-quic-core/src/buffer/deque/storage.rs | 213 +++++++++++++ quic/s2n-quic-core/src/buffer/deque/tests.rs | 186 +++++++++++ .../src/buffer/duplex/interposer.rs | 4 +- .../src/buffer/reader/storage/chunk.rs | 7 + .../src/buffer/reader/storage/slice.rs | 57 +++- .../src/buffer/writer/storage.rs | 1 + .../src/buffer/writer/storage/vec_deque.rs | 16 + quic/s2n-quic-core/src/slice.rs | 2 + quic/s2n-quic-core/src/slice/deque.rs | 291 ++++++++++++++++++ quic/s2n-quic-core/src/slice/deque/pair.rs | 283 +++++++++++++++++ 12 files changed, 1292 insertions(+), 7 deletions(-) create mode 100644 quic/s2n-quic-core/src/buffer/deque.rs create mode 100644 quic/s2n-quic-core/src/buffer/deque/storage.rs create mode 100644 quic/s2n-quic-core/src/buffer/deque/tests.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/storage/vec_deque.rs create mode 100644 quic/s2n-quic-core/src/slice/deque.rs create mode 100644 quic/s2n-quic-core/src/slice/deque/pair.rs diff --git a/quic/s2n-quic-core/src/buffer.rs b/quic/s2n-quic-core/src/buffer.rs index 3ae9be32d3..d9558d03e8 100644 --- a/quic/s2n-quic-core/src/buffer.rs +++ b/quic/s2n-quic-core/src/buffer.rs @@ -1,12 +1,14 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: Apache-2.0 +pub mod deque; pub mod duplex; mod error; pub mod reader; pub mod reassembler; pub mod writer; +pub use deque::Deque; pub use duplex::Duplex; pub use error::Error; pub use reader::Reader; diff --git a/quic/s2n-quic-core/src/buffer/deque.rs b/quic/s2n-quic-core/src/buffer/deque.rs new file mode 100644 index 0000000000..10f23cccea --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/deque.rs @@ -0,0 +1,237 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::slice::deque; +use alloc::{boxed::Box, vec::Vec}; +use core::{fmt, mem::MaybeUninit}; + +mod storage; + +#[cfg(test)] +mod tests; + +/// A fixed-capacity ring buffer for bytes +#[derive(Clone)] +pub struct Deque { + bytes: Box<[MaybeUninit]>, + head: usize, + len: usize, +} + +impl fmt::Debug for Deque { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Deque") + .field("len", &self.len()) + .field("capacity", &self.capacity()) + .finish() + } +} + +impl From> for Deque { + #[inline] + fn from(mut buffer: Vec) -> Deque { + let len = buffer.len(); + let mut capacity = buffer.capacity(); + if !capacity.is_power_of_two() { + capacity = capacity.next_power_of_two(); + buffer.reserve_exact(capacity - len); + debug_assert!(capacity.is_power_of_two()); + } + + unsafe { + buffer.set_len(capacity); + } + + let bytes = buffer.into_boxed_slice(); + let ptr = Box::into_raw(bytes); + let bytes = unsafe { Box::from_raw(ptr as *mut [MaybeUninit]) }; + + Self { + bytes, + head: 0, + len, + } + } +} + +impl Deque { + #[inline] + pub fn new(mut capacity: usize) -> Self { + // Make sure capacity is set to a power of two + // https://doc.rust-lang.org/std/primitive.usize.html#method.next_power_of_two + //> Returns the smallest power of two greater than or equal to self. 
+ capacity = capacity.next_power_of_two(); + + let mut bytes = Vec::>::with_capacity(capacity); + unsafe { + bytes.set_len(capacity); + } + let bytes = bytes.into_boxed_slice(); + + Self { + bytes, + head: 0, + len: 0, + } + } + + #[inline] + pub fn capacity(&self) -> usize { + self.bytes.len() + } + + #[inline] + pub fn remaining_capacity(&self) -> usize { + self.preconditions(); + self.capacity() - self.len() + } + + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Resets the filled bytes in the buffer + /// + /// Note that data is not actually wiped with this method. If that behavior is desired then + /// calling [`Self::consume_filled`] should be preferred. + #[inline] + pub fn clear(&mut self) { + self.head = 0; + self.len = 0; + } + + /// Consumes `len` bytes from the head of the buffer + /// + /// # Panics + /// + /// `len` MUST be less than or equal to [`Self::len`] + #[inline] + pub fn consume(&mut self, len: usize) { + self.preconditions(); + + assert!(self.len() >= len); + + if len >= self.len() { + self.clear(); + return; + } + + // Wrap the head around the capacity + self.head = deque::wrap(&self.bytes, self.head, len); + self.len -= len; + + self.postconditions() + } + + /// Returns the filled bytes in the buffer + #[inline] + pub fn filled(&mut self) -> deque::Pair<&mut [u8]> { + self.preconditions(); + + unsafe { + // SAFETY: cursors guarantee memory is filled + deque::filled(&mut self.bytes, self.head, self.len).assume_init_mut() + } + } + + /// Returns and consumes `len` filled bytes in the buffer + /// + /// # Panics + /// + /// `len` MUST be less than or equal to [`Self::len`] + #[inline] + pub fn consume_filled(&mut self, len: usize) -> deque::Pair<&mut [u8]> { + self.preconditions(); + + let head = self.head; + + self.consume(len); + + self.postconditions(); + + unsafe { + // SAFETY: cursors guarantee memory is filled + deque::filled(&mut self.bytes, head, 
len).assume_init_mut() + } + } + + /// Returns the unfilled bytes in the buffer + /// + /// Callers will need to call [`Self::fill`] to indicate any writes that occurred to returned + /// slices. + #[inline] + pub fn unfilled(&mut self) -> deque::Pair<&mut [MaybeUninit]> { + self.preconditions(); + deque::unfilled(&mut self.bytes, self.head, self.len) + } + + /// Makes the buffer contiguous and contained in a single slice + #[inline] + pub fn make_contiguous(&mut self) -> &mut [u8] { + self.preconditions(); + deque::make_contiguous(&mut self.bytes, &mut self.head, self.len); + self.postconditions(); + + let (head, tail) = self.filled().into(); + debug_assert!(tail.is_empty()); + head + } + + /// Notifies the buffer that `len` bytes were written to it + /// + /// # Safety + /// + /// Callers must ensure the filled bytes were actually initialized + #[inline] + pub unsafe fn fill(&mut self, len: usize) -> Result<(), FillError> { + ensure!(self.remaining_capacity() >= len, Err(FillError(()))); + + self.len += len; + + self.postconditions(); + + Ok(()) + } + + #[inline(always)] + fn preconditions(&self) { + unsafe { + assume!(deque::invariants(&self.bytes, self.head, self.len)); + assume!(self.capacity().is_power_of_two()); + } + } + + #[inline(always)] + fn postconditions(&self) { + debug_assert!(deque::invariants(&self.bytes, self.head, self.len)); + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct FillError(()); + +impl fmt::Display for FillError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "the buffer does not have enough capacity for the provided fill amount" + ) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for FillError {} + +#[cfg(feature = "std")] +impl From for std::io::Error { + #[inline] + fn from(value: FillError) -> Self { + Self::new(std::io::ErrorKind::InvalidInput, value) + } +} diff --git a/quic/s2n-quic-core/src/buffer/deque/storage.rs b/quic/s2n-quic-core/src/buffer/deque/storage.rs new file mode 
100644 index 0000000000..9d29d02a9e --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/deque/storage.rs @@ -0,0 +1,213 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{ + reader::{self, storage::Chunk}, + writer, +}; +use bytes::buf::UninitSlice; + +impl writer::Storage for super::Deque { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.unfilled().put_slice(bytes); + + unsafe { + // SAFETY: we write `len` bytes with `put_slice` + self.fill(bytes.len()).unwrap(); + } + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let (mut head, _) = self.unfilled().into(); + + let did_write = head.put_uninit_slice(payload_len, f)?; + + if did_write { + unsafe { + // SAFETY: the caller wrote into the bytes + self.fill(payload_len).unwrap(); + } + } + + Ok(did_write) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + (*self).remaining_capacity() + } +} + +impl reader::Storage for super::Deque { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + ensure!(watermark > 0 && !self.is_empty(), Ok(Chunk::default())); + + // compute how many bytes we need to consume + let len = { + let (head, _) = self.filled().into(); + debug_assert!(!head.is_empty()); + + head.len().min(watermark) + }; + + let (head, _) = self.consume_filled(len).into(); + + Ok(head[..].into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result, Self::Error> + where + Dest: writer::Storage + ?Sized, + { + ensure!( + dest.has_remaining_capacity() && !self.is_empty(), + Ok(Chunk::default()) + ); + + let len = self.len().min(dest.remaining_capacity()); + + let should_return_tail = { + let (head, _tail) = self.filled().into(); + + // if the head 
isn't enough to fill the watermark then we also need to return the tail + head.len() < len + }; + + let (head, tail) = self.consume_filled(len).into(); + + if should_return_tail { + dest.put_slice(head); + Ok(tail[..].into()) + } else { + Ok(head[..].into()) + } + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + ensure!(dest.has_remaining_capacity() && !self.is_empty(), Ok(())); + + let len = self.len().min(dest.remaining_capacity()); + + self.consume_filled(len).copy_into(dest) + } +} + +#[cfg(test)] +mod tests { + use super::{ + super::Deque, + reader::{storage::Infallible as _, Reader}, + writer::Storage as _, + }; + use crate::stream::testing::Data; + use bolero::{check, TypeGenerator}; + + #[test] + fn storage_test() { + let cap = 16; + let mut buffer = Deque::new(cap); + assert_eq!(buffer.remaining_capacity(), cap); + + buffer.put_slice(b"hello"); + buffer.put_slice(b" "); + buffer.put_slice(b"world"); + + let chunk = buffer.infallible_read_chunk(7); + assert_eq!(&chunk[..], b"hello w"); + + let chunk = buffer.infallible_read_chunk(3); + assert_eq!(&chunk[..], b"orl"); + + buffer.put_slice(&[42u8; 15]); + + let mut out: Vec = vec![]; + let chunk = buffer.infallible_partial_copy_into(&mut out); + assert_eq!(&out[..], &[b'd', 42, 42, 42, 42, 42]); + assert_eq!(&chunk[..], &[42u8; 10]); + } + + #[derive(Clone, Copy, Debug, TypeGenerator)] + enum Op { + Put { len: u16 }, + ReadChunk { watermark: u16 }, + PartialCopy { watermark: u16 }, + FullCopy { watermark: u16 }, + } + + #[derive(Debug)] + struct Model { + buffer: Deque, + send: Data, + recv: Data, + } + + impl Default for Model { + fn default() -> Self { + Self { + buffer: Deque::new(u8::MAX as _), + send: Data::new(usize::MAX as _), + recv: Data::new(usize::MAX as _), + } + } + } + + impl Model { + fn apply_all(&mut self, ops: &[Op]) { + for op in ops { + self.apply(op); + } + while !self.buffer.is_empty() { + 
self.apply(&Op::FullCopy { + watermark: u16::MAX, + }); + } + } + + fn apply(&mut self, op: &Op) { + match *op { + Op::Put { len } => { + let mut stream = self.send.with_read_limit(len as _); + stream.infallible_copy_into(&mut self.buffer); + } + Op::ReadChunk { watermark } => { + let chunk = self.buffer.infallible_read_chunk(watermark as _); + self.recv.receive(&[chunk]); + } + Op::PartialCopy { watermark } => { + let mut recv = self.recv.with_write_limit(watermark as _); + let chunk = self.buffer.infallible_partial_copy_into(&mut recv); + self.recv.receive(&[chunk]); + } + Op::FullCopy { watermark } => { + let mut recv = self.recv.with_write_limit(watermark as _); + self.buffer.infallible_copy_into(&mut recv); + } + } + } + } + + #[test] + fn model_test() { + check!() + .with_type::>() + .for_each(|ops| Model::default().apply_all(ops)) + } +} diff --git a/quic/s2n-quic-core/src/buffer/deque/tests.rs b/quic/s2n-quic-core/src/buffer/deque/tests.rs new file mode 100644 index 0000000000..3410fb6eda --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/deque/tests.rs @@ -0,0 +1,186 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use crate::buffer::writer::Storage as _; +use bolero::{check, TypeGenerator}; +use std::collections::VecDeque; + +// shrink the search space with kani +const CAPACITY: usize = if cfg!(kani) { 4 } else { u8::MAX as usize + 1 }; +const OPS_LEN: usize = if cfg!(kani) { 2 } else { u8::MAX as usize + 1 }; + +#[derive(Clone, Copy, Debug, TypeGenerator)] +enum Op { + Recv { amount: u16, skip: u8 }, + Consume { amount: u16 }, + MakeContiguous, + Clear, +} + +#[derive(Debug)] +struct Model { + oracle: VecDeque, + subject: Deque, + remaining_capacity: usize, + byte: u8, +} + +impl Default for Model { + fn default() -> Self { + let capacity = CAPACITY; + + let buffer = Deque::new(capacity); + let remaining_capacity = buffer.capacity(); + Self { + oracle: Default::default(), + subject: buffer, + remaining_capacity, + byte: 0, + } + } +} + +impl Model { + fn apply_all(&mut self, ops: &[Op]) { + for op in ops { + self.apply(*op); + } + self.invariants(); + } + + #[inline] + fn pattern(&mut self, amount: usize, skip: u8) -> impl Iterator + Clone { + let base = self.byte as usize + skip as usize; + + let iter = core::iter::repeat(base as u8).take(amount); + + self.byte = (base + amount) as u8; + + iter + } + + fn apply(&mut self, op: Op) { + match op { + Op::Recv { amount, skip } => { + let amount = self.remaining_capacity.min(amount as usize); + let mut pattern = self.pattern(amount, skip); + + self.oracle.extend(pattern.clone()); + + let mut pair = self.subject.unfilled(); + + if amount > 0 { + assert!(pair.has_remaining_capacity()); + } + + assert!(amount <= pair.remaining_capacity()); + + // copy the pattern into the unfilled portion + for (a, b) in pair.iter_mut().zip(&mut pattern) { + *a = MaybeUninit::new(b); + } + + assert!(pattern.next().is_none(), "pattern should be drained"); + + unsafe { + self.subject.fill(amount).unwrap(); + } + + self.remaining_capacity -= amount; + } + Op::Consume { amount } => { + let 
amount = self.oracle.len().min(amount as usize); + self.subject.consume(amount as _); + + if amount > 0 { + self.oracle.drain(..amount); + self.remaining_capacity += amount; + } + } + Op::MakeContiguous => { + self.subject.make_contiguous(); + } + Op::Clear => { + self.subject.clear(); + self.oracle.clear(); + self.remaining_capacity = self.subject.capacity(); + } + } + } + + #[cfg(not(kani))] + fn invariants(&mut self) { + self.invariants_common(); + + let filled = self.subject.filled(); + let subject = filled.iter(); + let oracle = self.oracle.iter(); + + assert!( + subject.eq(oracle), + "subject ({:?}) == oracle ({:?})", + { + let (head, tail) = self.subject.filled().into(); + (&head[..head.len().min(10)], &tail[..tail.len().min(10)]) + }, + { + let (head, tail) = self.oracle.as_slices(); + (&head[..head.len().min(10)], &tail[..tail.len().min(10)]) + } + ); + + // we include the length just to make sure the case where we exceed the length returns + // `None` + for idx in 0..=self.subject.len() { + assert_eq!(self.oracle.get(idx), self.subject.filled().get(idx)); + } + } + + #[cfg(kani)] + fn invariants(&mut self) { + self.invariants_common(); + + let idx = kani::any(); + // we include the length just to make sure the case where we exceed the length returns + // `None` + kani::assume(idx <= self.subject.len()); + + assert_eq!(self.oracle.get(idx), self.subject.filled().get(idx)); + } + + fn invariants_common(&self) { + assert_eq!(self.subject.len(), self.oracle.len()); + assert_eq!(self.subject.remaining_capacity(), self.remaining_capacity); + } +} + +#[test] +#[cfg_attr(kani, kani::proof, kani::unwind(5), kani::solver(cadical))] +// even with the minimal amount of parameter bounds, this proof's memory consumption explodes +#[cfg_attr(kani, cfg(kani_slow))] +fn model_test() { + let ops = bolero::gen::>().with().len(..=OPS_LEN); + + check!().with_generator(ops).for_each(|ops| { + let mut model = Model::default(); + model.apply_all(ops); + }) +} + +#[test] +fn 
from_vec_test() { + check!().for_each(|slice| { + let extra_capacity = slice.get(..2).map_or(0, |cap| { + let cap: &[u8; 2] = cap.try_into().unwrap(); + u16::from_ne_bytes(*cap) as usize + }); + + let mut vec = slice.to_vec(); + vec.reserve_exact(extra_capacity); + let mut buffer: Deque = vec.into(); + + let (head, _tail) = buffer.filled().into(); + assert_eq!(slice, head); + }) +} diff --git a/quic/s2n-quic-core/src/buffer/duplex/interposer.rs b/quic/s2n-quic-core/src/buffer/duplex/interposer.rs index aa70deb2b4..153da8cb76 100644 --- a/quic/s2n-quic-core/src/buffer/duplex/interposer.rs +++ b/quic/s2n-quic-core/src/buffer/duplex/interposer.rs @@ -33,7 +33,9 @@ where pub fn new(storage: &'a mut S, duplex: &'a mut D) -> Self { debug_assert!( !storage.has_remaining_capacity() || duplex.buffer_is_empty(), - "`duplex` should be drained into `storage` before constructing an Interposer" + "`duplex` (len={}) should be drained into `storage` (cap={}) before constructing an Interposer", + duplex.buffered_len(), + storage.remaining_capacity() ); Self { storage, duplex } diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs b/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs index 3fb7b04c81..52a26076d1 100644 --- a/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs +++ b/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs @@ -63,6 +63,13 @@ impl<'a> core::ops::Deref for Chunk<'a> { } } +impl<'a> AsRef<[u8]> for Chunk<'a> { + #[inline] + fn as_ref(&self) -> &[u8] { + self + } +} + impl<'a> Storage for Chunk<'a> { type Error = core::convert::Infallible; diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs b/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs index 4c7b29ce32..5cffb8240b 100644 --- a/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs +++ b/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs @@ -1,6 +1,13 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: Apache-2.0 +// ignore these for macro consistency +#![allow( + clippy::mem_replace_with_default, + clippy::redundant_closure_call, + unused_mut +)] + use crate::{ buffer::{ reader::{storage::Chunk, Storage}, @@ -10,7 +17,7 @@ use crate::{ }; macro_rules! impl_slice { - ($ty:ty, $split:ident) => { + ($ty:ty, $default:expr, $split:ident $(, $extend:expr, $new:expr)?) => { impl Storage for $ty { type Error = core::convert::Infallible; @@ -29,7 +36,15 @@ macro_rules! impl_slice { ensure!(!self.is_empty(), Ok(Chunk::empty())); let len = self.len().min(watermark); // use `take` to work around borrowing rules - let (chunk, remaining) = core::mem::take(self).$split(len); + let mut v = core::mem::replace(self, $default); + let (chunk, remaining) = v.$split(len); + $( + let chunk = ($extend)(chunk); + let remaining = ($extend)(remaining); + )? + $( + let remaining = ($new)(remaining); + )? *self = remaining; Ok((&*chunk).into()) } @@ -50,14 +65,44 @@ macro_rules! impl_slice { ensure!(!self.is_empty(), Ok(())); let len = self.len().min(dest.remaining_capacity()); // use `take` to work around borrowing rules - let (chunk, remaining) = core::mem::take(self).$split(len); - *self = remaining; + let mut v = core::mem::replace(self, $default); + let (chunk, remaining) = v.$split(len); dest.put_slice(chunk); + $( + let remaining = ($extend)(remaining); + let remaining = ($new)(remaining); + )? 
+ *self = remaining; Ok(()) } } }; } -impl_slice!(&[u8], split_at); -impl_slice!(&mut [u8], split_at_mut); +impl_slice!(&[u8], &[], split_at); +impl_slice!(&mut [u8], &mut [], split_at_mut); + +#[cfg(feature = "std")] +impl_slice!( + std::io::IoSlice<'_>, + std::io::IoSlice::new(&[]), + split_at, + |chunk: &[u8]| unsafe { + // SAFETY: we're using transmute to extend the lifetime of the chunk to `self` + // Upstream tracking: https://github.com/rust-lang/rust/issues/124659 + core::mem::transmute::<&[u8], &[u8]>(chunk) + }, + std::io::IoSlice::new +); +#[cfg(feature = "std")] +impl_slice!( + std::io::IoSliceMut<'_>, + std::io::IoSliceMut::new(&mut []), + split_at_mut, + |chunk: &mut [u8]| unsafe { + // SAFETY: we're using transmute to extend the lifetime of the chunk to `self` + // Upstream tracking: https://github.com/rust-lang/rust/issues/124659 + core::mem::transmute::<&mut [u8], &mut [u8]>(chunk) + }, + std::io::IoSliceMut::new +); diff --git a/quic/s2n-quic-core/src/buffer/writer/storage.rs b/quic/s2n-quic-core/src/buffer/writer/storage.rs index 47b155aa0b..74c8bd2905 100644 --- a/quic/s2n-quic-core/src/buffer/writer/storage.rs +++ b/quic/s2n-quic-core/src/buffer/writer/storage.rs @@ -11,6 +11,7 @@ mod empty; mod limit; mod tracked; mod uninit_slice; +mod vec_deque; mod write_once; pub use buf::BufMut; diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/vec_deque.rs b/quic/s2n-quic-core/src/buffer/writer/storage/vec_deque.rs new file mode 100644 index 0000000000..cd03551338 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/vec_deque.rs @@ -0,0 +1,16 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use alloc::collections::VecDeque; + +impl super::Storage for VecDeque { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.extend(bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + (isize::MAX as usize) - self.len() + } +} diff --git a/quic/s2n-quic-core/src/slice.rs b/quic/s2n-quic-core/src/slice.rs index 45e54e9b8b..6ae31c37b3 100644 --- a/quic/s2n-quic-core/src/slice.rs +++ b/quic/s2n-quic-core/src/slice.rs @@ -3,6 +3,8 @@ use core::ops::{Deref, DerefMut}; +pub mod deque; + /// Copies vectored slices from one slice into another /// /// The number of copied items is limited by the minimum of the lengths of each of the slices. diff --git a/quic/s2n-quic-core/src/slice/deque.rs b/quic/s2n-quic-core/src/slice/deque.rs new file mode 100644 index 0000000000..6e3c670a1e --- /dev/null +++ b/quic/s2n-quic-core/src/slice/deque.rs @@ -0,0 +1,291 @@ +// Copyright The Rust Project Developers +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Utilities for working with ring buffers +//! +//! Most of the logic is derived/copied from Rust's VecDeque implementation: +//! +//! 
[`VecDeque`](https://github.com/rust-lang/rust/blob/master/library/alloc/src/collections/vec_deque/mod.rs) + +use core::ptr; + +mod pair; + +pub use pair::Pair; + +/// Checks that the current state preserves the required invariants +#[inline] +pub fn invariants(slice: &[T], head: usize, len: usize) -> bool { + let cap = slice.len(); + + ensure!(cap > 0, false); + ensure!(cap > head, false); + ensure!(cap >= len, false); + + true +} + +#[inline] +pub fn wrap(slice: &[T], head: usize, len: usize) -> usize { + assert!(invariants(slice, head, len)); + + let cursor = head + len; + + if slice.len().is_power_of_two() { + cursor & (slice.len() - 1) + } else { + cursor % slice.len() + } +} + +/// Returns the `tail` position for the current state +#[inline] +fn tail(slice: &[T], head: usize, len: usize) -> usize { + assert!(invariants(slice, head, len)); + wrap(slice, head, len) +} + +/// Returns the filled pair of slices for the current state +#[inline] +pub fn filled(slice: &mut [T], head: usize, len: usize) -> Pair<&mut [T]> { + assert!(invariants(slice, head, len)); + + let tail = tail(slice, head, len); + + // if the slice is contiguous then return the single slice + if is_contiguous(slice, head, len) { + let head = &mut slice[head..tail]; + let tail: &mut [T] = &mut []; + debug_assert_eq!(head.len(), len); + + return (head, tail).into(); + } + + let (bytes, head) = slice.split_at_mut(head); + let (tail, _unfilled) = bytes.split_at_mut(tail); + + if head.is_empty() { + debug_assert!(tail.is_empty()); + } + + debug_assert_eq!(head.len() + tail.len(), len); + + (head, tail).into() +} + +/// Returns the unfilled pair of slices for the current state +#[inline] +pub fn unfilled(slice: &mut [T], head: usize, len: usize) -> Pair<&mut [T]> { + assert!(invariants(slice, head, len)); + + let cap = slice.len(); + let tail = tail(slice, head, len); + let remaining_capacity = cap - len; + + // if the slice is non-contiguous then return the unfilled space between + if 
!is_contiguous(slice, head, len) { + let head = &mut slice[tail..head]; + let tail: &mut [T] = &mut []; + + debug_assert_eq!(head.len(), remaining_capacity); + + return (head, tail).into(); + } + + let (slice, head_slice) = slice.split_at_mut(tail); + let (tail_slice, _filled) = slice.split_at_mut(head); + + let head = head_slice; + let tail = tail_slice; + + debug_assert!(!head.is_empty()); + + debug_assert_eq!(head.len() + tail.len(), remaining_capacity); + + (head, tail).into() +} + +/// Returns `true` if the currently occupied elements are contiguous +#[inline] +pub fn is_contiguous(slice: &[T], head: usize, len: usize) -> bool { + assert!(invariants(slice, head, len)); + + head + len < slice.len() +} + +/// Forces all of the currently occupied elements to be contiguous +#[inline] +pub fn make_contiguous(slice: &mut [T], head_out: &mut usize, len: usize) { + let head = *head_out; + + assert!(invariants(slice, head, len)); + + // we only need to shuffle things if we're non-contiguous + ensure!(!is_contiguous(slice, head, len)); + + let cap = slice.len(); + + debug_assert!(len <= cap); + debug_assert!(head <= cap); + + let free = cap - len; + let head_len = cap - head; + let tail = len - head_len; + let tail_len = tail; + + if free >= head_len { + // there is enough free space to copy the head in one go, + // this means that we first shift the tail backwards, and then + // copy the head to the correct position. + // + // from: DEFGH....ABC + // to: ABCDEFGH.... + unsafe { + copy(slice, 0, head_len, tail_len); + // ...DEFGH.ABC + copy_nonoverlapping(slice, head, 0, head_len); + // ABCDEFGH.... + } + + *head_out = 0; + } else if free >= tail_len { + // there is enough free space to copy the tail in one go, + // this means that we first shift the head forwards, and then + // copy the tail to the correct position. + // + // from: FGH....ABCDE + // to: ...ABCDEFGH. + unsafe { + copy(slice, head, tail, head_len); + // FGHABCDE.... 
+ copy_nonoverlapping(slice, 0, tail + head_len, tail_len); + // ...ABCDEFGH. + } + + *head_out = tail; + } else { + // `free` is smaller than both `head_len` and `tail_len`. + // the general algorithm for this first moves the slices + // right next to each other and then uses `slice::rotate` + // to rotate them into place: + // + // initially: HIJK..ABCDEFG + // step 1: ..HIJKABCDEFG + // step 2: ..ABCDEFGHIJK + // + // or: + // + // initially: FGHIJK..ABCDE + // step 1: FGHIJKABCDE.. + // step 2: ABCDEFGHIJK.. + + // pick the shorter of the 2 slices to reduce the amount + // of memory that needs to be moved around. + if head_len > tail_len { + // tail is shorter, so: + // 1. copy tail forwards + // 2. rotate used part of the buffer + // 3. update head to point to the new beginning (which is just `free`) + + unsafe { + // if there is no free space in the buffer, then the slices are already + // right next to each other and we don't need to move any memory. + if free != 0 { + // because we only move the tail forward as much as there's free space + // behind it, we don't overwrite any elements of the head slice, and + // the slices end up right next to each other. + copy(slice, 0, free, tail_len); + } + + // We just copied the tail right next to the head slice, + // so all of the elements in the range are initialized + let slice = &mut slice[free..cap]; + + // because the deque wasn't contiguous, we know that `tail_len < self.len == slice.len()`, + // so this will never panic. + slice.rotate_left(tail_len); + + // the used part of the buffer now is `free..self.capacity()`, so set + // `head` to the beginning of that range. + *head_out = free; + } + } else { + // head is shorter so: + // 1. copy head backwards + // 2. rotate used part of the buffer + // 3. 
update head to point to the new beginning (which is the beginning of the buffer) + + unsafe { + // if there is no free space in the buffer, then the slices are already + // right next to each other and we don't need to move any memory. + if free != 0 { + // copy the head slice to lie right behind the tail slice. + copy(slice, head, tail_len, head_len); + } + + // because we copied the head slice so that both slices lie right + // next to each other, all the elements in the range are initialized. + let slice = &mut slice[..len]; + + // because the deque wasn't contiguous, we know that `head_len < self.len == slice.len()` + // so this will never panic. + slice.rotate_right(head_len); + + // the used part of the buffer now is `0..self.len`, so set + // `head` to the beginning of that range. + *head_out = 0; + } + } + } +} + +/// Copies a contiguous block of memory len long from src to dst +#[inline] +unsafe fn copy(slice: &mut [T], src: usize, dst: usize, len: usize) { + debug_assert!( + dst + len <= slice.len(), + "cpy dst={} src={} len={} cap={}", + dst, + src, + len, + slice.len() + ); + debug_assert!( + src + len <= slice.len(), + "cpy dst={} src={} len={} cap={}", + dst, + src, + len, + slice.len() + ); + let ptr = slice.as_mut_ptr(); + unsafe { + ptr::copy(ptr.add(src), ptr.add(dst), len); + } +} + +/// Copies a contiguous block of memory len long from src to dst +#[inline] +unsafe fn copy_nonoverlapping(slice: &mut [T], src: usize, dst: usize, len: usize) { + debug_assert!( + dst + len <= slice.len(), + "cno dst={} src={} len={} cap={}", + dst, + src, + len, + slice.len() + ); + debug_assert!( + src + len <= slice.len(), + "cno dst={} src={} len={} cap={}", + dst, + src, + len, + slice.len() + ); + let ptr = slice.as_mut_ptr(); + unsafe { + ptr::copy_nonoverlapping(ptr.add(src), ptr.add(dst), len); + } +} diff --git a/quic/s2n-quic-core/src/slice/deque/pair.rs b/quic/s2n-quic-core/src/slice/deque/pair.rs new file mode 100644 index 0000000000..da9ddd563b --- 
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use core::{mem::MaybeUninit, ops};

/// A pair of storage parts.
///
/// A ring buffer's occupied (or free) region is at most two contiguous
/// slices — the run up to the end of the allocation and the wrapped-around
/// run from the start. `Pair` groups those two parts (head first, then tail)
/// and lets them be indexed, iterated, and used as a single logical
/// reader/writer storage.
pub struct Pair<Part> {
    parts: [Part; 2],
}

impl<Part> From<(Part, Part)> for Pair<Part> {
    #[inline]
    fn from((head, tail): (Part, Part)) -> Self {
        Self {
            parts: [head, tail],
        }
    }
}

impl<Part> From<Pair<Part>> for (Part, Part) {
    #[inline]
    fn from(pair: Pair<Part>) -> (Part, Part) {
        let [head, tail] = pair.parts;
        (head, tail)
    }
}

impl<T> Pair<T> {
    /// Applies `f` to both parts, preserving their order.
    #[inline]
    pub fn map<F, U>(self, mut f: F) -> Pair<U>
    where
        F: FnMut(T) -> U,
    {
        let [head, tail] = self.parts;
        let head = f(head);
        let tail = f(tail);
        Pair {
            parts: [head, tail],
        }
    }
}

impl<Part, T> Pair<Part>
where
    Part: ops::Deref<Target = [T]>,
{
    /// Returns the element at the logical `index`, treating the two parts as
    /// one contiguous sequence (head elements first).
    #[inline]
    pub fn get(&self, mut index: usize) -> Option<&T> {
        for part in &self.parts {
            if let Some(v) = part.get(index) {
                return Some(v);
            };
            // `part.get` returned `None`, so `index >= part.len()` and the
            // subtraction cannot underflow.
            index -= part.len();
        }

        None
    }

    /// Iterates over all elements, head part first, then tail part.
    #[inline]
    pub fn iter<'a>(&'a self) -> impl Iterator<Item = &'a T>
    where
        T: 'a,
    {
        self.parts.iter().flat_map(|p| p.iter())
    }
}

impl<Part, T> Pair<Part>
where
    Part: ops::DerefMut<Target = [T]>,
{
    /// Returns a mutable reference to the element at the logical `index`,
    /// treating the two parts as one contiguous sequence.
    #[inline]
    pub fn get_mut(&mut self, mut index: usize) -> Option<&mut T> {
        for part in &mut self.parts {
            // capture the length before `get_mut` mutably borrows `part`
            let part_len = part.len();
            if let Some(v) = part.get_mut(index) {
                return Some(v);
            };
            index -= part_len;
        }

        None
    }

    /// Mutably iterates over all elements, head part first, then tail part.
    #[inline]
    pub fn iter_mut<'a>(&'a mut self) -> impl Iterator<Item = &'a mut T> + 'a
    where
        T: 'a,
    {
        self.parts.iter_mut().flat_map(|p| p.iter_mut())
    }
}

impl<'a, T> Pair<&'a [MaybeUninit<T>]> {
    /// # Safety
    ///
    /// Callers should guarantee the memory in the pair is initialized
    #[inline]
    pub unsafe fn assume_init_ref(self) -> Pair<&'a [T]> {
        self.map(|slice| {
            // SAFETY: similar justification for assume_init_mut
            &*(slice as *const [MaybeUninit<T>] as *const [T])
        })
    }
}

impl<'a, T> Pair<&'a mut [MaybeUninit<T>]> {
    /// # Safety
    ///
    /// Callers should guarantee the memory in the pair is initialized
    #[inline]
    pub unsafe fn assume_init_mut(self) -> Pair<&'a mut [T]> {
        self.map(|slice| {
            // SAFETY: casting `slice` to a `*mut [T]` is safe since the caller guarantees that
            // `slice` is initialized, and `MaybeUninit` is guaranteed to have the same layout as `T`.
            // The pointer obtained is valid since it refers to memory owned by `slice` which is a
            // reference and thus guaranteed to be valid for reads and writes.
            &mut *(slice as *mut [MaybeUninit<T>] as *mut [T])
        })
    }
}

#[cfg(feature = "std")]
mod std_ {
    use super::*;
    use std::io::{IoSlice, IoSliceMut};

    impl<'a> Pair<&'a [MaybeUninit<u8>]> {
        /// # Safety
        ///
        /// Callers should guarantee the memory in the pair is initialized
        #[inline]
        pub unsafe fn assume_init_io_slice(self) -> Pair<IoSlice<'a>> {
            self.assume_init_ref().map(IoSlice::new)
        }
    }

    // NOTE: no `#[cfg(feature = "std")]` needed here — the enclosing
    // `mod std_` is already gated on that feature.
    impl<'a> Pair<&'a mut [MaybeUninit<u8>]> {
        /// # Safety
        ///
        /// Callers should guarantee the memory in the pair is initialized
        #[inline]
        pub unsafe fn assume_init_io_slice_mut(self) -> Pair<IoSliceMut<'a>> {
            self.assume_init_mut().map(IoSliceMut::new)
        }
    }
}

#[cfg(feature = "alloc")]
mod alloc_ {
    use super::*;
    use crate::buffer::{reader, writer};
    use bytes::buf::UninitSlice;

    impl<S: reader::Storage> reader::Storage for Pair<S> {
        type Error = S::Error;

        #[inline]
        fn buffered_len(&self) -> usize {
            self.parts[0].buffered_len() + self.parts[1].buffered_len()
        }

        #[inline]
        fn read_chunk(
            &mut self,
            watermark: usize,
        ) -> Result<reader::storage::Chunk<'_>, Self::Error> {
            // serve from the head part until it is drained, then the tail
            if !self.parts[0].buffer_is_empty() {
                self.parts[0].read_chunk(watermark)
            } else {
                self.parts[1].read_chunk(watermark)
            }
        }

        #[inline]
        fn partial_copy_into<Dest>(
            &mut self,
            dest: &mut Dest,
        ) -> Result<reader::storage::Chunk<'_>, Self::Error>
        where
            Dest: crate::buffer::writer::Storage + ?Sized,
        {
            if self.parts[0].buffered_len() >= dest.remaining_capacity() {
                // the head part alone can fill `dest`
                self.parts[0].partial_copy_into(dest)
            } else {
                // drain the head part entirely, then continue from the tail
                self.parts[0].copy_into(dest)?;
                self.parts[1].partial_copy_into(dest)
            }
        }

        #[inline]
        fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
        where
            Dest: writer::Storage + ?Sized,
        {
            self.parts[0].copy_into(dest)?;
            self.parts[1].copy_into(dest)
        }
    }

    impl<Part> Pair<Part>
    where
        Part: reader::Storage,
    {
        /// Returns the non-empty parts as a slice (head first).
        #[inline]
        pub fn reader_slice(&mut self) -> &[Part] {
            let [head, tail] = &self.parts;
            match (!head.buffer_is_empty(), !tail.buffer_is_empty()) {
                (true, true) => &self.parts,
                (true, false) => &self.parts[..1],
                (false, true) => &self.parts[1..],
                (false, false) => &[],
            }
        }

        /// Returns the non-empty parts as a mutable slice (head first).
        #[inline]
        pub fn reader_slice_mut(&mut self) -> &mut [Part] {
            let [head, tail] = &self.parts;
            match (!head.buffer_is_empty(), !tail.buffer_is_empty()) {
                (true, true) => &mut self.parts,
                (true, false) => &mut self.parts[..1],
                (false, true) => &mut self.parts[1..],
                (false, false) => &mut [],
            }
        }
    }

    impl<S: writer::Storage> writer::Storage for Pair<S> {
        #[inline]
        fn put_slice(&mut self, mut bytes: &[u8]) {
            use reader::storage::Infallible;

            debug_assert!(bytes.len() <= self.remaining_capacity());

            // `bytes` acts as a reader and is advanced by each copy, so the
            // second call writes whatever the head part did not accept
            bytes.infallible_copy_into(&mut self.parts[0]);
            bytes.infallible_copy_into(&mut self.parts[1]);
        }

        #[inline]
        fn put_uninit_slice<F, Error>(&mut self, payload_len: usize, f: F) -> Result<bool, Error>
        where
            F: FnOnce(&mut UninitSlice) -> Result<(), Error>,
        {
            // `f` needs a single contiguous slice, so only one part can serve it
            if self.parts[0].has_remaining_capacity() {
                self.parts[0].put_uninit_slice(payload_len, f)
            } else {
                self.parts[1].put_uninit_slice(payload_len, f)
            }
        }

        #[inline]
        fn remaining_capacity(&self) -> usize {
            self.parts[0].remaining_capacity() + self.parts[1].remaining_capacity()
        }
    }

    impl<Part> Pair<Part>
    where
        Part: writer::Storage,
    {
        /// Returns the parts that still have capacity as a slice (head first).
        #[inline]
        pub fn writer_slice(&mut self) -> &[Part] {
            let [head, tail] = &self.parts;
            match (head.has_remaining_capacity(), tail.has_remaining_capacity()) {
                (true, true) => &self.parts,
                (true, false) => &self.parts[..1],
                (false, true) => &self.parts[1..],
                (false, false) => &[],
            }
        }

        /// Returns the parts that still have capacity as a mutable slice
        /// (head first).
        #[inline]
        pub fn writer_slice_mut(&mut self) -> &mut [Part] {
            let [head, tail] = &self.parts;
            match (head.has_remaining_capacity(), tail.has_remaining_capacity()) {
                (true, true) => &mut self.parts,
                (true, false) => &mut self.parts[..1],
                (false, true) => &mut self.parts[1..],
                (false, false) => &mut [],
            }
        }
    }
}