Skip to content

Commit

Permalink
Add Peekable::{peek_mut, poll_peek_mut} (#2488)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Aug 30, 2021
1 parent 302444b commit 8b39aa1
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 4 deletions.
4 changes: 2 additions & 2 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip,
SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
};

#[cfg(feature = "std")]
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub use self::select_next_some::SelectNextSome;

mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::peek::{NextIf, NextIfEq, Peek, Peekable};
pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};

mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
Expand Down
92 changes: 91 additions & 1 deletion futures-util/src/stream/stream/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<St: Stream> Peekable<St> {

delegate_access_inner!(stream, St, (.));

/// Produces a `Peek` future which retrieves a reference to the next item
/// Produces a future which retrieves a reference to the next item
/// in the stream, or `None` if the underlying stream terminates.
pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
Peek { inner: Some(self) }
Expand All @@ -57,6 +57,54 @@ impl<St: Stream> Peekable<St> {
})
}

/// Produces a future which retrieves a mutable reference to the next item
/// in the stream, or `None` if the underlying stream terminates.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::pin_mut;
///
/// let stream = stream::iter(vec![1, 2, 3]).peekable();
/// pin_mut!(stream);
///
/// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1));
/// assert_eq!(stream.as_mut().next().await, Some(1));
///
/// // Peek into the stream and modify the value which will be returned next
/// if let Some(p) = stream.as_mut().peek_mut().await {
/// if *p == 2 {
/// *p = 5;
/// }
/// }
///
/// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]);
/// # });
/// ```
pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> {
PeekMut { inner: Some(self) }
}

/// Peek retrieves a mutable reference to the next item in the stream.
pub fn poll_peek_mut(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&mut St::Item>> {
let mut this = self.project();

Poll::Ready(loop {
if this.peeked.is_some() {
break this.peeked.as_mut();
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
*this.peeked = Some(item);
} else {
break None;
}
})
}

/// Creates a future which will consume and return the next value of this
/// stream if a condition is true.
///
Expand Down Expand Up @@ -220,6 +268,48 @@ where
}
}

pin_project! {
/// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method.
#[must_use = "futures do nothing unless polled"]
pub struct PeekMut<'a, St: Stream> {
inner: Option<Pin<&'a mut Peekable<St>>>,
}
}

impl<St> fmt::Debug for PeekMut<'_, St>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeekMut").field("inner", &self.inner).finish()
}
}

impl<St: Stream> FusedFuture for PeekMut<'_, St> {
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}

impl<'a, St> Future for PeekMut<'a, St>
where
St: Stream,
{
type Output = Option<&'a mut St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
if let Some(peekable) = inner {
ready!(peekable.as_mut().poll_peek_mut(cx));

inner.take().unwrap().poll_peek_mut(cx)
} else {
panic!("PeekMut polled after completion")
}
}
}

pin_project! {
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
#[must_use = "futures do nothing unless polled"]
Expand Down
22 changes: 22 additions & 0 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,13 @@ pub mod future {
assert_not_impl!(PollFn<*const ()>: Sync);
assert_impl!(PollFn<PhantomPinned>: Unpin);

assert_impl!(PollImmediate<SendStream>: Send);
assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
assert_impl!(PollImmediate<SyncStream>: Sync);
assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
assert_impl!(PollImmediate<UnpinStream>: Unpin);
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);

assert_impl!(Ready<()>: Send);
assert_not_impl!(Ready<*const ()>: Send);
assert_impl!(Ready<()>: Sync);
Expand Down Expand Up @@ -1430,6 +1437,14 @@ pub mod stream {
assert_not_impl!(Peek<'_, LocalStream<()>>: Sync);
assert_impl!(Peek<'_, PinnedStream>: Unpin);

assert_impl!(PeekMut<'_, SendStream<()>>: Send);
assert_not_impl!(PeekMut<'_, SendStream>: Send);
assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send);
assert_impl!(PeekMut<'_, SyncStream<()>>: Sync);
assert_not_impl!(PeekMut<'_, SyncStream>: Sync);
assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync);
assert_impl!(PeekMut<'_, PinnedStream>: Unpin);

assert_impl!(Peekable<SendStream<()>>: Send);
assert_not_impl!(Peekable<SendStream>: Send);
assert_not_impl!(Peekable<LocalStream>: Send);
Expand All @@ -1451,6 +1466,13 @@ pub mod stream {
assert_not_impl!(PollFn<*const ()>: Sync);
assert_impl!(PollFn<PhantomPinned>: Unpin);

assert_impl!(PollImmediate<SendStream>: Send);
assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
assert_impl!(PollImmediate<SyncStream>: Sync);
assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
assert_impl!(PollImmediate<UnpinStream>: Unpin);
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);

assert_impl!(ReadyChunks<SendStream<()>>: Send);
assert_not_impl!(ReadyChunks<SendStream>: Send);
assert_not_impl!(ReadyChunks<LocalStream>: Send);
Expand Down
14 changes: 14 additions & 0 deletions futures/tests/stream_peekable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ fn peekable() {
});
}

#[test]
fn peekable_mut() {
block_on(async {
let s = stream::iter(vec![1u8, 2, 3]).peekable();
pin_mut!(s);
if let Some(p) = s.as_mut().peek_mut().await {
if *p == 1 {
*p = 5;
}
}
assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]);
});
}

#[test]
fn peekable_next_if_eq() {
block_on(async {
Expand Down

0 comments on commit 8b39aa1

Please sign in to comment.