Redefine Stream/Sink/AsyncRead/AsyncWrite/etc on top of Future #1365

Closed
Matthias247 opened this issue Dec 8, 2018 · 20 comments

Comments

@Matthias247
Contributor

I think in the async/await world it might be preferable to redefine traits and types like the ones mentioned in the title on top of Futures, e.g. by moving from poll_something() signatures to methods that return Futures.

E.g.

trait Stream<T> {
    type Item: Future<Output = T>;
    fn read(&mut self) -> Self::Item;
    // or something along the following lines, whatever works best:
    // fn read(&mut self) -> impl Future<Output = T>;
}

The important reason for this is that it should be possible to implement those types on top of Futures as primitives, which isn't possible today.

As a motivating example I'm thinking about an MPMC (multi-producer, multi-consumer) channel, which can be built around some async primitives as shown below:

// AsyncMutex, AsyncManualResetEvent and Queue are assumed async primitives.
struct ChannelImpl<T> {
    state: AsyncMutex<Queue<T>>,
    has_space: AsyncManualResetEvent,
    has_element: AsyncManualResetEvent,
}

impl<T> ChannelImpl<T> {
    async fn write(&mut self, item: T) {
        loop {
            await!(self.has_space.wait());
            let mut queue = await!(self.state.lock());
            if queue.free_space() == 0 {
                continue;
            }
            queue.push(item);
            break;
        }
        self.has_element.set();
    }

    async fn read(&mut self) -> T {
        let mut result: Option<T> = None;
        loop {
            await!(self.has_element.wait());
            let mut queue = await!(self.state.lock());
            if queue.len() == 0 {
                continue;
            }
            result = Some(queue.pop());
            break;
        }
        self.has_space.set();
        result.unwrap()
    }
}

I'm not yet sure if the current state of async functions and traits allows all this (please educate me!), but I think this should be one of the goals that async programming in Rust should enable.

However, this kind of implementation currently can't be wrapped in a Stream or Sink implementation.
We can't stuff this kind of code inside a poll_xyz method of a Stream/etc, since it requires temporary Futures, each carrying its own poll state, to be instantiated in the task. Those Futures must maintain their state between calls to poll(). E.g. polling an asynchronous mutex's lock future for the first time creates a registration in the waiting list, and dropping it cancels the registration. If we tried to use an asynchronous mutex inside a poll_next method, we would always have to drop the lock future before poll_next() returns, which would cancel the registration and never allow us to acquire the mutex.
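The registration lifecycle described above can be sketched in a few lines. This is a minimal illustration, assuming a hypothetical `WaitList` (standing in for the waiter queue of an async mutex) and a hypothetical `WaitTurn` future; neither comes from an existing crate. The future registers itself on its first poll, keeps the registration id between polls, and removes it in `Drop`. `poll_once_then_drop` mimics what a `poll_next` body would have to do: create the future, poll it once, and drop it before returning, which throws the place in line away.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical FIFO wait list, standing in for an async mutex's waiter queue.
#[derive(Default)]
struct WaitList {
    next_id: u64,
    waiters: VecDeque<u64>,
}

/// Hypothetical future that completes once it is first in line. Its
/// registration id is per-operation state that must survive between polls.
struct WaitTurn {
    list: Rc<RefCell<WaitList>>,
    id: Option<u64>,
}

impl Future for WaitTurn {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // WaitTurn is Unpin
        let mut list = this.list.borrow_mut();
        // The first poll creates the registration; later polls reuse it.
        let id = match this.id {
            Some(id) => id,
            None => {
                let id = list.next_id;
                list.next_id += 1;
                list.waiters.push_back(id);
                this.id = Some(id);
                id
            }
        };
        if list.waiters.front() == Some(&id) {
            list.waiters.pop_front();
            this.id = None; // registration consumed
            Poll::Ready(())
        } else {
            // A real primitive would store cx.waker() here and wake it later.
            Poll::Pending
        }
    }
}

impl Drop for WaitTurn {
    fn drop(&mut self) {
        // Dropping a pending operation cancels its registration.
        if let Some(id) = self.id {
            self.list.borrow_mut().waiters.retain(|&w| w != id);
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Does what a `poll_next` body would have to do: create the future, poll it
/// once, drop it. Returns the queue length before and after the drop.
fn poll_once_then_drop() -> (usize, usize) {
    let list = Rc::new(RefCell::new(WaitList::default()));
    list.borrow_mut().waiters.push_back(u64::MAX); // another task waits first
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut fut = WaitTurn { list: Rc::clone(&list), id: None };
    assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
    let queued = list.borrow().waiters.len();
    drop(fut);
    let after_drop = list.borrow().waiters.len();
    (queued, after_drop)
}
```

Because the id lives inside the future, a caller that keeps the future alive across polls keeps its place in line; a `poll_next` that recreates it on every call cannot.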

I think the more general observation is that Futures allow persisting some additional temporary state per operation, and provide additional lifecycle hooks per operation (the Future gets polled for the first time, the Future gets dropped) compared to the current Stream definitions.

Another side effect is that currently Streams etc. must all support pinning. With Streams that return Futures that wouldn't be the case, since only the returned Futures get pinned. The Stream might still be pinned indirectly, because those Futures may reference it through a borrow with a certain lifetime, but that relationship is already checked directly through Rust's normal borrow rules and doesn't put special pinning requirements on the implementation of the type. And we often want Streams to be movable after they have been partly consumed (e.g. a TCP socket), which is not allowed once they are pinned. Of course some of them might be Unpin, but this change would allow moves again for all Streams.

@Nemo157
Member

Nemo157 commented Dec 9, 2018

Having the core traits return futures directly is probably blocked on GATs (or even all the way on async methods, although I'm hopeful that there will be good cross-compatibility between async methods and their underlying GAT representation). While it is possible to have a stream that can produce the futures for more than one item at a time, I think the common case is that the stream can only produce a future for the next item. For example a trait like

trait Stream {
    type Output;
    type Next<'a>: Future<Output = Option<Self::Output>> + 'a
    where
        Self: 'a;

    fn next(self: Pin<&mut Self>) -> Self::Next<'_>;
}

I reintroduced Pin to the method because I have a use case where the stream must be pinned from the first call to next until it is dropped; doing the setup and teardown of the immovable part on every call to next would be too expensive. I'm not sure how common this is, so if pinning is dropped from the Stream API I guess I could impl Stream for Pin<&mut MyStream> instead and eat the cost of a double indirection. Maybe if we get some way to construct streams via async fn, this would be common enough to keep it as part of the trait.
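For reference, the GAT-based shape above compiles on stable Rust since GATs were stabilized in 1.65, once the `where Self: 'a` bound is on the associated type. The sketch below assumes a hypothetical `Counter` stream and a toy `block_on` that busy-polls with a no-op waker. The returned `CounterNext<'a>` holds `&'a mut Counter`, so the borrow of the stream lasts exactly as long as the future.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// GAT-based trait in the shape proposed above.
trait Stream {
    type Output;
    type Next<'a>: Future<Output = Option<Self::Output>> + 'a
    where
        Self: 'a;

    fn next<'a>(self: Pin<&'a mut Self>) -> Self::Next<'a>;
}

/// Hypothetical stream yielding 1..=max.
struct Counter {
    n: u32,
    max: u32,
}

/// Future for one item; holds `&mut Counter` until it completes or is dropped.
struct CounterNext<'a> {
    counter: &'a mut Counter,
}

impl Future for CounterNext<'_> {
    type Output = Option<u32>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let c = &mut *self.get_mut().counter;
        if c.n < c.max {
            c.n += 1;
            Poll::Ready(Some(c.n))
        } else {
            Poll::Ready(None)
        }
    }
}

impl Stream for Counter {
    type Output = u32;
    type Next<'a> = CounterNext<'a> where Self: 'a;

    fn next<'a>(self: Pin<&'a mut Self>) -> Self::Next<'a> {
        CounterNext { counter: self.get_mut() } // Counter is Unpin
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: busy-polls with a no-op waker. Only adequate for futures
/// that never wait on outside events, as in this sketch.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Pulls items one at a time; each future's borrow ends before the next call.
fn collect_counter(mut counter: Counter) -> Vec<u32> {
    let mut out = Vec::new();
    while let Some(item) = block_on(Pin::new(&mut counter).next()) {
        out.push(item);
    }
    out
}
```

Holding two `CounterNext` futures for the same counter at once is rejected by the borrow checker, which is how "only one outstanding item" is expressed in the types.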

@Matthias247
Contributor Author

> Having the core traits return futures directly are probably blocked on GATs (or all the way to async methods, although I’m hopeful that there will be good cross-compatibility between async methods and their underlying GAT representation).

I was afraid of that. I don't know yet what GATs exactly are, but will try to read up on them. Can you elaborate on what a signature for a stream would need to look like in order to make it implementable by future composition as outlined above?

I would say that as long as that is not possible, we face the general issue that we can't define abstractions around types that are implemented by async functions. E.g. if I build an async-powered HTTP client, I can't build an interface/trait around it that I could use for unit testing and dependency injection in other code.

> While it is possible to have a stream that can produce the futures for more than one item at a time, I think the common case is that the stream can only produce a future for the next item.

You are right, most streams will only support producing a single item at a time. Supporting more would require internal queuing (which can easily be implemented through AsyncMutex in some situations, but is hard for manually implemented Streams). E.g. looking at other implementations, glib only supports one outstanding operation at a time per IO type. WinAPI/IOCP supports more (that's what the term Overlapped tries to describe), but there's barely a need for it.

I think that aspect could be most cleanly solved by the type system if reading from the stream produces a Future and consumes the stream until the operation has finished, at which point we get it back. Another option is to allow creating multiple futures and then either block inside their implementation (via AsyncMutex) or report an error if concurrent operations aren't supported.

> I reintroduced Pin to the method as I have a usecase where the stream must be pinned from the first call to next until it is dropped, doing the setup and teardown of the immovable part on every call to next would be too expensive. I’m not sure how common this might be, so I guess I could impl Stream for Pin<&mut MyStream> instead and eat the cost of a double indirection if pinning is dropped from the Stream API

I can see that some Streams must have stable addresses, but I'm not sure whether that fact needs to be propagated to the API or is an implementation detail. E.g. many streams will be implemented through Arc<Mutex<InnerStream>>, which effectively pins the inner stream, but that doesn't leak into the API and the outer stream stays movable. What's your use case for this?

@Nemo157
Member

Nemo157 commented Dec 10, 2018

GATs are generic associated types; that's the feature that allows parameterizing the Next associated type by the lifetime 'a in my example.

> I think that aspect could be most cleanly solved by the type system, if reading from the stream produces a Future and consumes the stream until the operation had been finished - where we get it back.

This is where having GATs is useful: rather than consuming the stream and receiving it back once complete, you can borrow the stream and have that borrow last until the future is complete or dropped.

> What's your use-case for this?

My use case was reading a stream of values from a radio peripheral in a microcontroller. By using pinning, the stream can have an internal buffer that is written to by the radio using DMA; without pinning, this buffer would need to be externally allocated and passed in as a reference (or heap allocated, but this was for a no_std project).

@Matthias247
Contributor Author

> This is where having GATs is useful, rather than consuming the stream and receiving it back once complete you can borrow the stream, and have that borrow last until the future is complete/dropped.

Ah, that makes sense. So the following wouldn't work, because without GATs we can't put the lifetime on the item?

trait Stream {
    type Output;
    type Next: Future<Output = Self::Output>;

    fn next<'a>(&'a mut self) -> Self::Next<'a>; // Or is that Self::Next + 'a ?
}

After reading up on GATs I understand they make lifetimes more flexible, but I wasn't sure whether something like this is possible without them. It should be OK for this use case if the lifetime of the returned Future is bound to the stream.

> My use case was reading a stream of values from a radio peripheral in a microcontroller. By using pinning the stream can have an internal buffer that is written to by the radio using DMA, without pinning this buffer would need to be externally allocated and passed in as a reference (or heap allocated, but this was for a no_std project).

Ah, cool idea to represent it as a stream. I think, however, that your requirement is more the typical pain point of embedded systems: one shouldn't allocate dynamically, and therefore ideally all buffers etc. should be contained inside the type.

However, without e.g. const generics that's already painful, and even with them I don't find the embedding approach super appealing (there's an endless forwarding of type parameters).
I guess I would go either with the externally allocated route (which gives the stream a lifetime, but that can be 'static for a global buffer) or, maybe cleaner, with a simple bump allocator for the platform, changing the strategy from "don't allocate at all" to "only allocate at startup".

@Matthias247
Contributor Author

Matthias247 commented Dec 11, 2018

I tried things out. It seems to be only possible to define this at the moment:

trait Stream<'a> {
    type Output;
    type Next: Future<Output = Self::Output> + 'a;

    fn next(&'a mut self) -> Self::Next;
}

which is arguably not nice, since the lifetime needlessly sits on the trait.
I haven't yet tried whether that's implementable through async function composition.
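A minimal sketch of implementing the lifetime-on-the-trait variant through async composition, assuming a hypothetical `Numbers` stream and `scale` helper. On stable Rust the type of an `async` block cannot be named, so this version boxes it into `Pin<Box<dyn Future + 'a>>`, trading one allocation per item for the ability to write the body with `.await`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// The lifetime-on-the-trait shape from the comment above.
trait Stream<'a> {
    type Output;
    type Next: Future<Output = Self::Output> + 'a;

    fn next(&'a mut self) -> Self::Next;
}

/// Hypothetical async step used to build each item.
async fn scale(value: u32) -> u32 {
    value * 10
}

/// Hypothetical stream implemented by composing async code.
struct Numbers {
    current: u32,
}

impl<'a> Stream<'a> for Numbers {
    type Output = u32;
    // An `async` block's type cannot be named on stable Rust, so this
    // sketch boxes it: one allocation per item.
    type Next = Pin<Box<dyn Future<Output = u32> + 'a>>;

    fn next(&'a mut self) -> Self::Next {
        Box::pin(async move {
            self.current += 1;
            scale(self.current).await
        })
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: busy-polls with a no-op waker. Only adequate for futures
/// that never wait on outside events, as in this sketch.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Each returned future borrows `numbers` only until `block_on` finishes it.
fn first_three() -> Vec<u32> {
    let mut numbers = Numbers { current: 0 };
    let mut out = Vec::new();
    for _ in 0..3 {
        out.push(block_on(numbers.next()));
    }
    out
}
```

Generic code consuming this trait would typically need a higher-ranked bound such as `S: for<'a> Stream<'a, Output = u32>`, which is part of why the lifetime on the trait is awkward.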

@aturon
Member

aturon commented Dec 13, 2018

cc @withoutboats @cramertj

@cramertj
Member

I discussed this with @erickt elsewhere and explained my thoughts. As the ecosystem stands, it's not possible to make this work without making all of these traits non-object-safe, so I'm not interested in pursuing this at the moment.

@seanmonstar
Contributor

One thing that comes to mind with regard to AsyncRead/AsyncWrite: if the poll_read/poll_write methods are removed and there are only async fns, then we can't really wait on both the read and the write future at the same time, since both will want to mutably borrow the transport and not release the borrow until complete.

@Matthias247
Contributor Author

@cramertj: would you mind sharing those thoughts publicly at some point? I guess that would be interesting for others too. I fully understand the implications for the ecosystem. However, the current changes from 0.1 already include lots of those smaller renamings, and this would mostly be another one. We can still keep the old traits around, with adapters to the new variants; that direction can be bridged. However, it's not possible to fit Future-based implementations into the current signatures without costs (e.g. extra allocations).

@seanmonstar: I think this should still be possible, as all types that support read and write should also support a split method for getting individual halves, which makes them more flexible. Then you can get a future from the reader and one from the writer.
In lots of protocol implementations I want to give ownership of the writer to some writing task and keep the reader in the reading task. Then I need split anyway.
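The split approach can be sketched as follows, assuming a hypothetical in-memory `Transport` whose `split` returns halves that borrow disjoint fields, plus a toy `join` that polls two futures with a no-op waker. If `read` and `write` were instead both `&mut self` async methods on `Transport` itself, creating the two futures at once would be rejected with E0499 (a second mutable borrow).

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical in-memory transport with separate read and write buffers.
struct Transport {
    incoming: VecDeque<u8>,
    outgoing: Vec<u8>,
}

/// The halves borrow disjoint fields, so both can be borrowed mutably at once.
struct ReadHalf<'a>(&'a mut VecDeque<u8>);
struct WriteHalf<'a>(&'a mut Vec<u8>);

impl Transport {
    fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
        (ReadHalf(&mut self.incoming), WriteHalf(&mut self.outgoing))
    }
}

impl<'a> ReadHalf<'a> {
    async fn read(&mut self) -> Option<u8> {
        self.0.pop_front()
    }
}

impl<'a> WriteHalf<'a> {
    async fn write(&mut self, byte: u8) {
        self.0.push(byte);
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy join with a no-op waker: polls both futures until both are done.
fn join<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut out_a, mut out_b) = (None, None);
    while out_a.is_none() || out_b.is_none() {
        if out_a.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(&mut cx) {
                out_a = Some(v);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(&mut cx) {
                out_b = Some(v);
            }
        }
    }
    (out_a.unwrap(), out_b.unwrap())
}

fn read_and_write_concurrently() -> (Option<u8>, Vec<u8>) {
    let mut transport = Transport {
        incoming: VecDeque::from(vec![7]),
        outgoing: Vec::new(),
    };
    let (mut reader, mut writer) = transport.split();
    // Both futures are alive at the same time; each borrows only its half.
    let (byte, ()) = join(reader.read(), writer.write(42));
    (byte, transport.outgoing)
}
```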

@cramertj
Member

@Matthias247 Oh sorry, I don't have a large writeup or anything, I just meant we had discussed it. If there were a way to return a dyn<size_of::<usize>> Future<Output = io::Result<()>> (a dynamically dispatched future with a maximum size of one pointer), I'd support that, but that feature doesn't exist yet.

@Matthias247
Contributor Author

Ok. Let's focus on this issue, instead of spreading the discussion out.

I think e.g. AsyncWrite should be defined along these lines:

trait AsyncWrite {
    type Error;
    type WriteFut<'a>: Future<Output = Result<usize, Self::Error>> + 'a
    where
        Self: 'a;

    fn write(&mut self, bytes: &[u8]) -> Self::WriteFut<'_>;
}

which is mostly a copy of the well-known synchronous variant, and behaves just like it when combined with await.
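With GATs now stable (Rust 1.65+), the trait above can be written and implemented on stable Rust, given the `where Self: 'a` bound. This sketch assumes a hypothetical `ChunkedVec` writer that accepts at most 4 bytes per call, and a generic `write_all` written with `.await` that loops the same way `std::io::Write::write_all` loops over `write`. Because the returned future may only borrow `self`, an implementation has to consume `bytes` before returning; here the write completes eagerly and returns `std::future::Ready`.

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// The GAT-based AsyncWrite shape from the comment above.
trait AsyncWrite {
    type Error;
    type WriteFut<'a>: Future<Output = Result<usize, Self::Error>> + 'a
    where
        Self: 'a;

    fn write(&mut self, bytes: &[u8]) -> Self::WriteFut<'_>;
}

/// Hypothetical in-memory writer that accepts at most 4 bytes per call,
/// so callers must loop just like with `std::io::Write::write`.
struct ChunkedVec {
    buf: Vec<u8>,
}

impl AsyncWrite for ChunkedVec {
    type Error = Infallible;
    type WriteFut<'a> = Ready<Result<usize, Infallible>> where Self: 'a;

    fn write(&mut self, bytes: &[u8]) -> Self::WriteFut<'_> {
        let n = bytes.len().min(4);
        self.buf.extend_from_slice(&bytes[..n]);
        ready(Ok(n))
    }
}

/// Generic helper written with `.await`. A real version would turn `Ok(0)`
/// into an error instead of looping forever.
async fn write_all<W: AsyncWrite>(w: &mut W, mut bytes: &[u8]) -> Result<(), W::Error> {
    while !bytes.is_empty() {
        let n = w.write(bytes).await?;
        bytes = &bytes[n..];
    }
    Ok(())
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: busy-polls with a no-op waker. Only adequate for futures
/// that never wait on outside events, as in this sketch.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn write_hello() -> Vec<u8> {
    let mut w = ChunkedVec { buf: Vec::new() };
    block_on(write_all(&mut w, b"hello world")).unwrap();
    w.buf
}
```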

The obvious problem is that this won't work yet without GATs. For Streams/Sinks we can work around this by requiring them to be implemented through Arcs, which at least for std environments isn't a big issue. That removes the lifetime from the returned Futures.

I am aware that the inability to create trait objects is an issue. But not being able to compose Futures and other async traits seems like an even bigger one to me currently.

Just as a motivating example: I have written a completely async HTTP/2 library a while ago, which I still think has a reasonably simple API and implementation. The API provides users of the library some byte stream based interfaces, which are similar to AsyncWrite/Read: https://github.com/Matthias247/http2dotnet/blob/master/Http2/ByteStreams.cs

The implementation of this interface is heavily based on Future/Task composition, as can be seen here: https://github.com/Matthias247/http2dotnet/blob/master/Http2/StreamImpl.cs#L520-L591

Writing data includes blocking on async mutexes to synchronize long-lived tasks, sending data asynchronously to a background task that performs all IO multiplexing, and more async waiting until the data has actually been flushed. This can't really be done with the current Rust traits. It seems like we would be back to manually implementing state machines for all those operations.

Therefore I would really like to see a mechanism where we can implement those common async base traits on top of Futures, ideally in a zero-cost fashion. Since it seems like this isn't yet possible, I'm totally OK with having some intermediate solutions in the meantime. But I wouldn't like the whole ecosystem to commit to them already, or even to build a completely separate and incompatible world on top of them.

@seanmonstar
Contributor

@Matthias247 The "Ext" traits allow you to easily treat read and write as Futures. Changing the core traits to remove their poll_* methods and only return a Future has two issues that I can see:

  1. It's pretty common to make boxed trait objects of IO types, like Box<dyn AsyncWrite>. With those associated types, this becomes a whole lot harder. Essentially, you end up needing to box the future as well, to get some sort of Box<dyn AsyncWrite<WriteFut = Box<dyn Future<..>>>>.

  2. As I mentioned before, async fns end up causing a borrow on &mut self. This prevents one from being able to wait on both a read and write future at the same time. It's possible to get that back by putting the IO object inside an Arc with some AsyncMutex, but that is no longer a "zero cost fashion".

Given that the style of writing code that you want is already possible thanks to the "Ext" traits, it seems a shame to remove the ability to have zero-cost async read + write.
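Issue 1 can be illustrated with the non-GAT `Sink` shape that appears later in this thread. In this sketch the `Boxed` adapter, `Recorder` and `Discard` are hypothetical: every sink behind one trait object type must agree on a single concrete `Next` type, so each implementation is wrapped in an adapter that boxes its futures, at one allocation per `send`.

```rust
use std::future::{ready, Future, Ready};
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Non-GAT Sink shape: the future type is an associated type.
pub trait Sink {
    type Input;
    type Error;
    type Next: Future<Output = Result<(), Self::Error>>;

    fn send(&self, value: Self::Input) -> Self::Next;
}

/// Boxed future type that every sink behind one trait object must use.
type BoxNext<E> = Pin<Box<dyn Future<Output = Result<(), E>>>>;

/// Hypothetical adapter: erases the concrete future type by boxing it.
struct Boxed<S>(S);

impl<S> Sink for Boxed<S>
where
    S: Sink,
    S::Next: 'static,
    S::Error: 'static,
{
    type Input = S::Input;
    type Error = S::Error;
    type Next = BoxNext<S::Error>;

    fn send(&self, value: S::Input) -> Self::Next {
        Box::pin(self.0.send(value)) // one allocation per send
    }
}

/// Hypothetical sink that records values into a shared buffer.
struct Recorder(Arc<Mutex<Vec<u8>>>);

impl Sink for Recorder {
    type Input = u8;
    type Error = ();
    type Next = Ready<Result<(), ()>>;

    fn send(&self, value: u8) -> Self::Next {
        self.0.lock().unwrap().push(value);
        ready(Ok(()))
    }
}

/// Hypothetical sink that drops everything.
struct Discard;

impl Sink for Discard {
    type Input = u8;
    type Error = ();
    type Next = Ready<Result<(), ()>>;

    fn send(&self, _value: u8) -> Self::Next {
        ready(Ok(()))
    }
}

/// The trait object type has to spell out the boxed future type.
type DynSink = Box<dyn Sink<Input = u8, Error = (), Next = BoxNext<()>>>;

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: busy-polls with a no-op waker.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn fan_out() -> Vec<u8> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let sinks: Vec<DynSink> = vec![
        Box::new(Boxed(Recorder(Arc::clone(&log)))),
        Box::new(Boxed(Discard)),
        Box::new(Boxed(Recorder(Arc::clone(&log)))),
    ];
    for (i, sink) in sinks.iter().enumerate() {
        block_on(sink.send(i as u8)).unwrap();
    }
    let recorded = log.lock().unwrap().clone();
    recorded
}
```

With a GAT-based `Next<'a>` the trait would not be usable as a trait object at all, which is the harder form of the same problem.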

@Matthias247
Contributor Author

@seanmonstar

> Given that the style of writing code that you want is already possible thanks to the "Ext" traits, it seems a shame to remove the ability to have zero cost async read + write.

The Ext traits allow using those other traits as Futures. But they don't allow those types to be implemented on top of Futures, since there is literally no place to keep the state between polls that would normally live inside the Future.

Writing code in the style I shared is not possible using the current state of the traits.

@najamelan
Contributor

I feel like we could prepare a more modern API already. Async trait methods can be done by returning Pin<Box<dyn Future...>>. It can be feature-gated to indicate that this is an unstable API that will change, but people who want to experiment with it already can.

When GATs or async trait methods come around, we can change the method signature, remove the feature gate and deprecate the poll_* methods. In the meantime, people who need high performance can continue using the current API.
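The boxed approach can be sketched like this, assuming a hypothetical `BoxedWrite` trait and `VecWriter` type. Lifetime parameters on methods do not affect object safety, so `&mut dyn BoxedWrite` works; the cost is one heap allocation per call. This is roughly the shape the `async-trait` crate generates (which additionally requires `Send` by default).

```rust
use std::future::Future;
use std::io;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical object-safe write trait: the future type is erased behind
/// a box, so `dyn BoxedWrite` is allowed.
trait BoxedWrite {
    fn write<'a>(
        &'a mut self,
        bytes: &'a [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<usize>> + 'a>>;
}

/// Hypothetical in-memory writer.
struct VecWriter {
    buf: Vec<u8>,
}

impl BoxedWrite for VecWriter {
    fn write<'a>(
        &'a mut self,
        bytes: &'a [u8],
    ) -> Pin<Box<dyn Future<Output = io::Result<usize>> + 'a>> {
        // The body is ordinary async code; boxing costs one allocation.
        Box::pin(async move {
            self.buf.extend_from_slice(bytes);
            Ok::<usize, io::Error>(bytes.len())
        })
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: busy-polls with a no-op waker.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn write_through_dyn() -> Vec<u8> {
    let mut w = VecWriter { buf: Vec::new() };
    let dyn_w: &mut dyn BoxedWrite = &mut w; // dynamic dispatch
    let n = block_on(dyn_w.write(b"abc")).unwrap();
    assert_eq!(n, 3);
    w.buf
}
```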

@cramertj
Member

@najamelan It's critically important to continue allowing these traits to be object-safe and non-allocating.

@najamelan
Contributor

@cramertj As far as I can tell, returning Pin<Box<dyn Future>> from a trait method is object safe.

As for the allocation of a boxed future, that will be a temporary problem until async trait methods become possible. Do you feel that having such an API behind a feature gate, while keeping the current API for people who need the performance, is unacceptable? In other words, is an unstable, feature-gated API that allocates temporarily unacceptable?

The problem I'm running into at least, but maybe there's a better solution, is that implementing start_send on Sink is pretty hard if you have to call async code for the sending internally. Since start_send is synchronous, you have to spawn, but if the future references self, you won't be able to because self would escape the borrowed context.

I'm also not sure that it's more performant to spawn a new task than to box a future...

@cramertj
Member

It's not a temporary problem-- there's no solution that has been proposed so far which would allow non-allocating object-safe async methods.

@Matthias247
Contributor Author

I think object safety is a good argument. The allocations not necessarily. We can start by defining things like

pub trait Stream {
    type Output;
    type Next: Future<Output = Self::Output>;

    fn next(&self) -> Self::Next;
}

pub trait Sink {
    type Input;
    type Error;
    type Next: Future<Output = Result<(), Self::Error>>;

    fn send(&self, value: Self::Input) -> Self::Next;
}

That is zero-allocation for all implementations where the source/sink internally uses an Arc (since the returned future without a lifetime can simply capture that Arc too). Later on that could be expanded to a second trait which uses GATs in order to support Futures which capture their source via a lifetime.
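A minimal sketch of the Arc-based variant, assuming a hypothetical `QueueStream` backed by `Arc<Mutex<VecDeque<u32>>>`. The returned `QueueNext` owns its own clone of the `Arc`, so it carries no lifetime and does not borrow the stream; the stream can even be moved while a future is outstanding. To stay short, an empty queue ends the stream instead of registering a waker, which a real implementation would have to do.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Trait shape from the comment above: `next` takes `&self` and the
/// returned future carries no lifetime.
pub trait Stream {
    type Output;
    type Next: Future<Output = Self::Output>;

    fn next(&self) -> Self::Next;
}

type Shared = Arc<Mutex<VecDeque<u32>>>;

/// Hypothetical queue-backed stream; its state lives behind an `Arc`.
struct QueueStream {
    inner: Shared,
}

/// Owns a clone of the `Arc`, so it does not borrow the stream.
struct QueueNext {
    inner: Shared,
}

impl Future for QueueNext {
    type Output = Option<u32>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // Sketch only: an empty queue ends the stream instead of storing
        // the waker and returning Pending.
        let item = self.inner.lock().unwrap().pop_front();
        Poll::Ready(item)
    }
}

impl Stream for QueueStream {
    type Output = Option<u32>;
    type Next = QueueNext;

    fn next(&self) -> QueueNext {
        QueueNext { inner: Arc::clone(&self.inner) }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: busy-polls with a no-op waker.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn drain() -> Vec<u32> {
    let stream = QueueStream {
        inner: Arc::new(Mutex::new(VecDeque::from(vec![1, 2, 3]))),
    };
    let first = stream.next();
    // `first` does not borrow `stream`, so the stream can be moved meanwhile.
    let moved = stream;
    let mut out = vec![block_on(first).unwrap()];
    while let Some(item) = block_on(moved.next()) {
        out.push(item);
    }
    out
}
```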

Object safety is obviously a harder discussion, and also a bit of a question of prioritization. Is it more important for users that those types are object safe than being able to compose them from Futures?
Even if that means that, even in our shiny new async/await world, people need to continue hand-writing state machines for Streams/Sinks/etc? If there is a proposal somewhere that defines how we can make Futures and Streams compatible again, I would love to see it. But I'm really a bit concerned about them being incompatible, and then having a world of 3 different function types (normal ones, async fns and stream functions).

Also since the current Futures ecosystem isn't really that dynamic-dispatch friendly (due to enormous generated types), it's not obvious to me why we prioritize it here over the ability to write easy-to-understand async code.

@Matthias247
Contributor Author

Regarding object safety, we might consider whether it could be regained for the most common use cases by some custom allocator, or by boxing that type-erases the returned Future if it fits within a certain size. I think @matthieu-m proposed something like that.

Then we could have an additional

pub trait DynamicDispatchableStream {
    type Output;

    fn next(&self) -> FixedSizeFuture<Output = Self::Output>;
}

@cramertj
Member

I don't believe these are changes we're going to make for reasons I've explained above (the structure of the state machines, object safety). Thanks for the discussion, though, and I'm sure there'll be more to discuss here in the future.

For those interested in more, http://smallcultfollowing.com/babysteps/blog/2019/10/26/async-fn-in-traits-are-hard/ has a great explanation of the complications here

6 participants