-
-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Stream for mpsc::Receiver. Add PollSender. #74
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -281,6 +281,161 @@ feature! { | |
} | ||
} | ||
|
||
feature! { | ||
#![feature = "stream"] | ||
use futures::sink::Sink; | ||
use std::mem; | ||
|
||
/// PollSender internal state. | ||
enum PollSenderState<'a, T, R> { | ||
Idle(Sender<T, R>), | ||
Acquiring(Pin<Box<SendRefFuture<'a ,T, R>>>), | ||
ReadyToSend(SendRef<'a, T>), | ||
Closed, | ||
} | ||
|
||
/// A wrapper around mpsc::Sender that can be polled. | ||
pub struct PollSender<'a, T, R> { | ||
sender: Option<Sender<T, R>>, | ||
state: PollSenderState<'a, T, R>, | ||
} | ||
|
||
impl<'a, T, R> PollSender<'a, T, R> where R: 'a + Recycle<T>, T: 'a { | ||
/// Creates a new `PollSender`. | ||
pub fn new(sender: Sender<T, R>) -> PollSender<'a, T, R> { | ||
PollSender { | ||
sender: Some(sender.clone()), | ||
state: PollSenderState::Idle(sender), | ||
} | ||
} | ||
|
||
/// Attempts to reserve a slot in the channel. | ||
pub fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Closed>> { | ||
loop { | ||
let (result, next_state) = match mem::replace(&mut self.state, PollSenderState::Closed) { | ||
PollSenderState::Idle(sender) => { | ||
let send_ref = Box::pin(SendRefFuture { | ||
core: &sender.inner.core, | ||
slots: sender.inner.slots.as_ref(), | ||
recycle: &sender.inner.recycle, | ||
state: State::Start, | ||
waiter: queue::Waiter::new(), | ||
}); | ||
Comment on lines
+317
to
+323
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, it's unfortunate that every time a
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Sure! I saw |
||
// Start trying to acquire a permit to reserve a slot for our send, and | ||
// immediately loop back around to poll it the first time. | ||
(None, PollSenderState::Acquiring(send_ref)) | ||
} | ||
PollSenderState::Acquiring(mut f) => match f.as_mut().poll(cx) { | ||
// Channel has capacity. | ||
Poll::Ready(Ok(send_ref)) => { | ||
(Some(Poll::Ready(Ok(()))), PollSenderState::ReadyToSend(send_ref)) | ||
} | ||
// Channel is closed. | ||
Poll::Ready(Err(e)) => (Some(Poll::Ready(Err(e))), PollSenderState::Closed), | ||
// Channel doesn't have capacity yet, so we need to wait. | ||
Poll::Pending => (Some(Poll::Pending), PollSenderState::Acquiring(f)), | ||
}, | ||
// We're closed, either by choice or because the underlying sender was closed. | ||
s @ PollSenderState::Closed => (Some(Poll::Ready(Err(Closed(())))), s), | ||
// We're already ready to send an item. | ||
s @ PollSenderState::ReadyToSend(_) => (Some(Poll::Ready(Ok(()))), s), | ||
}; | ||
|
||
self.state = next_state; | ||
if let Some(result) = result { | ||
return result; | ||
} | ||
} | ||
} | ||
|
||
/// Sends an item to the channel. | ||
/// | ||
/// Before calling `send_item`, `poll_reserve` must be called with a successful return | ||
/// value of `Poll::Ready(Ok(()))`. | ||
/// | ||
/// # Errors | ||
/// | ||
/// If the channel is closed, an error will be returned. This is a permanent state. | ||
/// | ||
/// # Panics | ||
/// | ||
/// If `poll_reserve` was not successfully called prior to calling `send_item`, then this method | ||
/// will panic. | ||
pub fn send_item(&mut self, value: T) -> Result<(), Closed<T>> { | ||
let (result, next_state) = match mem::replace(&mut self.state, PollSenderState::Closed) { | ||
PollSenderState::Idle(_) | PollSenderState::Acquiring(_) => { | ||
panic!("`send_item` called without first calling `poll_reserve`") | ||
} | ||
// We have a permit to send our item, so go ahead, which gets us our sender back. | ||
PollSenderState::ReadyToSend(mut send_ref) => { | ||
*send_ref = value; | ||
match &self.sender { | ||
Some(sender) => (Ok(()), PollSenderState::<T, R>::Idle(sender.clone())), | ||
None => (Ok(()), PollSenderState::Closed), // Closed in between. | ||
} | ||
}, | ||
// We're closed, either by choice or because the underlying sender was closed. | ||
PollSenderState::Closed => (Err(Closed(value)), PollSenderState::Closed), | ||
}; | ||
|
||
// Handle deferred closing if `close` was called between `poll_reserve` and `send_item`. | ||
self.state = if self.sender.is_some() { | ||
next_state | ||
} else { | ||
PollSenderState::Closed | ||
}; | ||
|
||
result | ||
} | ||
|
||
/// Checks whether this sender is been closed. | ||
/// | ||
/// The underlying channel that this sender was wrapping may still be open. | ||
pub fn is_closed(&'a self) -> bool { | ||
matches!(self.state, PollSenderState::Closed) || self.sender.is_none() | ||
} | ||
|
||
/// Gets a reference to the `Sender` of the underlying channel. | ||
/// | ||
/// If `PollSender` has been closed, `None` is returned. The underlying channel that this sender | ||
/// was wrapping may still be open. | ||
pub fn get_ref(&self) -> Option<&Sender<T, R>> { | ||
self.sender.as_ref() | ||
} | ||
|
||
/// Closes this sender. | ||
/// | ||
/// No more messages will be able to be sent from this sender, but the underlying channel will | ||
/// remain open until all senders have dropped, or until the [`Receiver`] closes the channel. | ||
pub fn close(&mut self) { | ||
// Mark ourselves officially closed by dropping our main sender. | ||
self.sender = None; | ||
self.state = PollSenderState::Closed; | ||
} | ||
} | ||
|
||
impl<'a, T, R> Sink<T> for PollSender<'a, T, R> where T: 'a + Default, R: 'a + Recycle<T> { | ||
type Error = Closed<T>; | ||
|
||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
Pin::into_inner(self).poll_reserve(cx).map_err(|_| Closed(T::default())) | ||
} | ||
|
||
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { | ||
Pin::into_inner(self).send_item(item) | ||
} | ||
|
||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
Poll::Ready(Ok(())) | ||
} | ||
|
||
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
Pin::into_inner(self).close(); | ||
Poll::Ready(Ok(())) | ||
} | ||
} | ||
} | ||
|
||
// === impl Receiver === | ||
|
||
impl<T, R> Receiver<T, R> { | ||
|
@@ -589,6 +744,19 @@ feature! { | |
} | ||
} | ||
|
||
feature! { | ||
#![feature = "stream"] | ||
use futures::stream::Stream; | ||
|
||
impl<T: Default + Clone> Stream for Receiver<T> { | ||
type Item = T; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
self.poll_recv(cx) | ||
} | ||
} | ||
} | ||
|
||
impl<T, R: fmt::Debug> fmt::Debug for Inner<T, R> { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.debug_struct("Inner") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it feels a little strange to me that the
poll_reserve
method stores theSendRef
inside thePollSender
and then consumes it whensend
is called, rather than just havingpoll_reserve
return aSendRef
to the user that can then be used to write a value. this API would allow the user to write to the slot in place through theSendRef
, which isn't currently possible with the current API.OTTOH, we would still need to store the
SendRef
internally in order to make theSink
impl work, but I think we could change thepoll_reserve
function to return aSendRef
, and only store thatSendRef
internally whenSink::poll_ready
is called? Of course, in order for that to work, we would need to add an ownedSendRef
variant, as you mentioned.This would make the API somewhat different from
tokio_util::sync
'sPollSender
, but I think it's worth it to allow the user to actually write to a slot in place through aSendRef
. If we wanted to present an identical API totokio_util::sync::PollSender
, we could also add a new poll method that returns aSendRef
, and havepoll_reserve
just call that method and then store the returnedSendRef
if it returnsPoll::Ready
...what do you think?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I used the
PollSender
approach in order to not invent something new for something in place - but yes - the ability to directly return theSendRef
is plausible. Having a different implementation thanPollSender
is not a problem for me personally because I assume that in the "normal" cases theStream
/Sink
API is used.If anyone calls
poll_reserve
, they normally know that they do and a difference toPollSender
doesn't matter at all.