Skip to content

Commit

Permalink
Permit conversion into and creation from parts.
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman S. Borschel committed Nov 10, 2020
1 parent bba1993 commit 43dde94
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 44 deletions.
76 changes: 65 additions & 11 deletions src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use std::task::{Context, Poll};
/// let bytes = Bytes::from("Hello world!");
/// framed.send(bytes).await?;
///
/// // Release the I/O and codec
/// let (cur, _) = framed.release();
/// // Drop down to the underlying I/O stream.
/// let cur = framed.into_inner();
/// assert_eq!(cur.get_ref(), b"Hello world!");
/// # Ok::<_, std::io::Error>(())
/// # }).unwrap();
Expand Down Expand Up @@ -65,23 +65,44 @@ where
/// A codec is a type which implements `Decoder` and `Encoder`.
pub fn new(inner: T, codec: U) -> Self {
Self {
inner: framed_read_2(framed_write_2(Fuse::new(inner, codec))),
inner: framed_read_2(framed_write_2(Fuse::new(inner, codec), None), None),
}
}

/// Release the I/O and Codec
pub fn release(self: Self) -> (T, U) {
let fuse = self.inner.release().release();
(fuse.t, fuse.u)
/// Creates a new `Framed` from [`FramedParts`].
///
/// See also [`Framed::into_parts`].
pub fn from_parts(FramedParts {
io, codec, write_buffer, read_buffer, ..
}: FramedParts<T, U>) -> Self {
let framed_write = framed_write_2(Fuse::new(io, codec), Some(write_buffer));
let framed_read = framed_read_2(framed_write, Some(read_buffer));
Self { inner: framed_read }
}

/// Consumes the `Framed`, returning its parts, such that a new
/// `Framed` may be constructed, possibly with a different codec.
///
/// See also [`Framed::from_parts`].
pub fn into_parts(self) -> FramedParts<T, U> {
let (framed_write, read_buffer) = self.inner.into_parts();
let (fuse, write_buffer) = framed_write.into_parts();
FramedParts {
io: fuse.t,
codec: fuse.u,
read_buffer,
write_buffer,
_priv: (),
}
}

/// Consumes the `Framed`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
/// Note that data that has already been read or written but not yet
/// consumed by the decoder or flushed, respectively, is dropped.
/// To retain any such potentially buffered data, use [`Framed::into_parts()`].
pub fn into_inner(self) -> T {
self.release().0
self.into_parts().io
}

/// Returns a reference to the underlying codec wrapped by
Expand Down Expand Up @@ -140,3 +161,36 @@ where
self.project().inner.poll_close(cx)
}
}

/// The parts obtained from [`Framed::into_parts`].
pub struct FramedParts<T, U> {
/// The underlying I/O stream.
pub io: T,
/// The codec used for encoding and decoding frames.
pub codec: U,
/// The remaining read buffer, containing data that has been
/// read from `io` but not yet consumed by the codec's decoder.
pub read_buffer: BytesMut,
/// The remaining write buffer, containing framed data that has been
/// buffered but not yet flushed to `io`.
pub write_buffer: BytesMut,
/// Keep the constructor private.
_priv: (),
}

impl<T, U> FramedParts<T, U> {
/// Changes the codec used in this `FramedParts`.
pub fn map_codec<V, F>(self, f: F) -> FramedParts<T, V>
where
V: Encoder + Decoder,
F: FnOnce(U) -> V,
{
FramedParts {
io: self.io,
codec: f(self.codec),
read_buffer: self.read_buffer,
write_buffer: self.write_buffer,
_priv: (),
}
}
}
74 changes: 61 additions & 13 deletions src/framed_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,42 @@ where
/// Creates a new `FramedRead` transport with the given `Decoder`.
pub fn new(inner: T, decoder: D) -> Self {
Self {
inner: framed_read_2(Fuse::new(inner, decoder)),
inner: framed_read_2(Fuse::new(inner, decoder), None),
}
}

