Skip to content

Commit

Permalink
Add TryFutureExt::try_flatten_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed May 18, 2019
1 parent 033a90b commit 7b2d58b
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 20 deletions.
29 changes: 10 additions & 19 deletions futures-util/src/future/flatten_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

/// Stream for the [`flatten_stream`](super::FutureExt::flatten_stream) method.
#[must_use = "streams do nothing unless polled"]
Expand All @@ -11,6 +12,8 @@ pub struct FlattenStream<Fut: Future> {
}

impl<Fut: Future> FlattenStream<Fut> {
unsafe_pinned!(state: State<Fut>);

pub(super) fn new(future: Fut) -> FlattenStream<Fut> {
FlattenStream {
state: State::Future(future)
Expand Down Expand Up @@ -58,34 +61,22 @@ impl<Fut> Stream for FlattenStream<Fut>
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// safety: data is never moved via the resulting &mut reference
let stream = match &mut unsafe { Pin::get_unchecked_mut(self.as_mut()) }.state {
match &mut unsafe { Pin::get_unchecked_mut(self.as_mut()) }.state {
State::Future(f) => {
// safety: the future we're re-pinning here will never be moved;
// it will just be polled, then dropped in place
match unsafe { Pin::new_unchecked(f) }.poll(cx) {
Poll::Pending => {
// State is not changed, early return.
return Poll::Pending
},
Poll::Ready(stream) => {
// Future resolved to stream.
// We do not return, but poll that
// stream in the next loop iteration.
stream
}
}
let stream = ready!(unsafe { Pin::new_unchecked(f) }.poll(cx));

// Future resolved to stream.
// We do not return, but poll that
// stream in the next loop iteration.
self.as_mut().state().set(State::Stream(stream));
}
State::Stream(s) => {
// safety: the stream we're repinning here will never be moved;
// it will just be polled, then dropped in place
return unsafe { Pin::new_unchecked(s) }.poll_next(cx);
}
};

unsafe {
// safety: we use the &mut only for an assignment, which causes
// only an in-place drop
Pin::get_unchecked_mut(self.as_mut()).state = State::Stream(stream);
}
}
}
Expand Down
37 changes: 37 additions & 0 deletions futures-util/src/try_future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;

Expand Down Expand Up @@ -52,6 +53,9 @@ pub use self::map_ok::MapOk;
mod or_else;
pub use self::or_else::OrElse;

mod try_flatten_stream;
pub use self::try_flatten_stream::TryFlattenStream;

mod unwrap_or_else;
pub use self::unwrap_or_else::UnwrapOrElse;

Expand Down Expand Up @@ -319,6 +323,39 @@ pub trait TryFutureExt: TryFuture {
OrElse::new(self, f)
}

/// Flatten the execution of this future when the successful result of this
/// future is a stream.
///
/// This can be useful when stream initialization is deferred, and it is
/// convenient to work with that stream as if stream was available at the
/// call site.
///
/// Note that this function consumes this future and returns a wrapped
/// version of it.
///
/// # Examples
///
/// ```
/// #![feature(async_await)]
/// # futures::executor::block_on(async {
/// use futures::future::{self, TryFutureExt};
/// use futures::stream::{self, TryStreamExt};
///
/// let stream_items = vec![17, 18, 19].into_iter().map(Ok);
/// let future_of_a_stream = future::ok::<_, ()>(stream::iter(stream_items));
///
/// let stream = future_of_a_stream.try_flatten_stream();
/// let list = stream.try_collect::<Vec<_>>().await;
/// assert_eq!(list, Ok(vec![17, 18, 19]));
/// # });
/// ```
fn try_flatten_stream(self) -> TryFlattenStream<Self>
where Self::Ok: TryStream<Error = Self::Error>,
Self: Sized
{
TryFlattenStream::new(self)
}

/* TODO
/// Waits for either one of two differently-typed futures to complete.
///
Expand Down
105 changes: 105 additions & 0 deletions futures-util/src/try_future/try_flatten_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

/// Stream for the [`try_flatten_stream`](super::TryFutureExt::try_flatten_stream) method.
#[must_use = "streams do nothing unless polled"]
pub struct TryFlattenStream<Fut>
where
Fut: TryFuture,
{
state: State<Fut>
}

impl<Fut: TryFuture> TryFlattenStream<Fut>
where
Fut: TryFuture,
Fut::Ok: TryStream<Error = Fut::Error>,
{
unsafe_pinned!(state: State<Fut>);

pub(super) fn new(future: Fut) -> Self {
Self {
state: State::Future(future)
}
}
}

impl<Fut> fmt::Debug for TryFlattenStream<Fut>
where
Fut: TryFuture + fmt::Debug,
Fut::Ok: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("TryFlattenStream")
.field("state", &self.state)
.finish()
}
}

#[derive(Debug)]
enum State<Fut: TryFuture> {
// future is not yet called or called and not ready
Future(Fut),
// future resolved to Stream
Stream(Fut::Ok),
// future resolved to error
Done,
}

impl<Fut> FusedStream for TryFlattenStream<Fut>
where
Fut: TryFuture,
Fut::Ok: TryStream<Error = Fut::Error> + FusedStream,
{
fn is_terminated(&self) -> bool {
match &self.state {
State::Future(_) => false,
State::Stream(stream) => stream.is_terminated(),
State::Done => true,
}
}
}

impl<Fut> Stream for TryFlattenStream<Fut>
where
Fut: TryFuture,
Fut::Ok: TryStream<Error = Fut::Error>,
{
type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// safety: data is never moved via the resulting &mut reference
match &mut unsafe { Pin::get_unchecked_mut(self.as_mut()) }.state {
State::Future(f) => {
// safety: the future we're re-pinning here will never be moved;
// it will just be polled, then dropped in place
match ready!(unsafe { Pin::new_unchecked(f) }.try_poll(cx)) {
Ok(stream) => {
// Future resolved to stream.
// We do not return, but poll that
// stream in the next loop iteration.
self.as_mut().state().set(State::Stream(stream));
}
Err(e) => {
// Future resolved to error.
// We have neither a pollable stream nor a future.
self.as_mut().state().set(State::Done);
return Poll::Ready(Some(Err(e)));
}
}
}
State::Stream(s) => {
// safety: the stream we're repinning here will never be moved;
// it will just be polled, then dropped in place
return unsafe { Pin::new_unchecked(s) }.try_poll_next(cx);
}
State::Done => return Poll::Ready(None),
}
}
}
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ pub mod future {

TryFutureExt,
AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse,
UnwrapOrElse,
TryFlattenStream, UnwrapOrElse,
};

#[cfg(feature = "never-type")]
Expand Down

0 comments on commit 7b2d58b

Please sign in to comment.