Skip to content

Commit

Permalink
Implement Stream for mpsc::Receiver. Add PollSender.
Browse files Browse the repository at this point in the history
Implement `futures::future::Stream` for `mpsc::Receiver`. Add
`mpsc::PollSender` that wrapps a mpsc::Sender and implements `Sink`.
  • Loading branch information
Felix Obenhuber committed Mar 8, 2023
1 parent a0e5740 commit c6f4bd0
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ std = ["alloc", "parking_lot"]
alloc = []
default = ["std"]
static = []
stream = ["futures"]

[dependencies]
pin-project = "1"
parking_lot = { version = "0.12", optional = true, default-features = false }
futures = { version = "0.3", default_features = false, optional = true }

[dev-dependencies]
tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
Expand Down
167 changes: 167 additions & 0 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,160 @@ feature! {
}
}

feature! {
#![feature = "stream"]
use futures::sink::Sink;
use std::mem;

/// PollSender internal state.
enum PollSenderState<'a, T, R> {
Idle(Sender<T, R>),
Acquiring(Pin<Box<SendRefFuture<'a ,T, R>>>),
ReadyToSend(SendRef<'a, T>),
Closed,
}

/// A wrapper around mpsc::Sender that can be polled.
pub struct PollSender<'a, T, R> {
sender: Option<Sender<T, R>>,
state: PollSenderState<'a, T, R>,
}

impl<'a, T, R> PollSender<'a, T, R> where R: 'a + Recycle<T>, T: 'a {
/// Creates a new `PollSender`.
pub fn new(sender: Sender<T, R>) -> PollSender<'a, T, R> {
PollSender {
sender: Some(sender.clone()),
state: PollSenderState::Idle(sender),
}
}

pub fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Closed>> {
loop {
let (result, next_state) = match mem::replace(&mut self.state, PollSenderState::Closed) {
PollSenderState::Idle(sender) => {
let send_ref = Box::pin(SendRefFuture {
core: &sender.inner.core,
slots: sender.inner.slots.as_ref(),
recycle: &sender.inner.recycle,
state: State::Start,
waiter: queue::Waiter::new(),
});
// Start trying to acquire a permit to reserve a slot for our send, and
// immediately loop back around to poll it the first time.
(None, PollSenderState::Acquiring(send_ref))
}
PollSenderState::Acquiring(mut f) => match f.as_mut().poll(cx) {
// Channel has capacity.
Poll::Ready(Ok(send_ref)) => {
(Some(Poll::Ready(Ok(()))), PollSenderState::ReadyToSend(send_ref))
}
// Channel is closed.
Poll::Ready(Err(e)) => (Some(Poll::Ready(Err(e))), PollSenderState::Closed),
// Channel doesn't have capacity yet, so we need to wait.
Poll::Pending => (Some(Poll::Pending), PollSenderState::Acquiring(f)),
},
// We're closed, either by choice or because the underlying sender was closed.
s @ PollSenderState::Closed => (Some(Poll::Ready(Err(Closed(())))), s),
// We're already ready to send an item.
s @ PollSenderState::ReadyToSend(_) => (Some(Poll::Ready(Ok(()))), s),
};

self.state = next_state;
if let Some(result) = result {
return result;
}
}
}

/// Sends an item to the channel.
///
/// Before calling `send_item`, `poll_reserve` must be called with a successful return
/// value of `Poll::Ready(Ok(()))`.
///
/// # Errors
///
/// If the channel is closed, an error will be returned. This is a permanent state.
///
/// # Panics
///
/// If `poll_reserve` was not successfully called prior to calling `send_item`, then this method
/// will panic.
pub fn send_item(&mut self, value: T) -> Result<(), Closed<T>> {
let (result, next_state) = match mem::replace(&mut self.state, PollSenderState::Closed) {
PollSenderState::Idle(_) | PollSenderState::Acquiring(_) => {
panic!("`send_item` called without first calling `poll_reserve`")
}
// We have a permit to send our item, so go ahead, which gets us our sender back.
PollSenderState::ReadyToSend(mut send_ref) => {
*send_ref = value;
match &self.sender {
Some(sender) => (Ok(()), PollSenderState::<T, R>::Idle(sender.clone())),
None => (Ok(()), PollSenderState::Closed), // Closed in between.
}
},
// We're closed, either by choice or because the underlying sender was closed.
PollSenderState::Closed => (Err(Closed(value)), PollSenderState::Closed),
};

// Handle deferred closing if `close` was called between `poll_reserve` and `send_item`.
self.state = if self.sender.is_some() {
next_state
} else {
PollSenderState::Closed
};

result
}

/// Checks whether this sender is been closed.
///
/// The underlying channel that this sender was wrapping may still be open.
pub fn is_closed(&'a self) -> bool {
matches!(self.state, PollSenderState::Closed) || self.sender.is_none()
}

/// Gets a reference to the `Sender` of the underlying channel.
///
/// If `PollSender` has been closed, `None` is returned. The underlying channel that this sender
/// was wrapping may still be open.
pub fn get_ref(&self) -> Option<&Sender<T, R>> {
self.sender.as_ref()
}

/// Closes this sender.
///
/// No more messages will be able to be sent from this sender, but the underlying channel will
/// remain open until all senders have dropped, or until the [`Receiver`] closes the channel.
pub fn close(&mut self) {
// Mark ourselves officially closed by dropping our main sender.
self.sender = None;
self.state = PollSenderState::Closed;
}
}

impl<'a, T, R> Sink<T> for PollSender<'a, T, R> where T: 'a + Default, R: 'a + Recycle<T> {
type Error = Closed<T>;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::into_inner(self).poll_reserve(cx).map_err(|_| Closed(T::default()))
}

fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::into_inner(self).send_item(item)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::into_inner(self).close();
Poll::Ready(Ok(()))
}
}
}

// === impl Receiver ===

impl<T, R> Receiver<T, R> {
Expand Down Expand Up @@ -589,6 +743,19 @@ feature! {
}
}

feature! {
#![feature = "stream"]
use futures::stream::Stream;

impl<T: Default + Clone> Stream for Receiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_recv(cx)
}
}
}

impl<T, R: fmt::Debug> fmt::Debug for Inner<T, R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Inner")
Expand Down
5 changes: 3 additions & 2 deletions src/mpsc/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ pub enum TryRecvError {
}

/// Error returned by [`Sender::send`] or [`Sender::send_ref`] (and
/// [`StaticSender::send`]/[`StaticSender::send_ref`]), if the
/// [`Receiver`] half of the channel has been dropped.
/// [`StaticSender::send`]/[`StaticSender::send_ref`]/[`PollSender::Send`]),
/// if the [`Receiver`] half of the channel has been dropped.
///
/// [`Sender::send`]: super::Sender::send
/// [`Sender::send_ref`]: super::Sender::send_ref
/// [`StaticSender::send`]: super::StaticSender::send
/// [`StaticSender::send_ref`]: super::StaticSender::send_ref
/// [`PollSender::send`]: super::PollSender::send
/// [`Receiver`]: super::Receiver
#[derive(PartialEq, Eq)]
pub struct Closed<T = ()>(pub(crate) T);
Expand Down

0 comments on commit c6f4bd0

Please sign in to comment.