From 4ec43d1f645b4d79ee72e4abc41bcd42b90494ae Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Tue, 7 May 2019 02:13:24 +0900 Subject: [PATCH] Add AsyncBufReadExt::read_line --- futures-util/src/io/mod.rs | 64 +++++++++++++++++++++++++++++-- futures-util/src/io/read_line.rs | 48 +++++++++++++++++++++++ futures-util/src/io/read_until.rs | 2 +- futures/src/lib.rs | 4 +- futures/tests/io_read_line.rs | 58 ++++++++++++++++++++++++++++ futures/tests/io_read_until.rs | 55 ++++---------------------- 6 files changed, 178 insertions(+), 53 deletions(-) create mode 100644 futures-util/src/io/read_line.rs create mode 100644 futures/tests/io_read_line.rs diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index c3a90a67a6..d81b35b1a0 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -40,9 +40,8 @@ pub use self::read::Read; mod read_exact; pub use self::read_exact::ReadExact; -// TODO -// mod read_line; -// pub use self::read_line::ReadLine; +mod read_line; +pub use self::read_line::ReadLine; mod read_to_end; pub use self::read_to_end::ReadToEnd; @@ -413,6 +412,65 @@ pub trait AsyncBufReadExt: AsyncBufRead { { ReadUntil::new(self, byte, buf) } + + /// Creates a future which will read all the bytes associated with this I/O + /// object into `buf` until a newline (the 0xA byte) or EOF is reached, + /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line). + /// + /// This function will read bytes from the underlying stream until the + /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes + /// up to, and including, the delimiter (if found) will be appended to + /// `buf`. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Errors + /// + /// This function has the same error semantics as [`read_until`] and will + /// also return an error if the read bytes are not valid UTF-8. If an I/O + /// error is encountered then `buf` may contain some bytes already read in + /// the event that all data read so far was valid UTF-8. + /// + /// [`read_until`]: AsyncBufReadExt::read_until + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await, await_macro)] + /// # futures::executor::block_on(async { + /// use futures::io::AsyncBufReadExt; + /// use std::io::Cursor; + /// + /// let mut cursor = Cursor::new(b"foo\nbar"); + /// let mut buf = String::new(); + /// + /// // cursor is at 'f' + /// let num_bytes = await!(cursor.read_line(&mut buf))?; + /// assert_eq!(num_bytes, 4); + /// assert_eq!(buf, "foo\n"); + /// buf.clear(); + /// + /// // cursor is at 'b' + /// let num_bytes = await!(cursor.read_line(&mut buf))?; + /// assert_eq!(num_bytes, 3); + /// assert_eq!(buf, "bar"); + /// buf.clear(); + /// + /// // cursor is at EOF + /// let num_bytes = await!(cursor.read_line(&mut buf))?; + /// assert_eq!(num_bytes, 0); + /// assert_eq!(buf, ""); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> + where Self: Unpin, + { + ReadLine::new(self, buf) + } } impl AsyncBufReadExt for R {} diff --git a/futures-util/src/io/read_line.rs b/futures-util/src/io/read_line.rs new file mode 100644 index 0000000000..3a41ac5a3c --- /dev/null +++ b/futures-util/src/io/read_line.rs @@ -0,0 +1,48 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncBufRead; +use std::io; +use std::mem; +use std::pin::Pin; +use std::str; +use super::read_until::read_until_internal; + +/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method. +#[derive(Debug)] +pub struct ReadLine<'a, R: ?Sized + Unpin> { + reader: &'a mut R, + buf: &'a mut String, + bytes: Vec, + read: usize, +} + +impl Unpin for ReadLine<'_, R> {} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { + Self { + reader, + bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) }, + buf, + read: 0, + } + } +} + +impl Future for ReadLine<'_, R> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, buf, bytes, read } = &mut *self; + let ret = ready!(read_until_internal(Pin::new(reader), b'\n', bytes, read, cx)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new(io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8")) + })) + } else { + unsafe { mem::swap(buf.as_mut_vec(), bytes); } + Poll::Ready(ret) + } + } +} diff --git a/futures-util/src/io/read_until.rs b/futures-util/src/io/read_until.rs index c82c73de1b..a64a721c45 100644 --- a/futures-util/src/io/read_until.rs +++ b/futures-util/src/io/read_until.rs @@ -22,7 +22,7 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> { } } -fn read_until_internal( +pub(super) fn read_until_internal( mut reader: Pin<&mut R>, byte: u8, buf: &mut Vec, diff --git a/futures/src/lib.rs b/futures/src/lib.rs index bb057c135c..900e477790 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -281,8 +281,8 @@ pub mod io { pub use futures_util::io::{ AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader, - Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil, - Seek, Window, WriteAll, WriteHalf, + Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd, + ReadUntil, Seek, Window, WriteAll, WriteHalf, }; } diff --git a/futures/tests/io_read_line.rs b/futures/tests/io_read_line.rs new file mode 100644 index 0000000000..3918cb1af7 --- /dev/null +++ b/futures/tests/io_read_line.rs @@ -0,0 +1,58 @@ +use futures::executor::block_on; +use futures::future::Future; +use futures::io::AsyncBufReadExt; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; +use std::io::Cursor; +use std::pin::Pin; + +#[test] +fn read_line() { + let mut buf = Cursor::new(b"12"); + let mut v = String::new(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); + assert_eq!(v, "12"); + + let mut buf = Cursor::new(b"12\n\n"); + let mut v = String::new(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3); + assert_eq!(v, "12\n"); + v.clear(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1); + assert_eq!(v, "\n"); + v.clear(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) { + return x; + } + } +} + +#[test] +fn maybe_pending() { + let mut buf = b"12".interleave_pending(); + let mut v = String::new(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); + assert_eq!(v, "12"); + + let mut buf = b"12\n\n".interleave_pending(); + let mut v = String::new(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3); + assert_eq!(v, "12\n"); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1); + assert_eq!(v, "\n"); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); +} diff --git a/futures/tests/io_read_until.rs b/futures/tests/io_read_until.rs index 17bccfbcc9..d4e6029330 100644 --- a/futures/tests/io_read_until.rs +++ b/futures/tests/io_read_until.rs @@ -1,20 +1,20 @@ use futures::executor::block_on; use futures::future::Future; -use futures::io::{AsyncRead, AsyncBufRead, AsyncBufReadExt}; -use futures::task::{Context, Poll}; +use futures::io::AsyncBufReadExt; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; use futures_test::task::noop_context; -use std::cmp; -use std::io::{self, Cursor}; +use std::io::Cursor; use std::pin::Pin; #[test] fn read_until() { - let mut buf = Cursor::new(&b"12"[..]); + let mut buf = Cursor::new(b"12"); let mut v = Vec::new(); assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2); assert_eq!(v, b"12"); - let mut buf = Cursor::new(&b"1233"[..]); + let mut buf = Cursor::new(b"1233"); let mut v = Vec::new(); assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 3); assert_eq!(v, b"123"); @@ -35,53 +35,14 @@ fn run(mut f: F) -> F::Output { } } -struct MaybePending<'a> { - inner: &'a [u8], - ready: bool, -} - -impl<'a> MaybePending<'a> { - fn new(inner: &'a [u8]) -> Self { - Self { inner, ready: false } - } -} - -impl AsyncRead for MaybePending<'_> { - fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) - -> Poll> - { - unimplemented!() - } -} - -impl AsyncBufRead for MaybePending<'_> { - fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>) - -> Poll> - { - if self.ready { - self.ready = false; - if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } - let len = cmp::min(2, self.inner.len()); - Poll::Ready(Ok(&self.inner[0..len])) - } else { - self.ready = true; - Poll::Pending - } - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - self.inner = &self.inner[amt..]; - } -} - #[test] fn maybe_pending() { - let mut buf = MaybePending::new(b"12"); + let mut buf = b"12".interleave_pending(); let mut v = Vec::new(); assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2); assert_eq!(v, b"12"); - let mut buf = MaybePending::new(b"12333"); + let mut buf = b"12333".interleave_pending(); let mut v = Vec::new(); assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3); assert_eq!(v, b"123");