Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Stream for mpsc::Receiver. Add PollSender. #74

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ std = ["alloc", "parking_lot"]
alloc = []
default = ["std"]
static = []
stream = ["futures"]

[dependencies]
pin-project = "1"
parking_lot = { version = "0.12", optional = true, default-features = false }
futures = { version = "0.3", default_features = false, optional = true }

[dev-dependencies]
tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros", "sync"] }
Expand Down
168 changes: 168 additions & 0 deletions src/mpsc/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,161 @@ feature! {
}
}

feature! {
#![feature = "stream"]
use futures::sink::Sink;
use std::mem;

/// PollSender internal state.
enum PollSenderState<'a, T, R> {
Idle(Sender<T, R>),
Acquiring(Pin<Box<SendRefFuture<'a ,T, R>>>),
ReadyToSend(SendRef<'a, T>),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels a little strange to me that the poll_reserve method stores the SendRef inside the PollSender and then consumes it when send is called, rather than just having poll_reserve return a SendRef to the user that can then be used to write a value. this API would allow the user to write to the slot in place through the SendRef, which isn't currently possible with the current API.

OTTOH, we would still need to store the SendRef internally in order to make the Sink impl work, but I think we could change the poll_reserve function to return a SendRef, and only store that SendRef internally when Sink::poll_ready is called? Of course, in order for that to work, we would need to add an owned SendRef variant, as you mentioned.

This would make the API somewhat different from tokio_util::sync's PollSender, but I think it's worth it to allow the user to actually write to a slot in place through a SendRef. If we wanted to present an identical API to tokio_util::sync::PollSender, we could also add a new poll method that returns a SendRef, and have poll_reserve just call that method and then store the returned SendRef if it returns Poll::Ready...what do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I used the PollSender approach in order to not invent something new for something in place - but yes - the ability to directly return the SendRef is plausible. Having a different implementation than PollSender is not a problem for me personally because I assume that in the "normal" cases the Stream/Sink API is used.
If anyone calls poll_reserve, they normally know that they do and a difference to PollSender doesn't matter at all.

Closed,
}

/// A wrapper around mpsc::Sender that can be polled.
pub struct PollSender<'a, T, R> {
sender: Option<Sender<T, R>>,
state: PollSenderState<'a, T, R>,
}

impl<'a, T, R> PollSender<'a, T, R> where R: 'a + Recycle<T>, T: 'a {
/// Creates a new `PollSender`.
pub fn new(sender: Sender<T, R>) -> PollSender<'a, T, R> {
PollSender {
sender: Some(sender.clone()),
state: PollSenderState::Idle(sender),
}
}

/// Attempts to reserve a slot in the channel.
pub fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Closed>> {
loop {
let (result, next_state) = match mem::replace(&mut self.state, PollSenderState::Closed) {
PollSenderState::Idle(sender) => {
let send_ref = Box::pin(SendRefFuture {
core: &sender.inner.core,
slots: sender.inner.slots.as_ref(),
recycle: &sender.inner.recycle,
state: State::Start,
waiter: queue::Waiter::new(),
});
Comment on lines +317 to +323
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, it's unfortunate that every time a poll_sender sends a new message, we'll allocate a new Box which is dropped when that message is sent. it would be preferable if there was a single heap allocation that lives for the entire lifetime of the sender, instead.

tokio_util's PollSender uses a ReusableBoxFuture type for this. I would be open to adding an optional tokio-util dependency that's only enabled when the pollable sender implementation is used, if it allowed us to avoid the allocation that occurs on every poll_reserve call...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Sure! I saw ReusableBoxFuture but have been afraid of adding an external dependent.

// Start trying to acquire a permit to reserve a slot for our send, and
// immediately loop back around to poll it the first time.
(None, PollSenderState::Acquiring(send_ref))
}
PollSenderState::Acquiring(mut f) => match f.as_mut().poll(cx) {
// Channel has capacity.
Poll::Ready(Ok(send_ref)) => {
(Some(Poll::Ready(Ok(()))), PollSenderState::ReadyToSend(send_ref))
}
// Channel is closed.
Poll::Ready(Err(e)) => (Some(Poll::Ready(Err(e))), PollSenderState::Closed),
// Channel doesn't have capacity yet, so we need to wait.
Poll::Pending => (Some(Poll::Pending), PollSenderState::Acquiring(f)),
},
// We're closed, either by choice or because the underlying sender was closed.
s @ PollSenderState::Closed => (Some(Poll::Ready(Err(Closed(())))), s),
// We're already ready to send an item.
s @ PollSenderState::ReadyToSend(_) => (Some(Poll::Ready(Ok(()))), s),
};

self.state = next_state;
if let Some(result) = result {
return result;
}
}
}

/// Sends an item to the channel.
///
/// Before calling `send_item`, `poll_reserve` must be called with a successful return
/// value of `Poll::Ready(Ok(()))`.
///
/// # Errors
///
/// If the channel is closed, an error will be returned. This is a permanent state.
///
/// # Panics
///
/// If `poll_reserve` was not successfully called prior to calling `send_item`, then this method
/// will panic.
pub fn send_item(&mut self, value: T) -> Result<(), Closed<T>> {
let (result, next_state) = match mem::replace(&mut self.state, PollSenderState::Closed) {
PollSenderState::Idle(_) | PollSenderState::Acquiring(_) => {
panic!("`send_item` called without first calling `poll_reserve`")
}
// We have a permit to send our item, so go ahead, which gets us our sender back.
PollSenderState::ReadyToSend(mut send_ref) => {
*send_ref = value;
match &self.sender {
Some(sender) => (Ok(()), PollSenderState::<T, R>::Idle(sender.clone())),
None => (Ok(()), PollSenderState::Closed), // Closed in between.
}
},
// We're closed, either by choice or because the underlying sender was closed.
PollSenderState::Closed => (Err(Closed(value)), PollSenderState::Closed),
};

// Handle deferred closing if `close` was called between `poll_reserve` and `send_item`.
self.state = if self.sender.is_some() {
next_state
} else {
PollSenderState::Closed
};

result
}

/// Checks whether this sender is been closed.
///
/// The underlying channel that this sender was wrapping may still be open.
pub fn is_closed(&'a self) -> bool {
matches!(self.state, PollSenderState::Closed) || self.sender.is_none()
}

/// Gets a reference to the `Sender` of the underlying channel.
///
/// If `PollSender` has been closed, `None` is returned. The underlying channel that this sender
/// was wrapping may still be open.
pub fn get_ref(&self) -> Option<&Sender<T, R>> {
self.sender.as_ref()
}

/// Closes this sender.
///
/// No more messages will be able to be sent from this sender, but the underlying channel will
/// remain open until all senders have dropped, or until the [`Receiver`] closes the channel.
pub fn close(&mut self) {
// Mark ourselves officially closed by dropping our main sender.
self.sender = None;
self.state = PollSenderState::Closed;
}
}

