diff --git a/crescent/src/main.rs b/crescent/src/main.rs index 2337553..59fa0cf 100644 --- a/crescent/src/main.rs +++ b/crescent/src/main.rs @@ -1,4 +1,4 @@ -#![feature(type_alias_impl_trait)] +#![feature(impl_trait_in_assoc_type)] use std::{ cell::RefCell, os::{fd::OwnedFd, unix::net::UnixStream}, @@ -270,12 +270,13 @@ impl CrescentClient { impl Client for CrescentClient { type Connection = Connection; - type DispatchFut<'a, R> = runa_core::client::Dispatch<'a, Self, R> where R: runa_io::traits::buf::AsyncBufReadWithFd + 'a; type EventDispatcher = EventDispatcher; type Object = AnyObject; type ObjectStore = Store; type ServerContext = Crescent; + type DispatchFut<'a, R> = impl std::future::Future + 'a where R: runa_io::traits::buf::AsyncBufReadWithFd + 'a; + fn dispatch<'a, R>(&'a mut self, reader: Pin<&'a mut R>) -> Self::DispatchFut<'a, R> where R: runa_io::traits::buf::AsyncBufReadWithFd, diff --git a/runa-core/src/client/event_dispatcher.rs b/runa-core/src/client/event_dispatcher.rs index 9ab54cf..de73fa4 100644 --- a/runa-core/src/client/event_dispatcher.rs +++ b/runa-core/src/client/event_dispatcher.rs @@ -18,39 +18,59 @@ use futures_util::{ use super::traits; use crate::utils::one_shot_signal; -type PairedEventHandlerFut<'a, H, Ctx> -where - H: traits::EventHandler, - Ctx: traits::Client, -= impl Future> + 'a; - -/// A helper function for storing the event handler's future. +/// Helper trait for implementing `paired_event_handler_driver`. /// -/// Event handler's generate a future that references the event handler itself, -/// if we store that directly in [`PairedEventHandler`] we would have a -/// self-referential struct, so we use this async fn to get around that. -/// -/// The stop signal is needed so when the future can to be stopped early, for -/// example when a [`PendingEventFut`] is dropped. -fn paired_event_handler_driver<'ctx, H, Ctx: traits::Client>( - mut handler: H, - mut stop_signal: one_shot_signal::Receiver, - mut message: H::Message, - objects: &'ctx mut Ctx::ObjectStore, - connection: &'ctx mut Ctx::Connection, - server_context: &'ctx Ctx::ServerContext, -) -> PairedEventHandlerFut<'ctx, H, Ctx> +/// We need to name the type returned by `paired_event_handler_driver` so we can +/// store it. However the new `impl_trait_in_assoc_type` restricted TAIT to +/// associated types of traits only, so we have to use this helper trait to get +/// around that. +trait EventHandlerExt: traits::EventHandler + Sized { + type PairedEventHandlerFut<'a>: Future> + + 'a; + fn paired_event_handler_driver<'ctx>( + self, + stop_signal: one_shot_signal::Receiver, + message: Self::Message, + objects: &'ctx mut Ctx::ObjectStore, + connection: &'ctx mut Ctx::Connection, + server_context: &'ctx Ctx::ServerContext, + ) -> Self::PairedEventHandlerFut<'ctx>; +} + +impl EventHandlerExt for T where - H: traits::EventHandler, + Ctx: traits::Client, + T: traits::EventHandler, { - async move { - use futures_util::{select, FutureExt}; - select! { - () = stop_signal => { - Err((message, handler)) - } - ret = handler.handle_event(objects, connection, server_context, &mut message).fuse() => { - Ok((ret, handler)) + type PairedEventHandlerFut<'a> = + impl Future> + 'a; + + /// A helper function for storing the event handler's future. + /// + /// Event handler's generate a future that references the event handler + /// itself, if we store that directly in [`PairedEventHandler`] we would + /// have a self-referential struct, so we use this async fn to get + /// around that. + /// + /// The stop signal is needed so when the future can to be stopped early, + /// for example when a [`PendingEventFut`] is dropped. + fn paired_event_handler_driver<'ctx>( + mut self, + mut stop_signal: one_shot_signal::Receiver, + mut message: Self::Message, + objects: &'ctx mut Ctx::ObjectStore, + connection: &'ctx mut Ctx::Connection, + server_context: &'ctx Ctx::ServerContext, + ) -> Self::PairedEventHandlerFut<'ctx> { + async move { + use futures_util::{select, FutureExt}; + select! { + () = stop_signal => { + Err((message, self)) + } + ret = self.handle_event(objects, connection, server_context, &mut message).fuse() => { + Ok((ret, self)) + } } } } @@ -66,7 +86,7 @@ struct PairedEventHandler<'fut, Ctx: traits::Client, ES: Stream, H: traits::Even handler: Option, message: Option, #[pin] - fut: Option>, + fut: Option<>::PairedEventHandlerFut<'fut>>, stop_signal: Option, _ctx: PhantomData, } @@ -172,8 +192,7 @@ where let (tx, stop_signal) = one_shot_signal::new_pair(); *this.stop_signal = Some(tx); - let new_fut = paired_event_handler_driver( - handler, + let new_fut = handler.paired_event_handler_driver( stop_signal, message, objects, diff --git a/runa-core/src/client/mod.rs b/runa-core/src/client/mod.rs index d4904d0..ff05235 100644 --- a/runa-core/src/client/mod.rs +++ b/runa-core/src/client/mod.rs @@ -25,21 +25,13 @@ pub mod event_dispatcher; pub mod store; pub mod traits; -use std::{future::Future, os::fd::RawFd, pin::Pin}; +use std::{os::fd::RawFd, pin::Pin}; #[doc(inline)] pub use event_dispatcher::EventDispatcher; #[doc(inline)] pub use store::Store; -/// Type of future returned by [`dispatch_to`]. -pub type Dispatch<'a, Ctx, R> -where - Ctx: traits::Client + 'a, - Ctx::Object: for<'b> crate::objects::Object = (&'b [u8], &'b [RawFd])>, - R: runa_io::traits::buf::AsyncBufReadWithFd + 'a, -= impl Future + 'a; - /// Reference implementation of the [`traits::Client::dispatch`] method. /// /// This function reads a message from `reader`, looks up the corresponding @@ -57,75 +49,70 @@ where /// not, you are supposed to deserialize a wayland message from a /// `(&[u8], &[RawFd])` tuple yourself and then dispatch the deserialized /// message properly. -pub fn dispatch_to<'a, Ctx, R>(ctx: &'a mut Ctx, mut reader: Pin<&'a mut R>) -> Dispatch<'a, Ctx, R> +pub async fn dispatch_to<'a, Ctx, R>(ctx: &'a mut Ctx, mut reader: Pin<&'a mut R>) -> bool where R: runa_io::traits::buf::AsyncBufReadWithFd, Ctx: traits::Client, Ctx::Object: for<'b> crate::objects::Object = (&'b [u8], &'b [RawFd])>, { - async move { - use runa_io::traits::{buf::Message, WriteMessage}; - use runa_wayland_protocols::wayland::wl_display::v1 as wl_display; - use runa_wayland_types as types; - use traits::Store; - - use crate::{error::ProtocolError, objects::AnyObject}; - let Message { - object_id, - len, - data, - fds, - } = match R::next_message(reader.as_mut()).await { - Ok(v) => v, - // I/O error, no point sending the error to the client - Err(_) => return true, - }; - let (ret, bytes_read, fds_read) = - <::Object as crate::objects::Object>::dispatch( - ctx, - object_id, - (data, fds), - ) - .await; - let (mut fatal, error) = match ret { - Ok(_) => (false, None), - Err(e) => ( - e.fatal(), - e.wayland_error() - .map(|(object_id, error_code)| (object_id, error_code, e.to_string())), - ), - }; - if let Some((object_id, error_code, msg)) = error { - // We are going to disconnect the client so we don't care about the - // error. - fatal |= ctx - .connection_mut() - .send(crate::objects::DISPLAY_ID, wl_display::events::Error { - object_id: types::Object(object_id), - code: error_code, - message: types::Str(msg.as_bytes()), - }) - .await - .is_err(); - } - if !fatal { - if bytes_read != len { - let len_opcode = u32::from_ne_bytes(data[0..4].try_into().unwrap()); - let opcode = len_opcode & 0xffff; - tracing::error!( - "unparsed bytes in buffer, read ({bytes_read}) != received ({len}). \ - object_id: {}@{object_id}, opcode: {opcode}", - ctx.objects() - .get::(object_id) - .map(|o| o.interface()) - .unwrap_or("unknown") - ); - fatal = true; - } - reader.consume(bytes_read, fds_read); + use runa_io::traits::{buf::Message, WriteMessage}; + use runa_wayland_protocols::wayland::wl_display::v1 as wl_display; + use runa_wayland_types as types; + use traits::Store; + + use crate::{error::ProtocolError, objects::AnyObject}; + let Message { + object_id, + len, + data, + fds, + } = match R::next_message(reader.as_mut()).await { + Ok(v) => v, + // I/O error, no point sending the error to the client + Err(_) => return true, + }; + let (ret, bytes_read, fds_read) = <::Object as crate::objects::Object< + Ctx, + >>::dispatch(ctx, object_id, (data, fds)) + .await; + let (mut fatal, error) = match ret { + Ok(_) => (false, None), + Err(e) => ( + e.fatal(), + e.wayland_error() + .map(|(object_id, error_code)| (object_id, error_code, e.to_string())), + ), + }; + if let Some((object_id, error_code, msg)) = error { + // We are going to disconnect the client so we don't care about the + // error. + fatal |= ctx + .connection_mut() + .send(crate::objects::DISPLAY_ID, wl_display::events::Error { + object_id: types::Object(object_id), + code: error_code, + message: types::Str(msg.as_bytes()), + }) + .await + .is_err(); + } + if !fatal { + if bytes_read != len { + let len_opcode = u32::from_ne_bytes(data[0..4].try_into().unwrap()); + let opcode = len_opcode & 0xffff; + tracing::error!( + "unparsed bytes in buffer, read ({bytes_read}) != received ({len}). object_id: \ + {}@{object_id}, opcode: {opcode}", + ctx.objects() + .get::(object_id) + .map(|o| o.interface()) + .unwrap_or("unknown") + ); + fatal = true; } - fatal + reader.consume(bytes_read, fds_read); } + fatal } #[doc(hidden)] // not ready for use yet diff --git a/runa-core/src/events/broadcast.rs b/runa-core/src/events/broadcast.rs index 2190b19..f102f80 100644 --- a/runa-core/src/events/broadcast.rs +++ b/runa-core/src/events/broadcast.rs @@ -69,9 +69,6 @@ impl Ring { } } -/// Type of future returned by [`Broadcast::broadcast_owned`]. -pub type BroadcastOwned = impl Future + 'static; - impl Broadcast { /// Send a new event to all receivers pub fn broadcast(&self, msg: E) -> impl Future + '_ { @@ -87,7 +84,7 @@ impl Broadcast { /// Like [`Self::broadcast`], but the returned future doesn't borrow from /// `self`. - pub fn broadcast_owned(&self, msg: E) -> BroadcastOwned + pub fn broadcast_owned(&self, msg: E) -> impl Future where E: 'static, { diff --git a/runa-core/src/lib.rs b/runa-core/src/lib.rs index f007f5f..7a95b0b 100644 --- a/runa-core/src/lib.rs +++ b/runa-core/src/lib.rs @@ -37,10 +37,10 @@ //! creating a new client context each time a new client connects. It also //! provides access to the globals. -// We can drop trait_upcasting if necessary. `type_alias_impl_trait` is +// We can drop trait_upcasting if necessary. `impl_trait_in_assoc_type` is // the indispensable feature here. #![allow(incomplete_features)] -#![feature(type_alias_impl_trait, trait_upcasting)] +#![feature(impl_trait_in_assoc_type, trait_upcasting)] #![warn( missing_debug_implementations, missing_copy_implementations, @@ -266,7 +266,8 @@ impl std::fmt::Debug for IdAlloc { impl Serial for IdAlloc { type Data = D; - type Iter<'a> = <&'a Self as IntoIterator>::IntoIter where Self: 'a; + + type Iter<'a> = impl Iterator where Self: 'a; fn next_serial(&mut self, data: Self::Data) -> u32 { loop { @@ -299,17 +300,12 @@ impl Serial for IdAlloc { } } -#[doc(hidden)] -pub type IdIter<'a, D> -where - D: 'a, -= impl Iterator + 'a; - impl<'a, D: 'a> IntoIterator for &'a IdAlloc { - type IntoIter = IdIter<'a, D> where Self: 'a; type Item = (u32, &'a D); - fn into_iter(self) -> IdIter<'a, D> { + type IntoIter = impl Iterator; + + fn into_iter(self) -> Self::IntoIter { self.data.iter().map(|(k, v)| (*k, v)) } } diff --git a/runa-core/src/server/mod.rs b/runa-core/src/server/mod.rs index cf5679d..8651dde 100644 --- a/runa-core/src/server/mod.rs +++ b/runa-core/src/server/mod.rs @@ -46,9 +46,8 @@ impl EventSource> for GlobalStore { /// GlobalStore will notify listeners when globals are added or removed. The /// notification will be sent to the slot registered as "wl_registry". impl traits::GlobalStore for GlobalStore { - type Iter<'a> = <&'a Self as IntoIterator>::IntoIter where Self: 'a, G: 'a; - type InsertFut<'a, I> = impl Future + 'a where Self: 'a, I: 'a + Into; + type Iter<'a> = impl Iterator)> where Self: 'a, G: 'a; type RemoveFut<'a> = impl Future + 'a where Self: 'a; fn insert<'a, I: Into + 'a>(&'a mut self, global: I) -> Self::InsertFut<'_, I> { @@ -82,10 +81,14 @@ impl traits::GlobalStore for GlobalStore { self.into_iter() } } -impl<'a, G> IntoIterator for &'a GlobalStore { - type IntoIter = crate::IdIter<'a, Rc>; +impl<'a, G> IntoIterator for &'a GlobalStore +where + G: 'a, +{ type Item = (u32, &'a Rc); + type IntoIter = impl Iterator)> + 'a; + fn into_iter(self) -> Self::IntoIter { self.globals.iter() } diff --git a/runa-io-traits/src/lib.rs b/runa-io-traits/src/lib.rs index 4ff1714..4731812 100644 --- a/runa-io-traits/src/lib.rs +++ b/runa-io-traits/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(type_alias_impl_trait)] +#![feature(impl_trait_in_assoc_type)] use std::{ future::Future, io::Result, @@ -382,35 +382,10 @@ pub mod buf { })) } - fn next_message<'a>(self: Pin<&'a mut Self>) -> NextMessageFut<'a, Self> + fn next_message<'a>(self: Pin<&'a mut Self>) -> NextMessage<'a, Self> where Self: Sized, { - pub struct NextMessage<'a, R>(Option>); - impl<'a, R> Future for NextMessage<'a, R> - where - R: AsyncBufReadWithFd, - { - type Output = Result>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - let mut reader = this.0.take().expect("NextMessage polled after completion"); - match reader.as_mut().poll_next_message(cx) { - Poll::Pending => { - this.0 = Some(reader); - Poll::Pending - }, - Poll::Ready(Ok(_)) => match reader.poll_next_message(cx) { - Poll::Pending => { - panic!("poll_next_message returned Ready, but then Pending again") - }, - ready => ready, - }, - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - } - } - } NextMessage(Some(self)) } } @@ -434,6 +409,29 @@ pub mod buf { } } - pub type NextMessageFut<'a, T: AsyncBufReadWithFd + 'a> = - impl Future>> + 'a; + pub struct NextMessage<'a, R>(Option>); + impl<'a, R> Future for NextMessage<'a, R> + where + R: AsyncBufReadWithFd, + { + type Output = Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + let mut reader = this.0.take().expect("NextMessage polled after completion"); + match reader.as_mut().poll_next_message(cx) { + Poll::Pending => { + this.0 = Some(reader); + Poll::Pending + }, + Poll::Ready(Ok(_)) => match reader.poll_next_message(cx) { + Poll::Pending => { + panic!("poll_next_message returned Ready, but then Pending again") + }, + ready => ready, + }, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + } + } + } } diff --git a/runa-io/src/lib.rs b/runa-io/src/lib.rs index fcb3315..38dbb1a 100644 --- a/runa-io/src/lib.rs +++ b/runa-io/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(type_alias_impl_trait)] +#![feature(impl_trait_in_assoc_type)] use std::{ io::Result, mem::MaybeUninit, diff --git a/runa-orbiter/src/lib.rs b/runa-orbiter/src/lib.rs index 6e37fa2..021484c 100644 --- a/runa-orbiter/src/lib.rs +++ b/runa-orbiter/src/lib.rs @@ -20,7 +20,7 @@ //! globals you choose to use. #![allow(incomplete_features)] -#![feature(type_alias_impl_trait, trait_upcasting)] +#![feature(impl_trait_in_assoc_type, trait_upcasting)] #![warn( missing_debug_implementations, missing_copy_implementations,