Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Stream for mpsc::Receiver. Add PollSender. #74

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

flxo
Copy link

@flxo flxo commented Feb 15, 2023

Implement futures::future::Stream for mpsc::Receiver. Add mpsc::PollSender that wrapps a mpsc::Sender and implements Sink. The PollSender implementation is borrowed from tokio-util::sync::PollSender and slightly adopted.

There's an lifetime issue, I'm currently unsure how to solve because SendRefFuture needs 'sender, so the PR is marked as draft.

Fixes #19

@flxo
Copy link
Author

flxo commented Mar 8, 2023

@hawkw I'm stuck with a lifetime issue. Think this can be only solved by introducing kind of an OwnedSendRefFuture that keeps a copy of Sender<..> but this is a bigger change I'd like to get your opinion on that first.

Thanks!

@flxo flxo force-pushed the main branch 2 times, most recently from 2df0ec3 to 7ef5eb2 Compare March 8, 2023 13:24
Copy link
Owner

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, sorry I didn't see this PR sooner! I'd love to merge something like this. I had a suggestion regarding the implementation.

Regarding:

@hawkw I'm stuck with a lifetime issue. Think this can be only solved by introducing kind of an OwnedSendRefFuture that keeps a copy of Sender<..> but this is a bigger change I'd > like to get your opinion on that first.

I'd be open to that approach. Since Senders are either internally reference counted or are internally &'static references to a static array, cloning them is inexpensive, so it seems fine to add an owned variant. Would you mind doing that in a separate PR that this PR can be rebased onto? Thanks!

I also left some comments on the proposed API. Let me know what you think of them. Thank you!

Comment on lines +317 to +323
let send_ref = Box::pin(SendRefFuture {
core: &sender.inner.core,
slots: sender.inner.slots.as_ref(),
recycle: &sender.inner.recycle,
state: State::Start,
waiter: queue::Waiter::new(),
});
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, it's unfortunate that every time a poll_sender sends a new message, we'll allocate a new Box which is dropped when that message is sent. it would be preferable if there was a single heap allocation that lives for the entire lifetime of the sender, instead.

tokio_util's PollSender uses a ReusableBoxFuture type for this. I would be open to adding an optional tokio-util dependency that's only enabled when the pollable sender implementation is used, if it allowed us to avoid the allocation that occurs on every poll_reserve call...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Sure! I saw ReusableBoxFuture but have been afraid of adding an external dependent.

enum PollSenderState<'a, T, R> {
Idle(Sender<T, R>),
Acquiring(Pin<Box<SendRefFuture<'a ,T, R>>>),
ReadyToSend(SendRef<'a, T>),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels a little strange to me that the poll_reserve method stores the SendRef inside the PollSender and then consumes it when send is called, rather than just having poll_reserve return a SendRef to the user that can then be used to write a value. this API would allow the user to write to the slot in place through the SendRef, which isn't currently possible with the current API.

OTTOH, we would still need to store the SendRef internally in order to make the Sink impl work, but I think we could change the poll_reserve function to return a SendRef, and only store that SendRef internally when Sink::poll_ready is called? Of course, in order for that to work, we would need to add an owned SendRef variant, as you mentioned.

This would make the API somewhat different from tokio_util::sync's PollSender, but I think it's worth it to allow the user to actually write to a slot in place through a SendRef. If we wanted to present an identical API to tokio_util::sync::PollSender, we could also add a new poll method that returns a SendRef, and have poll_reserve just call that method and then store the returned SendRef if it returns Poll::Ready...what do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I used the PollSender approach in order to not invent something new for something in place - but yes - the ability to directly return the SendRef is plausible. Having a different implementation than PollSender is not a problem for me personally because I assume that in the "normal" cases the Stream/Sink API is used.
If anyone calls poll_reserve, they normally know that they do and a difference to PollSender doesn't matter at all.

@hawkw
Copy link
Owner

hawkw commented Apr 28, 2023

FWIW, there is already an open issue for adding owned variants to SendRef etc: #62

Implement `futures::future::Stream` for `mpsc::Receiver`. Add
`mpsc::PollSender` that wrapps a mpsc::Sender and implements `Sink`.
@flxo
Copy link
Author

flxo commented May 3, 2023

Hi, sorry I didn't see this PR sooner! I'd love to merge something like this. I had a suggestion regarding the implementation.

Regarding:

@hawkw I'm stuck with a lifetime issue. Think this can be only solved by introducing kind of an OwnedSendRefFuture that keeps a copy of Sender<..> but this is a bigger change I'd > like to get your opinion on that first.

I'd be open to that approach. Since Senders are either internally reference counted or are internally &'static references to a static array, cloning them is inexpensive, so it seems fine to add an owned variant. Would you mind doing that in a separate PR that this PR can be rebased onto? Thanks!

Thanks for you reply. I totally agree that a separate PR with the owned send refs makes sense. I'd pause this PR until that is done.
I'm willing to implement that but probably won't be able to do so before July (I will be off any keyboard for two month :-)). So if anyone else wan't to - please.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Stream (and Sink?) impls for async MPSC
2 participants