impl<'a, T, R> Sink<T> for PollSender<'a, T, R> where T: 'a + Default, R: 'a + Recycle<T> {
type Error = Closed<T>;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::into_inner(self).poll_reserve(cx).map_err(|_| Closed(T::default()))
}

fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::into_inner(self).send_item(item)
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::into_inner(self).close();
Poll::Ready(Ok(()))
}
}
}

// === impl Receiver ===

impl<T, R> Receiver<T, R> {
Expand Down Expand Up @@ -589,6 +744,19 @@ feature! {
}
}

feature! {
#![feature = "stream"]
use futures::stream::Stream;

impl<T: Default + Clone> Stream for Receiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_recv(cx)
}
}
}

impl<T, R: fmt::Debug> fmt::Debug for Inner<T, R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Inner")
Expand Down
5 changes: 3 additions & 2 deletions src/mpsc/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ pub enum TryRecvError {
}

/// Error returned by [`Sender::send`] or [`Sender::send_ref`] (and
/// [`StaticSender::send`]/[`StaticSender::send_ref`]), if the
/// [`Receiver`] half of the channel has been dropped.
/// [`StaticSender::send`]/[`StaticSender::send_ref`]/[`PollSender::poll_reserve`]),
/// if the [`Receiver`] half of the channel has been dropped.
///
/// [`Sender::send`]: super::Sender::send
/// [`Sender::send_ref`]: super::Sender::send_ref
/// [`StaticSender::send`]: super::StaticSender::send
/// [`StaticSender::send_ref`]: super::StaticSender::send_ref
/// [`PollSender::poll_reserve`]: super::PollSender::poll_reserve
/// [`Receiver`]: super::Receiver
#[derive(PartialEq, Eq)]
pub struct Closed<T = ()>(pub(crate) T);
Expand Down