/// Release the I/O and Decoder
pub fn release(self: Self) -> (T, D) {
let fuse = self.inner.release();
(fuse.t, fuse.u)
/// Creates a new `FramedRead` from [`FramedReadParts`].
///
/// See also [`FramedRead::into_parts`].
pub fn from_parts(FramedReadParts {
io, decoder, buffer, ..
}: FramedReadParts<T, D>) -> Self {
Self {
inner: framed_read_2(Fuse::new(io, decoder), Some(buffer))
}
}

/// Consumes the `FramedRead`, returning its parts such that a
/// new `FramedRead` may be constructed, possibly with a different decoder.
///
/// See also [`FramedRead::from_parts`].
pub fn into_parts(self) -> FramedReadParts<T, D> {
let (fuse, buffer) = self.inner.into_parts();
FramedReadParts {
io: fuse.t,
decoder: fuse.u,
buffer,
_priv: (),
}
}

/// Consumes the `FramedRead`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
/// Note that data that has already been read but not yet consumed
/// by the decoder is dropped. To retain any such potentially
/// buffered data, use [`FramedRead::into_parts()`].
pub fn into_inner(self) -> T {
self.release().0
self.into_parts().io
}

/// Returns a reference to the underlying decoder.
Expand Down Expand Up @@ -133,10 +152,10 @@ impl<T> DerefMut for FramedRead2<T> {

const INITIAL_CAPACITY: usize = 8 * 1024;

pub fn framed_read_2<T>(inner: T) -> FramedRead2<T> {
pub fn framed_read_2<T>(inner: T, buffer: Option<BytesMut>) -> FramedRead2<T> {
FramedRead2 {
inner,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
buffer: buffer.unwrap_or_else(|| BytesMut::with_capacity(INITIAL_CAPACITY)),
}
}

Expand Down Expand Up @@ -207,11 +226,40 @@ where
}

impl<T> FramedRead2<T> {
pub fn release(self: Self) -> T {
self.inner
pub fn into_parts(self) -> (T, BytesMut) {
(self.inner, self.buffer)
}

pub fn buffer(&self) -> &BytesMut {
&self.buffer
}
}

/// The parts obtained from (FramedRead::into_parts).
pub struct FramedReadParts<T, D> {
/// The underlying I/O stream.
pub io: T,
/// The frame decoder.
pub decoder: D,
/// The buffer of data that has been read from `io` but not
/// yet consumed by `decoder`.
pub buffer: BytesMut,
/// Keep the constructor private.
_priv: (),
}

impl<T, D> FramedReadParts<T, D> {
/// Changes the decoder in `FramedReadParts`.
pub fn map_decoder<E, F>(self, f: F) -> FramedReadParts<T, E>
where
E: Decoder,
F: FnOnce(D) -> E,
{
FramedReadParts {
io: self.io,
decoder: f(self.decoder),
buffer: self.buffer,
_priv: (),
}
}
}
73 changes: 60 additions & 13 deletions src/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,18 @@ where
/// Creates a new `FramedWrite` transport with the given `Encoder`.
pub fn new(inner: T, encoder: E) -> Self {
Self {
inner: framed_write_2(Fuse::new(inner, encoder)),
inner: framed_write_2(Fuse::new(inner, encoder), None),
}
}

/// Creates a new `FramedWrite` from [`FramedWriteParts`].
///
/// See also [`FramedWrite::into_parts`].
pub fn from_parts(FramedWriteParts {
io, encoder, buffer, ..
}: FramedWriteParts<T, E>) -> Self {
Self {
inner: framed_write_2(Fuse::new(io, encoder), Some(buffer))
}
}

Expand Down Expand Up @@ -82,19 +93,27 @@ where
self.inner.high_water_mark = hwm;
}

/// Release the I/O and Encoder
pub fn release(self) -> (T, E) {
let fuse = self.inner.release();
(fuse.t, fuse.u)
/// Consumes the `FramedWrite`, returning its parts such that
/// a new `FramedWrite` may be constructed, possibly with a different encoder.
///
/// See also [`FramedWrite::from_parts`].
pub fn into_parts(self) -> FramedWriteParts<T, E> {
let (fuse, buffer) = self.inner.into_parts();
FramedWriteParts {
io: fuse.t,
encoder: fuse.u,
buffer,
_priv: (),
}
}

/// Consumes the `FramedWrite`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
/// Note that data that has already been written but not yet flushed
/// is dropped. To retain any such potentially buffered data, use
/// [`FramedWrite::into_parts()`].
pub fn into_inner(self) -> T {
self.release().0
self.into_parts().io
}

/// Returns a reference to the underlying encoder.
Expand Down Expand Up @@ -176,11 +195,11 @@ impl<T> DerefMut for FramedWrite2<T> {
// TCP send buffer size (SO_SNDBUF)
const DEFAULT_SEND_HIGH_WATER_MARK: usize = 131072;

pub fn framed_write_2<T>(inner: T) -> FramedWrite2<T> {
pub fn framed_write_2<T>(inner: T, buffer: Option<BytesMut>) -> FramedWrite2<T> {
FramedWrite2 {
inner,
high_water_mark: DEFAULT_SEND_HIGH_WATER_MARK,
buffer: BytesMut::with_capacity(1028 * 8),
buffer: buffer.unwrap_or_else(|| BytesMut::with_capacity(1028 * 8)),
}
}

Expand Down Expand Up @@ -240,11 +259,39 @@ where
}

impl<T> FramedWrite2<T> {
pub fn release(self) -> T {
self.inner
pub fn into_parts(self) -> (T, BytesMut) {
(self.inner, self.buffer)
}
}

fn err_eof() -> Error {
Error::new(ErrorKind::UnexpectedEof, "End of file")
}

/// The parts obtained from [`FramedWrite::into_parts`].
pub struct FramedWriteParts<T, E> {
/// The underlying I/O stream.
pub io: T,
/// The frame encoder.
pub encoder: E,
/// The framed data that has been buffered but not yet flushed to `io`.
pub buffer: BytesMut,
/// Keep the constructor private.
_priv: (),
}

impl<T, E> FramedWriteParts<T, E> {
/// Changes the encoder used in `FramedWriteParts`.
pub fn map_encoder<G, F>(self, f: F) -> FramedWriteParts<T, G>
where
G: Encoder,
F: FnOnce(E) -> G,
{
FramedWriteParts {
io: self.io,
encoder: f(self.encoder),
buffer: self.buffer,
_priv: (),
}
}
}
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ mod encoder;
pub use encoder::Encoder;

mod framed;
pub use framed::Framed;
pub use framed::{Framed, FramedParts};

mod framed_read;
pub use framed_read::FramedRead;
pub use framed_read::{FramedRead, FramedReadParts};

mod framed_write;
pub use framed_write::FramedWrite;
pub use framed_write::{FramedWrite, FramedWriteParts};

mod fuse;
6 changes: 3 additions & 3 deletions tests/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn line_write() {
let mut framer = FramedWrite::new(curs, LinesCodec {});
executor::block_on(framer.send("Hello\n".to_owned())).unwrap();
executor::block_on(framer.send("World\n".to_owned())).unwrap();
let (curs, _) = framer.release();
let curs = framer.into_inner();
assert_eq!(&curs.get_ref()[0..12], b"Hello\nWorld\n");
assert_eq!(curs.position(), 12);
}
Expand All @@ -69,7 +69,7 @@ fn line_write_to_eof() {
let mut framer = FramedWrite::new(curs, LinesCodec {});
let _err =
executor::block_on(framer.send("This will fill up the buffer\n".to_owned())).unwrap_err();
let (curs, _) = framer.release();
let curs = framer.into_inner();
assert_eq!(curs.position(), 16);
assert_eq!(&curs.get_ref()[0..16], b"This will fill u");
}
Expand All @@ -93,7 +93,7 @@ fn send_high_water_mark() {
let mut framer = FramedWrite::new(io, BytesCodec {});
framer.set_send_high_water_mark(500);
executor::block_on(framer.send_all(&mut stream)).unwrap();
let (io, _) = framer.release();
let io = framer.into_inner();
assert_eq!(io.num_poll_write, 2);
assert_eq!(io.last_write_size, 499);
}
2 changes: 1 addition & 1 deletion tests/length_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn same_msgs_are_received_as_were_sent() {
};
executor::block_on(send_msgs);

let (mut cur, _) = framed.release();
let mut cur = framed.into_inner();
cur.set_position(0);
let framed = Framed::new(cur, LengthCodec {});

Expand Down

0 comments on commit 43dde94

Please sign in to comment.