Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncBufReadExt::read_line #1556

Merged
merged 1 commit into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ pub use self::read::Read;
mod read_exact;
pub use self::read_exact::ReadExact;

// TODO
// mod read_line;
// pub use self::read_line::ReadLine;
mod read_line;
pub use self::read_line::ReadLine;

mod read_to_end;
pub use self::read_to_end::ReadToEnd;
Expand Down Expand Up @@ -413,6 +412,65 @@ pub trait AsyncBufReadExt: AsyncBufRead {
{
ReadUntil::new(self, byte, buf)
}

/// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until a newline (the 0xA byte) or EOF is reached,
/// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
///
/// This function will read bytes from the underlying stream until the
/// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
/// up to, and including, the delimiter (if found) will be appended to
/// `buf`.
///
/// The returned future will resolve to the number of bytes read once the read
/// operation is completed.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded.
///
/// # Errors
///
/// This function has the same error semantics as [`read_until`] and will
/// also return an error if the read bytes are not valid UTF-8. If an I/O
/// error is encountered then `buf` may contain some bytes already read in
/// the event that all data read so far was valid UTF-8.
///
/// [`read_until`]: AsyncBufReadExt::read_until
///
/// # Examples
///
/// ```
/// #![feature(async_await, await_macro)]
/// # futures::executor::block_on(async {
/// use futures::io::AsyncBufReadExt;
/// use std::io::Cursor;
///
/// let mut cursor = Cursor::new(b"foo\nbar");
/// let mut buf = String::new();
///
/// // cursor is at 'f'
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
/// assert_eq!(num_bytes, 4);
/// assert_eq!(buf, "foo\n");
/// buf.clear();
///
/// // cursor is at 'b'
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
/// assert_eq!(num_bytes, 3);
/// assert_eq!(buf, "bar");
/// buf.clear();
///
/// // cursor is at EOF
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
/// assert_eq!(num_bytes, 0);
/// assert_eq!(buf, "");
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
/// ```
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
where Self: Unpin,
{
ReadLine::new(self, buf)
}
}

impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
48 changes: 48 additions & 0 deletions futures-util/src/io/read_line.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncBufRead;
use std::io;
use std::mem;
use std::pin::Pin;
use std::str;
use super::read_until::read_until_internal;

/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method.
#[derive(Debug)]
pub struct ReadLine<'a, R: ?Sized + Unpin> {
reader: &'a mut R,
buf: &'a mut String,
bytes: Vec<u8>,
read: usize,
}

impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}

impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
Self {
reader,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
read: 0,
}
}
}

impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf, bytes, read } = &mut *self;
let ret = ready!(read_until_internal(Pin::new(reader), b'\n', bytes, read, cx));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8"))
}))
} else {
unsafe { mem::swap(buf.as_mut_vec(), bytes); }
Poll::Ready(ret)
}
}
}
2 changes: 1 addition & 1 deletion futures-util/src/io/read_until.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
}
}

fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
mut reader: Pin<&mut R>,
byte: u8,
buf: &mut Vec<u8>,
Expand Down
4 changes: 2 additions & 2 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ pub mod io {

pub use futures_util::io::{
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader,
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil,
Seek, Window, WriteAll, WriteHalf,
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd,
ReadUntil, Seek, Window, WriteAll, WriteHalf,
};
}

Expand Down
58 changes: 58 additions & 0 deletions futures/tests/io_read_line.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use futures::executor::block_on;
use futures::future::Future;
use futures::io::AsyncBufReadExt;
use futures::task::Poll;
use futures_test::io::AsyncReadTestExt;
use futures_test::task::noop_context;
use std::io::Cursor;
use std::pin::Pin;

#[test]
fn read_line() {
taiki-e marked this conversation as resolved.
Show resolved Hide resolved
let mut buf = Cursor::new(b"12");
let mut v = String::new();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
assert_eq!(v, "12");

let mut buf = Cursor::new(b"12\n\n");
let mut v = String::new();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3);
assert_eq!(v, "12\n");
v.clear();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1);
assert_eq!(v, "\n");
v.clear();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0);
assert_eq!(v, "");
}

fn run<F: Future + Unpin>(mut f: F) -> F::Output {
let mut cx = noop_context();
loop {
if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) {
return x;
}
}
}

#[test]
fn maybe_pending() {
let mut buf = b"12".interleave_pending();
let mut v = String::new();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
assert_eq!(v, "12");

let mut buf = b"12\n\n".interleave_pending();
let mut v = String::new();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3);
assert_eq!(v, "12\n");
v.clear();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1);
assert_eq!(v, "\n");
v.clear();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
assert_eq!(v, "");
v.clear();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
assert_eq!(v, "");
}
55 changes: 8 additions & 47 deletions futures/tests/io_read_until.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use futures::executor::block_on;
use futures::future::Future;
use futures::io::{AsyncRead, AsyncBufRead, AsyncBufReadExt};
use futures::task::{Context, Poll};
use futures::io::AsyncBufReadExt;
use futures::task::Poll;
use futures_test::io::AsyncReadTestExt;
use futures_test::task::noop_context;
use std::cmp;
use std::io::{self, Cursor};
use std::io::Cursor;
use std::pin::Pin;

#[test]
fn read_until() {
let mut buf = Cursor::new(&b"12"[..]);
let mut buf = Cursor::new(b"12");
let mut v = Vec::new();
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2);
assert_eq!(v, b"12");

let mut buf = Cursor::new(&b"1233"[..]);
let mut buf = Cursor::new(b"1233");
let mut v = Vec::new();
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 3);
assert_eq!(v, b"123");
Expand All @@ -35,53 +35,14 @@ fn run<F: Future + Unpin>(mut f: F) -> F::Output {
}
}

struct MaybePending<'a> {
inner: &'a [u8],
ready: bool,
}

impl<'a> MaybePending<'a> {
fn new(inner: &'a [u8]) -> Self {
Self { inner, ready: false }
}
}

impl AsyncRead for MaybePending<'_> {
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8])
-> Poll<io::Result<usize>>
{
unimplemented!()
}
}

impl AsyncBufRead for MaybePending<'_> {
fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>)
-> Poll<io::Result<&'a [u8]>>
{
if self.ready {
self.ready = false;
if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
let len = cmp::min(2, self.inner.len());
Poll::Ready(Ok(&self.inner[0..len]))
} else {
self.ready = true;
Poll::Pending
}
}

fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.inner = &self.inner[amt..];
}
}

#[test]
fn maybe_pending() {
let mut buf = MaybePending::new(b"12");
let mut buf = b"12".interleave_pending();
let mut v = Vec::new();
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2);
assert_eq!(v, b"12");

let mut buf = MaybePending::new(b"12333");
let mut buf = b"12333".interleave_pending();
Copy link
Member

@Nemo157 Nemo157 May 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this won't add a pending in between reads that actually return data like the old implementation, it will be like Pending, Ready(b"12333"), Pending, Ready(b"").

If you want to test reading data multiple times my plan elsewhere was to use TryStreamExt::into_async_read, so something like stream::iter(vec![b"12", b"33", b"3"]).map(Ok).into_async_read().interleave_pending().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out. I updated this PR after checking locally that the new test can detect problems like #1566, but the code you suggested is even better. I will open a PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like IntoAsyncRead should special-case consume(0) to be a noop then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Opened #1584.

let mut v = Vec::new();
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3);
assert_eq!(v, b"123");
Expand Down