From 77f488c1067451221b3d0f1308030454e543e597 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sat, 12 Oct 2024 21:28:11 +0200 Subject: [PATCH 01/15] queryable builder moved to builders --- .../src/api/{builders.rs => builders/mod.rs} | 1 + zenoh/src/api/builders/queryable.rs | 271 ++++++++++++++++++ zenoh/src/api/queryable.rs | 239 --------------- zenoh/src/api/session.rs | 11 +- zenoh/src/lib.rs | 9 +- 5 files changed, 284 insertions(+), 247 deletions(-) rename zenoh/src/api/{builders.rs => builders/mod.rs} (95%) create mode 100644 zenoh/src/api/builders/queryable.rs diff --git a/zenoh/src/api/builders.rs b/zenoh/src/api/builders/mod.rs similarity index 95% rename from zenoh/src/api/builders.rs rename to zenoh/src/api/builders/mod.rs index 5327dabe90..04fc7f1a57 100644 --- a/zenoh/src/api/builders.rs +++ b/zenoh/src/api/builders/mod.rs @@ -13,4 +13,5 @@ // pub(crate) mod publisher; +pub(crate) mod queryable; pub(crate) mod sample; diff --git a/zenoh/src/api/builders/queryable.rs b/zenoh/src/api/builders/queryable.rs new file mode 100644 index 0000000000..24da21f025 --- /dev/null +++ b/zenoh/src/api/builders/queryable.rs @@ -0,0 +1,271 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + future::{IntoFuture, Ready}, + sync::Arc, +}; + +use zenoh_core::{Resolvable, Wait}; +use zenoh_result::ZResult; +#[zenoh_macros::unstable] +use { + crate::api::queryable::Query, crate::api::queryable::Queryable, + crate::api::queryable::QueryableInner, +}; + +use crate::{ + api::{ + handlers::{locked, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + sample::Locality, + }, + handlers::Callback, + Session, +}; + +/// A builder for initializing a [`Queryable`]. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let queryable = session.declare_queryable("key/expression").await.unwrap(); +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct QueryableBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { + pub(crate) session: &'a Session, + pub(crate) key_expr: ZResult>, + pub(crate) complete: bool, + pub(crate) origin: Locality, + pub(crate) handler: Handler, +} + +impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { + /// Receive the queries for this queryable with a callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let queryable = session + /// .declare_queryable("key/expression") + /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> + where + F: Fn(Query) + Send + Sync + 'static, + { + self.with(Callback::new(Arc::new(callback))) + } + + /// Receive the queries for this Queryable with a mutable callback. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](QueryableBuilder::callback) method, we suggest you use it instead of `callback_mut`. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let mut n = 0; + /// let queryable = session + /// .declare_queryable("key/expression") + /// .callback_mut(move |query| {n += 1;}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback_mut(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> + where + F: FnMut(Query) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Receive the queries for this Queryable with a [`Handler`](crate::handlers::IntoHandler). + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let queryable = session + /// .declare_queryable("key/expression") + /// .with(flume::bounded(32)) + /// .await + /// .unwrap(); + /// while let Ok(query) = queryable.recv_async().await { + /// println!(">> Handling query '{}'", query.selector()); + /// } + /// # } + /// ``` + #[inline] + pub fn with(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler> + where + Handler: IntoHandler, + { + let QueryableBuilder { + session, + key_expr, + complete, + origin, + handler: _, + } = self; + QueryableBuilder { + session, + key_expr, + complete, + origin, + handler, + } + } +} + +impl<'a, 'b> QueryableBuilder<'a, 'b, Callback> { + /// Register the queryable callback to be run in background until the session is closed. + /// + /// Background builder doesn't return a `Queryable` object anymore. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// // no need to assign and keep a variable with a background queryable + /// session + /// .declare_queryable("key/expression") + /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) + /// .background() + /// .await + /// .unwrap(); + /// # } + /// ``` + pub fn background(self) -> QueryableBuilder<'a, 'b, Callback, true> { + QueryableBuilder { + session: self.session, + key_expr: self.key_expr, + complete: self.complete, + origin: self.origin, + handler: self.handler, + } + } +} + +impl QueryableBuilder<'_, '_, Handler, BACKGROUND> { + /// Change queryable completeness. + #[inline] + pub fn complete(mut self, complete: bool) -> Self { + self.complete = complete; + self + } + + /// + /// + /// Restrict the matching queries that will be receive by this [`Queryable`] + /// to the ones that have the given [`Locality`](Locality). + #[inline] + #[zenoh_macros::unstable] + pub fn allowed_origin(mut self, origin: Locality) -> Self { + self.origin = origin; + self + } +} + +impl Resolvable for QueryableBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type To = ZResult>; +} + +impl Wait for QueryableBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + let session = self.session; + let (callback, receiver) = self.handler.into_handler(); + session + .0 + .declare_queryable_inner( + &self.key_expr?.to_wire(&session.0), + self.complete, + self.origin, + callback, + ) + .map(|qable_state| Queryable { + inner: QueryableInner { + session: self.session.downgrade(), + id: qable_state.id, + undeclare_on_drop: true, + }, + handler: receiver, + }) + } +} + +impl IntoFuture for QueryableBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +impl Resolvable for QueryableBuilder<'_, '_, Callback, true> { + type To = ZResult<()>; +} + +impl Wait for QueryableBuilder<'_, '_, Callback, true> { + fn wait(self) -> ::To { + self.session.0.declare_queryable_inner( + &self.key_expr?.to_wire(&self.session.0), + self.complete, + self.origin, + self.handler, + )?; + Ok(()) + } +} + +impl IntoFuture for QueryableBuilder<'_, '_, Callback, true> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 3aafb067f2..de7c0dc6c3 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -44,7 +44,6 @@ use crate::{ }, bytes::{OptionZBytes, ZBytes}, encoding::Encoding, - handlers::{locked, DefaultHandler, IntoHandler}, key_expr::KeyExpr, publisher::Priority, sample::{Locality, QoSBuilder, Sample, SampleKind}, @@ -55,7 +54,6 @@ use crate::{ }, handlers::Callback, net::primitives::Primitives, - Session, }; pub(crate) struct QueryInner { @@ -583,170 +581,6 @@ impl IntoFuture for QueryableUndeclaration { std::future::ready(self.wait()) } } - -/// A builder for initializing a [`Queryable`]. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let queryable = session.declare_queryable("key/expression").await.unwrap(); -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct QueryableBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { - pub(crate) session: &'a Session, - pub(crate) key_expr: ZResult>, - pub(crate) complete: bool, - pub(crate) origin: Locality, - pub(crate) handler: Handler, -} - -impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { - /// Receive the queries for this queryable with a callback. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let queryable = session - /// .declare_queryable("key/expression") - /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> - where - F: Fn(Query) + Send + Sync + 'static, - { - self.with(Callback::new(Arc::new(callback))) - } - - /// Receive the queries for this Queryable with a mutable callback. - /// - /// Using this guarantees that your callback will never be called concurrently. - /// If your callback is also accepted by the [`callback`](QueryableBuilder::callback) method, we suggest you use it instead of `callback_mut`. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let mut n = 0; - /// let queryable = session - /// .declare_queryable("key/expression") - /// .callback_mut(move |query| {n += 1;}) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback_mut(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> - where - F: FnMut(Query) + Send + Sync + 'static, - { - self.callback(locked(callback)) - } - - /// Receive the queries for this Queryable with a [`Handler`](crate::handlers::IntoHandler). - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let queryable = session - /// .declare_queryable("key/expression") - /// .with(flume::bounded(32)) - /// .await - /// .unwrap(); - /// while let Ok(query) = queryable.recv_async().await { - /// println!(">> Handling query '{}'", query.selector()); - /// } - /// # } - /// ``` - #[inline] - pub fn with(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler> - where - Handler: IntoHandler, - { - let QueryableBuilder { - session, - key_expr, - complete, - origin, - handler: _, - } = self; - QueryableBuilder { - session, - key_expr, - complete, - origin, - handler, - } - } -} - -impl<'a, 'b> QueryableBuilder<'a, 'b, Callback> { - /// Register the queryable callback to be run in background until the session is closed. - /// - /// Background builder doesn't return a `Queryable` object anymore. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// // no need to assign and keep a variable with a background queryable - /// session - /// .declare_queryable("key/expression") - /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) - /// .background() - /// .await - /// .unwrap(); - /// # } - /// ``` - pub fn background(self) -> QueryableBuilder<'a, 'b, Callback, true> { - QueryableBuilder { - session: self.session, - key_expr: self.key_expr, - complete: self.complete, - origin: self.origin, - handler: self.handler, - } - } -} - -impl QueryableBuilder<'_, '_, Handler, BACKGROUND> { - /// Change queryable completeness. - #[inline] - pub fn complete(mut self, complete: bool) -> Self { - self.complete = complete; - self - } - - /// - /// - /// Restrict the matching queries that will be receive by this [`Queryable`] - /// to the ones that have the given [`Locality`](Locality). - #[inline] - #[zenoh_macros::unstable] - pub fn allowed_origin(mut self, origin: Locality) -> Self { - self.origin = origin; - self - } -} - /// A queryable that provides data through a [`Handler`](crate::handlers::IntoHandler). /// /// Queryables can be created from a zenoh [`Session`](crate::Session) @@ -911,76 +745,3 @@ impl DerefMut for Queryable { self.handler_mut() } } - -impl Resolvable for QueryableBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type To = ZResult>; -} - -impl Wait for QueryableBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - fn wait(self) -> ::To { - let session = self.session; - let (callback, receiver) = self.handler.into_handler(); - session - .0 - .declare_queryable_inner( - &self.key_expr?.to_wire(&session.0), - self.complete, - self.origin, - callback, - ) - .map(|qable_state| Queryable { - inner: QueryableInner { - session: self.session.downgrade(), - id: qable_state.id, - undeclare_on_drop: true, - }, - handler: receiver, - }) - } -} - -impl IntoFuture for QueryableBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -impl Resolvable for QueryableBuilder<'_, '_, Callback, true> { - type To = ZResult<()>; -} - -impl Wait for QueryableBuilder<'_, '_, Callback, true> { - fn wait(self) -> ::To { - self.session.0.declare_queryable_inner( - &self.key_expr?.to_wire(&self.session.0), - self.complete, - self.origin, - self.handler, - )?; - Ok(()) - } -} - -impl IntoFuture for QueryableBuilder<'_, '_, Callback, true> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 42a6fcf8b0..47102c3960 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -70,9 +70,12 @@ use zenoh_task::TaskController; use super::{ admin, - builders::publisher::{ - PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder, SessionDeleteBuilder, - SessionPutBuilder, + builders::{ + publisher::{ + PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder, + SessionDeleteBuilder, SessionPutBuilder, + }, + queryable::QueryableBuilder, }, bytes::ZBytes, encoding::Encoding, @@ -83,7 +86,7 @@ use super::{ query::{ ConsolidationMode, QueryConsolidation, QueryState, QueryTarget, Reply, SessionGetBuilder, }, - queryable::{Query, QueryInner, QueryableBuilder, QueryableState}, + queryable::{Query, QueryInner, QueryableState}, sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind}, selector::Selector, subscriber::{SubscriberBuilder, SubscriberKind, SubscriberState}, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 737b5587b4..010575134e 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -243,16 +243,17 @@ pub mod query { #[zenoh_macros::internal] pub use crate::api::queryable::ReplySample; - #[zenoh_macros::unstable] - pub use crate::api::{query::ReplyKeyExpr, selector::ZenohParameters}; pub use crate::api::{ + builders::queryable::QueryableBuilder, query::{ConsolidationMode, QueryConsolidation, QueryTarget, Reply, ReplyError}, queryable::{ - Query, Queryable, QueryableBuilder, QueryableUndeclaration, ReplyBuilder, - ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder, + Query, Queryable, QueryableUndeclaration, ReplyBuilder, ReplyBuilderDelete, + ReplyBuilderPut, ReplyErrBuilder, }, selector::Selector, }; + #[zenoh_macros::unstable] + pub use crate::api::{query::ReplyKeyExpr, selector::ZenohParameters}; } /// Callback handler trait From 7ff8df09aa898180118cc757a2796bdbc4cd48ee Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sat, 12 Oct 2024 22:23:39 +0200 Subject: [PATCH 02/15] reply builders moved to builders --- zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/builders/reply.rs | 286 ++++++++++++++++++++++++++++++++ zenoh/src/api/queryable.rs | 238 +------------------------- zenoh/src/lib.rs | 6 +- 4 files changed, 298 insertions(+), 233 deletions(-) create mode 100644 zenoh/src/api/builders/reply.rs diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index 04fc7f1a57..4a9ae98f9d 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -14,4 +14,5 @@ pub(crate) mod publisher; pub(crate) mod queryable; +pub(crate) mod reply; pub(crate) mod sample; diff --git a/zenoh/src/api/builders/reply.rs b/zenoh/src/api/builders/reply.rs new file mode 100644 index 0000000000..d97119da99 --- /dev/null +++ b/zenoh/src/api/builders/reply.rs @@ -0,0 +1,286 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::future::{IntoFuture, Ready}; + +use uhlc::Timestamp; +use zenoh_core::{Resolvable, Wait}; +use zenoh_protocol::{ + core::{CongestionControl, WireExpr}, + network::{response, Mapping, Response}, + zenoh::{self, ResponseBody}, +}; +use zenoh_result::ZResult; + +#[zenoh_macros::unstable] +use crate::api::sample::SourceInfo; +use crate::api::{ + builders::sample::{ + EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, + TimestampBuilderTrait, + }, + bytes::{OptionZBytes, ZBytes}, + encoding::Encoding, + key_expr::KeyExpr, + publisher::Priority, + queryable::Query, + sample::QoSBuilder, + value::Value, +}; + +#[derive(Debug)] +pub struct ReplyBuilderPut { + payload: ZBytes, + encoding: Encoding, +} +#[derive(Debug)] +pub struct ReplyBuilderDelete; + +/// A builder returned by [`Query::reply()`](Query::reply) and [`Query::reply_del()`](Query::reply_del) +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct ReplyBuilder<'a, 'b, T> { + query: &'a Query, + key_expr: ZResult>, + kind: T, + timestamp: Option, + qos: QoSBuilder, + #[cfg(feature = "unstable")] + source_info: SourceInfo, + attachment: Option, +} + +impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderPut> { + pub(crate) fn new( + query: &'a Query, + key_expr: TryIntoKeyExpr, + payload: IntoZBytes, + ) -> Self + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + IntoZBytes: Into, + { + Self { + query, + key_expr: key_expr.try_into().map_err(Into::into), + qos: response::ext::QoSType::RESPONSE.into(), + kind: ReplyBuilderPut { + payload: payload.into(), + encoding: Encoding::default(), + }, + timestamp: None, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + attachment: None, + } + } +} + +impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderDelete> { + pub(crate) fn new(query: &'a Query, key_expr: TryIntoKeyExpr) -> Self + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + Self { + query, + key_expr: key_expr.try_into().map_err(Into::into), + qos: response::ext::QoSType::RESPONSE.into(), + kind: ReplyBuilderDelete, + timestamp: None, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + attachment: None, + } + } +} + +#[zenoh_macros::internal_trait] +impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> { + fn timestamp>>(self, timestamp: U) -> Self { + Self { + timestamp: timestamp.into(), + ..self + } + } +} + +#[zenoh_macros::internal_trait] +impl SampleBuilderTrait for ReplyBuilder<'_, '_, T> { + fn attachment>(self, attachment: U) -> Self { + let attachment: OptionZBytes = attachment.into(); + Self { + attachment: attachment.into(), + ..self + } + } + + #[cfg(feature = "unstable")] + fn source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info, + ..self + } + } +} + +#[zenoh_macros::internal_trait] +impl QoSBuilderTrait for ReplyBuilder<'_, '_, T> { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let qos = self.qos.congestion_control(congestion_control); + Self { qos, ..self } + } + + fn priority(self, priority: Priority) -> Self { + let qos = self.qos.priority(priority); + Self { qos, ..self } + } + + fn express(self, is_express: bool) -> Self { + let qos = self.qos.express(is_express); + Self { qos, ..self } + } +} + +#[zenoh_macros::internal_trait] +impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> { + fn encoding>(self, encoding: T) -> Self { + Self { + kind: ReplyBuilderPut { + encoding: encoding.into(), + ..self.kind + }, + ..self + } + } +} + +impl Resolvable for ReplyBuilder<'_, '_, T> { + type To = ZResult<()>; +} + +impl Wait for ReplyBuilder<'_, '_, ReplyBuilderPut> { + fn wait(self) -> ::To { + let key_expr = self.key_expr?.into_owned(); + let sample = SampleBuilder::put(key_expr, self.kind.payload) + .encoding(self.kind.encoding) + .timestamp(self.timestamp) + .qos(self.qos.into()); + #[cfg(feature = "unstable")] + let sample = sample.source_info(self.source_info); + let sample = sample.attachment(self.attachment); + self.query._reply_sample(sample.into()) + } +} + +impl Wait for ReplyBuilder<'_, '_, ReplyBuilderDelete> { + fn wait(self) -> ::To { + let key_expr = self.key_expr?.into_owned(); + let sample = SampleBuilder::delete(key_expr) + .timestamp(self.timestamp) + .qos(self.qos.into()); + #[cfg(feature = "unstable")] + let sample = sample.source_info(self.source_info); + let sample = sample.attachment(self.attachment); + self.query._reply_sample(sample.into()) + } +} + +impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderPut> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderDelete> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +/// A builder returned by [`Query::reply_err()`](Query::reply_err). +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct ReplyErrBuilder<'a> { + query: &'a Query, + value: Value, +} + +impl<'a> ReplyErrBuilder<'a> { + pub(crate) fn new(query: &'a Query, payload: IntoZBytes) -> ReplyErrBuilder<'_> + where + IntoZBytes: Into, + { + Self { + query, + value: Value::new(payload, Encoding::default()), + } + } +} + +#[zenoh_macros::internal_trait] +impl EncodingBuilderTrait for ReplyErrBuilder<'_> { + fn encoding>(self, encoding: T) -> Self { + let mut value = self.value.clone(); + value.encoding = encoding.into(); + Self { value, ..self } + } +} + +impl<'a> Resolvable for ReplyErrBuilder<'a> { + type To = ZResult<()>; +} + +impl Wait for ReplyErrBuilder<'_> { + fn wait(self) -> ::To { + self.query.inner.primitives.send_response(Response { + rid: self.query.inner.qid, + wire_expr: WireExpr { + scope: 0, + suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()), + mapping: Mapping::Sender, + }, + payload: ResponseBody::Err(zenoh::Err { + encoding: self.value.encoding.into(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: self.value.payload.into(), + }), + ext_qos: response::ext::QoSType::RESPONSE, + ext_tstamp: None, + ext_respid: Some(response::ext::ResponderIdType { + zid: self.query.inner.zid, + eid: self.query.eid, + }), + }); + Ok(()) + } +} + +impl<'a> IntoFuture for ReplyErrBuilder<'a> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index de7c0dc6c3..613d1fa1d8 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -19,18 +19,16 @@ use std::{ }; use tracing::error; -use uhlc::Timestamp; use zenoh_core::{Resolvable, Resolve, Wait}; use zenoh_protocol::{ - core::{CongestionControl, EntityId, Parameters, WireExpr, ZenohIdProto}, + core::{EntityId, Parameters, WireExpr, ZenohIdProto}, network::{response, Mapping, RequestId, Response, ResponseFinal}, zenoh::{self, reply::ReplyBody, Del, Put, ResponseBody}, }; use zenoh_result::ZResult; #[zenoh_macros::unstable] use { - crate::api::{query::ReplyKeyExpr, sample::SourceInfo}, - zenoh_config::wrappers::EntityGlobalId, + crate::api::query::ReplyKeyExpr, zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto, }; @@ -38,15 +36,11 @@ use { use crate::api::selector::ZenohParameters; use crate::{ api::{ - builders::sample::{ - EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, - TimestampBuilderTrait, - }, - bytes::{OptionZBytes, ZBytes}, + builders::reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder}, + bytes::ZBytes, encoding::Encoding, key_expr::KeyExpr, - publisher::Priority, - sample::{Locality, QoSBuilder, Sample, SampleKind}, + sample::{Locality, Sample, SampleKind}, selector::Selector, session::{UndeclarableSealed, WeakSession}, value::Value, @@ -161,19 +155,7 @@ impl Query { >>::Error: Into, IntoZBytes: Into, { - ReplyBuilder { - query: self, - key_expr: key_expr.try_into().map_err(Into::into), - qos: response::ext::QoSType::RESPONSE.into(), - kind: ReplyBuilderPut { - payload: payload.into(), - encoding: Encoding::default(), - }, - timestamp: None, - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - attachment: None, - } + ReplyBuilder::<'_, 'b, ReplyBuilderPut>::new(self, key_expr, payload) } /// Sends a [`crate::query::ReplyError`] as a reply to this Query. @@ -182,10 +164,7 @@ impl Query { where IntoZBytes: Into, { - ReplyErrBuilder { - query: self, - value: Value::new(payload, Encoding::default()), - } + ReplyErrBuilder::new(self, payload) } /// Sends a [`crate::sample::Sample`] of kind [`crate::sample::SampleKind::Delete`] as a reply to this Query. @@ -202,16 +181,7 @@ impl Query { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - ReplyBuilder { - query: self, - key_expr: key_expr.try_into().map_err(Into::into), - qos: response::ext::QoSType::RESPONSE.into(), - kind: ReplyBuilderDelete, - timestamp: None, - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - attachment: None, - } + ReplyBuilder::<'_, 'b, ReplyBuilderDelete>::new(self, key_expr) } /// Queries may or may not accept replies on key expressions that do not intersect with their own key expression. @@ -280,121 +250,8 @@ impl IntoFuture for ReplySample<'_> { } } -#[derive(Debug)] -pub struct ReplyBuilderPut { - payload: ZBytes, - encoding: Encoding, -} -#[derive(Debug)] -pub struct ReplyBuilderDelete; - -/// A builder returned by [`Query::reply()`](Query::reply) and [`Query::reply_del()`](Query::reply_del) -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct ReplyBuilder<'a, 'b, T> { - query: &'a Query, - key_expr: ZResult>, - kind: T, - timestamp: Option, - qos: QoSBuilder, - #[cfg(feature = "unstable")] - source_info: SourceInfo, - attachment: Option, -} - -#[zenoh_macros::internal_trait] -impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> { - fn timestamp>>(self, timestamp: U) -> Self { - Self { - timestamp: timestamp.into(), - ..self - } - } -} - -#[zenoh_macros::internal_trait] -impl SampleBuilderTrait for ReplyBuilder<'_, '_, T> { - fn attachment>(self, attachment: U) -> Self { - let attachment: OptionZBytes = attachment.into(); - Self { - attachment: attachment.into(), - ..self - } - } - - #[cfg(feature = "unstable")] - fn source_info(self, source_info: SourceInfo) -> Self { - Self { - source_info, - ..self - } - } -} - -#[zenoh_macros::internal_trait] -impl QoSBuilderTrait for ReplyBuilder<'_, '_, T> { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { - let qos = self.qos.congestion_control(congestion_control); - Self { qos, ..self } - } - - fn priority(self, priority: Priority) -> Self { - let qos = self.qos.priority(priority); - Self { qos, ..self } - } - - fn express(self, is_express: bool) -> Self { - let qos = self.qos.express(is_express); - Self { qos, ..self } - } -} - -#[zenoh_macros::internal_trait] -impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> { - fn encoding>(self, encoding: T) -> Self { - Self { - kind: ReplyBuilderPut { - encoding: encoding.into(), - ..self.kind - }, - ..self - } - } -} - -impl Resolvable for ReplyBuilder<'_, '_, T> { - type To = ZResult<()>; -} - -impl Wait for ReplyBuilder<'_, '_, ReplyBuilderPut> { - fn wait(self) -> ::To { - let key_expr = self.key_expr?.into_owned(); - let sample = SampleBuilder::put(key_expr, self.kind.payload) - .encoding(self.kind.encoding) - .timestamp(self.timestamp) - .qos(self.qos.into()); - #[cfg(feature = "unstable")] - let sample = sample.source_info(self.source_info); - let sample = sample.attachment(self.attachment); - self.query._reply_sample(sample.into()) - } -} - -impl Wait for ReplyBuilder<'_, '_, ReplyBuilderDelete> { - fn wait(self) -> ::To { - let key_expr = self.key_expr?.into_owned(); - let sample = SampleBuilder::delete(key_expr) - .timestamp(self.timestamp) - .qos(self.qos.into()); - #[cfg(feature = "unstable")] - let sample = sample.source_info(self.source_info); - let sample = sample.attachment(self.attachment); - self.query._reply_sample(sample.into()) - } -} - impl Query { - fn _reply_sample(&self, sample: Sample) -> ZResult<()> { + pub(crate) fn _reply_sample(&self, sample: Sample) -> ZResult<()> { let c = zcondfeat!( "unstable", !self._accepts_any_replies().unwrap_or(false), @@ -446,83 +303,6 @@ impl Query { Ok(()) } } - -impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderPut> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderDelete> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -/// A builder returned by [`Query::reply_err()`](Query::reply_err). -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct ReplyErrBuilder<'a> { - query: &'a Query, - value: Value, -} - -#[zenoh_macros::internal_trait] -impl EncodingBuilderTrait for ReplyErrBuilder<'_> { - fn encoding>(self, encoding: T) -> Self { - let mut value = self.value.clone(); - value.encoding = encoding.into(); - Self { value, ..self } - } -} - -impl<'a> Resolvable for ReplyErrBuilder<'a> { - type To = ZResult<()>; -} - -impl Wait for ReplyErrBuilder<'_> { - fn wait(self) -> ::To { - self.query.inner.primitives.send_response(Response { - rid: self.query.inner.qid, - wire_expr: WireExpr { - scope: 0, - suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()), - mapping: Mapping::Sender, - }, - payload: ResponseBody::Err(zenoh::Err { - encoding: self.value.encoding.into(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: self.value.payload.into(), - }), - ext_qos: response::ext::QoSType::RESPONSE, - ext_tstamp: None, - ext_respid: Some(response::ext::ResponderIdType { - zid: self.query.inner.zid, - eid: self.query.eid, - }), - }); - Ok(()) - } -} - -impl<'a> IntoFuture for ReplyErrBuilder<'a> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - pub(crate) struct QueryableState { pub(crate) id: Id, pub(crate) key_expr: WireExpr<'static>, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 010575134e..c2cbbcec1a 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -245,11 +245,9 @@ pub mod query { pub use crate::api::queryable::ReplySample; pub use crate::api::{ builders::queryable::QueryableBuilder, + builders::reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder}, query::{ConsolidationMode, QueryConsolidation, QueryTarget, Reply, ReplyError}, - queryable::{ - Query, Queryable, QueryableUndeclaration, ReplyBuilder, ReplyBuilderDelete, - ReplyBuilderPut, ReplyErrBuilder, - }, + queryable::{Query, Queryable, QueryableUndeclaration}, selector::Selector, }; #[zenoh_macros::unstable] From f11c95ea9b21817450abe66bdb4b284e2e2c096e Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sat, 12 Oct 2024 22:34:34 +0200 Subject: [PATCH 03/15] cargo fmt --- zenoh/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index c2cbbcec1a..a5f0bb355e 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -244,8 +244,10 @@ pub mod query { #[zenoh_macros::internal] pub use crate::api::queryable::ReplySample; pub use crate::api::{ - builders::queryable::QueryableBuilder, - builders::reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder}, + builders::{ + queryable::QueryableBuilder, + reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder}, + }, query::{ConsolidationMode, QueryConsolidation, QueryTarget, Reply, ReplyError}, queryable::{Query, Queryable, QueryableUndeclaration}, selector::Selector, From efb45b168e6880a631a26b217fa7d70622def0cd Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 10:10:46 +0200 Subject: [PATCH 04/15] subscriber builder moved to builders --- zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/builders/subscriber.rs | 268 +++++++++++++++++++++++++++ zenoh/src/api/session.rs | 3 +- zenoh/src/api/subscriber.rs | 254 +------------------------ zenoh/src/lib.rs | 11 +- 5 files changed, 284 insertions(+), 253 deletions(-) create mode 100644 zenoh/src/api/builders/subscriber.rs diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index 4a9ae98f9d..10d2415aa3 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -16,3 +16,4 @@ pub(crate) mod publisher; pub(crate) mod queryable; pub(crate) mod reply; pub(crate) mod sample; +pub(crate) mod subscriber; diff --git a/zenoh/src/api/builders/subscriber.rs b/zenoh/src/api/builders/subscriber.rs new file mode 100644 index 0000000000..340dbf1be7 --- /dev/null +++ b/zenoh/src/api/builders/subscriber.rs @@ -0,0 +1,268 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + future::{IntoFuture, Ready}, + sync::Arc, +}; + +use zenoh_core::{Resolvable, Wait}; +use zenoh_result::ZResult; + +use crate::{ + api::{ + handlers::{locked, Callback, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + sample::{Locality, Sample}, + subscriber::{Subscriber, SubscriberInner, SubscriberKind}, + }, + Session, +}; + +/// A builder for initializing a [`crate::pubsub::Subscriber`]. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let subscriber = session +/// .declare_subscriber("key/expression") +/// .await +/// .unwrap(); +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct SubscriberBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { + #[cfg(feature = "internal")] + pub session: &'a Session, + #[cfg(not(feature = "internal"))] + pub(crate) session: &'a Session, + + #[cfg(feature = "internal")] + pub key_expr: ZResult>, + #[cfg(not(feature = "internal"))] + pub(crate) key_expr: ZResult>, + + #[cfg(feature = "internal")] + pub origin: Locality, + #[cfg(not(feature = "internal"))] + pub(crate) origin: Locality, + + #[cfg(feature = "internal")] + pub handler: Handler, + #[cfg(not(feature = "internal"))] + pub(crate) handler: Handler, +} + +impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { + /// Receive the samples for this subscription with a callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let subscriber = session + /// .declare_subscriber("key/expression") + /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback(self, callback: F) -> SubscriberBuilder<'a, 'b, Callback> + where + F: Fn(Sample) + Send + Sync + 'static, + { + self.with(Callback::new(Arc::new(callback))) + } + + /// Receive the samples for this subscription with a mutable callback. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](SubscriberBuilder::callback) method, we suggest you use it instead of `callback_mut`. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let mut n = 0; + /// let subscriber = session + /// .declare_subscriber("key/expression") + /// .callback_mut(move |_sample| { n += 1; }) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback_mut(self, callback: F) -> SubscriberBuilder<'a, 'b, Callback> + where + F: FnMut(Sample) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Receive the samples for this subscription with a [`Handler`](crate::handlers::IntoHandler). + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let subscriber = session + /// .declare_subscriber("key/expression") + /// .with(flume::bounded(32)) + /// .await + /// .unwrap(); + /// while let Ok(sample) = subscriber.recv_async().await { + /// println!("Received: {} {:?}", sample.key_expr(), sample.payload()); + /// } + /// # } + /// ``` + #[inline] + pub fn with(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Handler> + where + Handler: IntoHandler, + { + let SubscriberBuilder { + session, + key_expr, + origin, + handler: _, + } = self; + SubscriberBuilder { + session, + key_expr, + origin, + handler, + } + } +} + +impl<'a, 'b> SubscriberBuilder<'a, 'b, Callback> { + /// Register the subscriber callback to be run in background until the session is closed. + /// + /// Background builder doesn't return a `Subscriber` object anymore. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// // no need to assign and keep a variable with a background subscriber + /// session + /// .declare_subscriber("key/expression") + /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) + /// .background() + /// .await + /// .unwrap(); + /// # } + /// ``` + pub fn background(self) -> SubscriberBuilder<'a, 'b, Callback, true> { + SubscriberBuilder { + session: self.session, + key_expr: self.key_expr, + origin: self.origin, + handler: self.handler, + } + } +} + +impl SubscriberBuilder<'_, '_, Handler, BACKGROUND> { + /// Changes the [`crate::sample::Locality`] of received publications. + /// + /// Restricts the matching publications that will be receive by this [`Subscriber`] to the ones + /// that have the given [`crate::sample::Locality`]. + #[zenoh_macros::unstable] + #[inline] + pub fn allowed_origin(mut self, origin: Locality) -> Self { + self.origin = origin; + self + } +} + +impl Resolvable for SubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type To = ZResult>; +} + +impl Wait for SubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + let key_expr = self.key_expr?; + let session = self.session; + let (callback, receiver) = self.handler.into_handler(); + session + .0 + .declare_subscriber_inner(&key_expr, self.origin, callback) + .map(|sub_state| Subscriber { + inner: SubscriberInner { + session: session.downgrade(), + id: sub_state.id, + key_expr: sub_state.key_expr.clone(), + kind: SubscriberKind::Subscriber, + undeclare_on_drop: true, + }, + handler: receiver, + }) + } +} + +impl IntoFuture for SubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +impl Resolvable for SubscriberBuilder<'_, '_, Callback, true> { + type To = ZResult<()>; +} + +impl Wait for SubscriberBuilder<'_, '_, Callback, true> { + fn wait(self) -> ::To { + self.session + .0 + .declare_subscriber_inner(&self.key_expr?, self.origin, self.handler)?; + Ok(()) + } +} + +impl IntoFuture for SubscriberBuilder<'_, '_, Callback, true> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 47102c3960..ec52eb480e 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -76,6 +76,7 @@ use super::{ SessionDeleteBuilder, SessionPutBuilder, }, queryable::QueryableBuilder, + subscriber::SubscriberBuilder, }, bytes::ZBytes, encoding::Encoding, @@ -89,7 +90,7 @@ use super::{ queryable::{Query, QueryInner, QueryableState}, sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind}, selector::Selector, - subscriber::{SubscriberBuilder, SubscriberKind, SubscriberState}, + subscriber::{SubscriberKind, SubscriberState}, value::Value, Id, }; diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 4d92c62db0..8c908f2a5b 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -15,7 +15,6 @@ use std::{ fmt, future::{IntoFuture, Ready}, ops::{Deref, DerefMut}, - sync::Arc, }; use tracing::error; @@ -24,15 +23,12 @@ use zenoh_result::ZResult; #[cfg(feature = "unstable")] use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto}; -use crate::{ - api::{ - handlers::{locked, Callback, DefaultHandler, IntoHandler}, - key_expr::KeyExpr, - sample::{Locality, Sample}, - session::{UndeclarableSealed, WeakSession}, - Id, - }, - Session, +use crate::api::{ + handlers::Callback, + key_expr::KeyExpr, + sample::{Locality, Sample}, + session::{UndeclarableSealed, WeakSession}, + Id, }; pub(crate) struct SubscriberState { @@ -98,244 +94,6 @@ impl IntoFuture for SubscriberUndeclaration { } } -/// A builder for initializing a [`crate::pubsub::Subscriber`]. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let subscriber = session -/// .declare_subscriber("key/expression") -/// .await -/// .unwrap(); -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct SubscriberBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { - #[cfg(feature = "internal")] - pub session: &'a Session, - #[cfg(not(feature = "internal"))] - pub(crate) session: &'a Session, - - #[cfg(feature = "internal")] - pub key_expr: ZResult>, - #[cfg(not(feature = "internal"))] - pub(crate) key_expr: ZResult>, - - #[cfg(feature = "internal")] - pub origin: Locality, - #[cfg(not(feature = "internal"))] - pub(crate) origin: Locality, - - #[cfg(feature = "internal")] - pub handler: Handler, - #[cfg(not(feature = "internal"))] - pub(crate) handler: Handler, -} - -impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { - /// Receive the samples for this subscription with a callback. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let subscriber = session - /// .declare_subscriber("key/expression") - /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback(self, callback: F) -> SubscriberBuilder<'a, 'b, Callback> - where - F: Fn(Sample) + Send + Sync + 'static, - { - self.with(Callback::new(Arc::new(callback))) - } - - /// Receive the samples for this subscription with a mutable callback. - /// - /// Using this guarantees that your callback will never be called concurrently. - /// If your callback is also accepted by the [`callback`](SubscriberBuilder::callback) method, we suggest you use it instead of `callback_mut`. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let mut n = 0; - /// let subscriber = session - /// .declare_subscriber("key/expression") - /// .callback_mut(move |_sample| { n += 1; }) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback_mut(self, callback: F) -> SubscriberBuilder<'a, 'b, Callback> - where - F: FnMut(Sample) + Send + Sync + 'static, - { - self.callback(locked(callback)) - } - - /// Receive the samples for this subscription with a [`Handler`](crate::handlers::IntoHandler). - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let subscriber = session - /// .declare_subscriber("key/expression") - /// .with(flume::bounded(32)) - /// .await - /// .unwrap(); - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {} {:?}", sample.key_expr(), sample.payload()); - /// } - /// # } - /// ``` - #[inline] - pub fn with(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Handler> - where - Handler: IntoHandler, - { - let SubscriberBuilder { - session, - key_expr, - origin, - handler: _, - } = self; - SubscriberBuilder { - session, - key_expr, - origin, - handler, - } - } -} - -impl<'a, 'b> SubscriberBuilder<'a, 'b, Callback> { - /// Register the subscriber callback to be run in background until the session is closed. - /// - /// Background builder doesn't return a `Subscriber` object anymore. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// // no need to assign and keep a variable with a background subscriber - /// session - /// .declare_subscriber("key/expression") - /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) - /// .background() - /// .await - /// .unwrap(); - /// # } - /// ``` - pub fn background(self) -> SubscriberBuilder<'a, 'b, Callback, true> { - SubscriberBuilder { - session: self.session, - key_expr: self.key_expr, - origin: self.origin, - handler: self.handler, - } - } -} - -impl SubscriberBuilder<'_, '_, Handler, BACKGROUND> { - /// Changes the [`crate::sample::Locality`] of received publications. - /// - /// Restricts the matching publications that will be receive by this [`Subscriber`] to the ones - /// that have the given [`crate::sample::Locality`]. - #[zenoh_macros::unstable] - #[inline] - pub fn allowed_origin(mut self, origin: Locality) -> Self { - self.origin = origin; - self - } -} - -impl Resolvable for SubscriberBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type To = ZResult>; -} - -impl Wait for SubscriberBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - fn wait(self) -> ::To { - let key_expr = self.key_expr?; - let session = self.session; - let (callback, receiver) = self.handler.into_handler(); - session - .0 - .declare_subscriber_inner(&key_expr, self.origin, callback) - .map(|sub_state| Subscriber { - inner: SubscriberInner { - session: session.downgrade(), - id: sub_state.id, - key_expr: sub_state.key_expr.clone(), - kind: SubscriberKind::Subscriber, - undeclare_on_drop: true, - }, - handler: receiver, - }) - } -} - -impl IntoFuture for SubscriberBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -impl Resolvable for SubscriberBuilder<'_, '_, Callback, true> { - type To = ZResult<()>; -} - -impl Wait for SubscriberBuilder<'_, '_, Callback, true> { - fn wait(self) -> ::To { - self.session - .0 - .declare_subscriber_inner(&self.key_expr?, self.origin, self.handler)?; - Ok(()) - } -} - -impl IntoFuture for SubscriberBuilder<'_, '_, Callback, true> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - /// A subscriber that provides data through a [`Handler`](crate::handlers::IntoHandler). /// /// Subscribers can be created from a zenoh [`Session`](crate::Session) diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index a5f0bb355e..d2e35bc1ca 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -226,12 +226,15 @@ pub mod pubsub { MatchingListener, MatchingListenerBuilder, MatchingListenerUndeclaration, MatchingStatus, }; pub use crate::api::{ - builders::publisher::{ - PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder, - PublisherDeleteBuilder, PublisherPutBuilder, + builders::{ + publisher::{ + PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, + PublisherBuilder, PublisherDeleteBuilder, PublisherPutBuilder, + }, + subscriber::SubscriberBuilder, }, publisher::{Publisher, PublisherUndeclaration}, - subscriber::{Subscriber, SubscriberBuilder}, + subscriber::Subscriber, }; } From 4c6289defd9e211567dc33b84ce6121e2e68107d Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 10:29:21 +0200 Subject: [PATCH 05/15] session open builder moved to builders --- zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/builders/session.rs | 174 +++++++++++++++++++++++++ zenoh/src/api/session.rs | 206 +++++------------------------- zenoh/src/lib.rs | 9 +- 4 files changed, 214 insertions(+), 176 deletions(-) create mode 100644 zenoh/src/api/builders/session.rs diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index 10d2415aa3..1379dcc557 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -16,4 +16,5 @@ pub(crate) mod publisher; pub(crate) mod queryable; pub(crate) mod reply; pub(crate) mod sample; +pub(crate) mod session; pub(crate) mod subscriber; diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs new file mode 100644 index 0000000000..cb4396bfa1 --- /dev/null +++ b/zenoh/src/api/builders/session.rs @@ -0,0 +1,174 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::future::{IntoFuture, Ready}; + +use zenoh_core::{Resolvable, Wait}; +use zenoh_keyexpr::OwnedKeyExpr; +use zenoh_result::ZResult; + +use crate::api::session::Session; +#[cfg(feature = "unstable")] +use crate::net::runtime::Runtime; + +/// A builder returned by [`open`] used to open a zenoh [`Session`]. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +pub struct OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + config: TryIntoConfig, + #[cfg(feature = "shared-memory")] + shm_clients: Option>, +} + +impl OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + pub(crate) fn new(config: TryIntoConfig) -> Self { + Self { + config, + #[cfg(feature = "shared-memory")] + shm_clients: None, + } + } +} + +#[cfg(feature = "shared-memory")] +impl OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + pub fn with_shm_clients(mut self, shm_clients: Arc) -> Self { + self.shm_clients = Some(shm_clients); + self + } +} + +impl Resolvable for OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + type To = ZResult; +} + +impl Wait for OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + fn wait(self) -> ::To { + let config: crate::config::Config = self + .config + .try_into() + .map_err(|e| zerror!("Invalid Zenoh configuration {:?}", &e))?; + Session::new( + config, + #[cfg(feature = "shared-memory")] + self.shm_clients, + ) + .wait() + } +} + +impl IntoFuture for OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +/// Initialize a Session with an existing Runtime. +/// This operation is used by the plugins to share the same Runtime as the router. +#[zenoh_macros::internal] +pub fn init(runtime: Runtime) -> InitBuilder { + InitBuilder { + runtime, + aggregated_subscribers: vec![], + aggregated_publishers: vec![], + } +} + +/// A builder returned by [`init`] and used to initialize a Session with an existing Runtime. +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[doc(hidden)] +#[zenoh_macros::internal] +pub struct InitBuilder { + runtime: Runtime, + aggregated_subscribers: Vec, + aggregated_publishers: Vec, +} + +#[zenoh_macros::internal] +impl InitBuilder { + #[inline] + pub fn aggregated_subscribers(mut self, exprs: Vec) -> Self { + self.aggregated_subscribers = exprs; + self + } + + #[inline] + pub fn aggregated_publishers(mut self, exprs: Vec) -> Self { + self.aggregated_publishers = exprs; + self + } +} + +#[zenoh_macros::internal] +impl Resolvable for InitBuilder { + type To = ZResult; +} + +#[zenoh_macros::internal] +impl Wait for InitBuilder { + fn wait(self) -> ::To { + Ok(Session::init( + self.runtime, + self.aggregated_subscribers, + self.aggregated_publishers, + false, + ) + .wait()) + } +} + +#[zenoh_macros::internal] +impl IntoFuture for InitBuilder { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index ec52eb480e..1707a22766 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -17,7 +17,6 @@ use std::{ collections::HashMap, convert::TryInto, fmt, - future::{IntoFuture, Ready}, ops::Deref, sync::{ atomic::{AtomicU16, Ordering}, @@ -33,7 +32,7 @@ use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; use zenoh_config::{unwrap_or_default, wrappers::ZenohId}; -use zenoh_core::{zconfigurable, zread, Resolvable, Resolve, ResolveClosure, ResolveFuture, Wait}; +use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, Wait}; #[cfg(feature = "unstable")] use zenoh_protocol::network::{ declare::{DeclareToken, SubscriberId, TokenId, UndeclareToken}, @@ -68,43 +67,45 @@ use zenoh_result::ZResult; use zenoh_shm::api::client_storage::ShmClientStorage; use zenoh_task::TaskController; -use super::{ - admin, - builders::{ - publisher::{ - PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder, - SessionDeleteBuilder, SessionPutBuilder, - }, - queryable::QueryableBuilder, - subscriber::SubscriberBuilder, - }, - bytes::ZBytes, - encoding::Encoding, - handlers::{Callback, DefaultHandler}, - info::SessionInfo, - key_expr::{KeyExpr, KeyExprInner}, - publisher::{Priority, PublisherState}, - query::{ - ConsolidationMode, QueryConsolidation, QueryState, QueryTarget, Reply, SessionGetBuilder, - }, - queryable::{Query, QueryInner, QueryableState}, - sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind}, - selector::Selector, - subscriber::{SubscriberKind, SubscriberState}, - value::Value, - Id, -}; #[cfg(feature = "unstable")] -use super::{ +use crate::api::selector::ZenohParameters; +#[cfg(feature = "unstable")] +use crate::api::{ liveliness::{Liveliness, LivelinessTokenState}, publisher::Publisher, publisher::{MatchingListenerState, MatchingStatus}, query::LivelinessQueryState, sample::SourceInfo, }; -#[cfg(feature = "unstable")] -use crate::api::selector::ZenohParameters; use crate::{ + api::{ + admin, + builders::{ + publisher::{ + PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder, + SessionDeleteBuilder, SessionPutBuilder, + }, + queryable::QueryableBuilder, + session::OpenBuilder, + subscriber::SubscriberBuilder, + }, + bytes::ZBytes, + encoding::Encoding, + handlers::{Callback, DefaultHandler}, + info::SessionInfo, + key_expr::{KeyExpr, KeyExprInner}, + publisher::{Priority, PublisherState}, + query::{ + ConsolidationMode, QueryConsolidation, QueryState, QueryTarget, Reply, + SessionGetBuilder, + }, + queryable::{Query, QueryInner, QueryableState}, + sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind}, + selector::Selector, + subscriber::{SubscriberKind, SubscriberState}, + value::Value, + Id, + }, net::{ primitives::Primitives, routing::dispatcher::face::Face, @@ -2879,146 +2880,5 @@ where TryIntoConfig: std::convert::TryInto + Send + 'static, >::Error: std::fmt::Debug, { - OpenBuilder { - config, - #[cfg(feature = "shared-memory")] - shm_clients: None, - } -} - -/// A builder returned by [`open`] used to open a zenoh [`Session`]. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -pub struct OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - config: TryIntoConfig, - #[cfg(feature = "shared-memory")] - shm_clients: Option>, -} - -#[cfg(feature = "shared-memory")] -impl OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - pub fn with_shm_clients(mut self, shm_clients: Arc) -> Self { - self.shm_clients = Some(shm_clients); - self - } -} - -impl Resolvable for OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - type To = ZResult; -} - -impl Wait for OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - fn wait(self) -> ::To { - let config: crate::config::Config = self - .config - .try_into() - .map_err(|e| zerror!("Invalid Zenoh configuration {:?}", &e))?; - Session::new( - config, - #[cfg(feature = "shared-memory")] - self.shm_clients, - ) - .wait() - } -} - -impl IntoFuture for OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -/// Initialize a Session with an existing Runtime. -/// This operation is used by the plugins to share the same Runtime as the router. -#[zenoh_macros::internal] -pub fn init(runtime: Runtime) -> InitBuilder { - InitBuilder { - runtime, - aggregated_subscribers: vec![], - aggregated_publishers: vec![], - } -} - -/// A builder returned by [`init`] and used to initialize a Session with an existing Runtime. -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[doc(hidden)] -#[zenoh_macros::internal] -pub struct InitBuilder { - runtime: Runtime, - aggregated_subscribers: Vec, - aggregated_publishers: Vec, -} - -#[zenoh_macros::internal] -impl InitBuilder { - #[inline] - pub fn aggregated_subscribers(mut self, exprs: Vec) -> Self { - self.aggregated_subscribers = exprs; - self - } - - #[inline] - pub fn aggregated_publishers(mut self, exprs: Vec) -> Self { - self.aggregated_publishers = exprs; - self - } -} - -#[zenoh_macros::internal] -impl Resolvable for InitBuilder { - type To = ZResult; -} - -#[zenoh_macros::internal] -impl Wait for InitBuilder { - fn wait(self) -> ::To { - Ok(Session::init( - self.runtime, - self.aggregated_subscribers, - self.aggregated_publishers, - false, - ) - .wait()) - } -} - -#[zenoh_macros::internal] -impl IntoFuture for InitBuilder { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } + OpenBuilder::new(config) } diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index d2e35bc1ca..8173b49cb9 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -188,12 +188,15 @@ pub mod session { pub use zenoh_protocol::core::EntityId; #[zenoh_macros::internal] - pub use crate::api::session::{init, InitBuilder}; + pub use crate::api::builders::session::{init, InitBuilder}; pub use crate::api::{ - builders::publisher::{SessionDeleteBuilder, SessionPutBuilder}, + builders::{ + publisher::{SessionDeleteBuilder, SessionPutBuilder}, + session::OpenBuilder, + }, info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, SessionInfo, ZenohIdBuilder}, query::SessionGetBuilder, - session::{open, OpenBuilder, Session, SessionClosedError, Undeclarable}, + session::{open, Session, SessionClosedError, Undeclarable}, }; } From 3953fbb5aab4c4b0e89a3f4791d5872184e40436 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 10:44:23 +0200 Subject: [PATCH 06/15] info builders moved to builders --- zenoh/src/api/builders/info.rs | 171 +++++++++++++++++++++++++++++++++ zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/info.rs | 156 ++---------------------------- zenoh/src/lib.rs | 3 +- 4 files changed, 181 insertions(+), 150 deletions(-) create mode 100644 zenoh/src/api/builders/info.rs diff --git a/zenoh/src/api/builders/info.rs b/zenoh/src/api/builders/info.rs new file mode 100644 index 0000000000..18e252c12e --- /dev/null +++ b/zenoh/src/api/builders/info.rs @@ -0,0 +1,171 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::future::{IntoFuture, Ready}; + +use zenoh_config::wrappers::ZenohId; +use zenoh_core::{Resolvable, Wait}; +use zenoh_protocol::core::WhatAmI; + +use crate::net::runtime::Runtime; + +/// A builder returned by [`SessionInfo::zid()`](SessionInfo::zid) that allows +/// to access the [`ZenohId`] of the current zenoh [`Session`](crate::Session). +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let zid = session.info().zid().await; +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +pub struct ZenohIdBuilder<'a> { + runtime: &'a Runtime, +} + +impl<'a> ZenohIdBuilder<'a> { + pub(crate) fn new(runtime: &'a Runtime) -> Self { + Self { runtime } + } +} + +impl<'a> Resolvable for ZenohIdBuilder<'a> { + type To = ZenohId; +} + +impl<'a> Wait for ZenohIdBuilder<'a> { + fn wait(self) -> Self::To { + self.runtime.zid() + } +} + +impl<'a> IntoFuture for ZenohIdBuilder<'a> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +/// A builder returned by [`SessionInfo::routers_zid()`](SessionInfo::routers_zid) that allows +/// to access the [`ZenohId`] of the zenoh routers this process is currently connected to +/// or the [`ZenohId`] of the current router if this code is run from a router (plugin). +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let mut routers_zid = session.info().routers_zid().await; +/// while let Some(router_zid) = routers_zid.next() {} +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +pub struct RoutersZenohIdBuilder<'a> { + runtime: &'a Runtime, +} + +impl<'a> RoutersZenohIdBuilder<'a> { + pub(crate) fn new(runtime: &'a Runtime) -> Self { + Self { runtime } + } +} + +impl<'a> Resolvable for RoutersZenohIdBuilder<'a> { + type To = Box + Send + Sync>; +} + +impl<'a> Wait for RoutersZenohIdBuilder<'a> { + fn wait(self) -> Self::To { + Box::new( + zenoh_runtime::ZRuntime::Application + .block_in_place(self.runtime.manager().get_transports_unicast()) + .into_iter() + .filter_map(|s| { + s.get_whatami() + .ok() + .and_then(|what| (what == WhatAmI::Router).then_some(())) + .and_then(|_| s.get_zid().map(Into::into).ok()) + }), + ) + } +} + +impl<'a> IntoFuture for RoutersZenohIdBuilder<'a> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +/// A builder returned by [`SessionInfo::peers_zid()`](SessionInfo::peers_zid) that allows +/// to access the [`ZenohId`] of the zenoh peers this process is currently connected to. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let zid = session.info().zid().await; +/// let mut peers_zid = session.info().peers_zid().await; +/// while let Some(peer_zid) = peers_zid.next() {} +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +pub struct PeersZenohIdBuilder<'a> { + runtime: &'a Runtime, +} + +impl<'a> PeersZenohIdBuilder<'a> { + pub(crate) fn new(runtime: &'a Runtime) -> Self { + Self { runtime } + } +} + +impl<'a> Resolvable for PeersZenohIdBuilder<'a> { + type To = Box + Send + Sync>; +} + +impl<'a> Wait for PeersZenohIdBuilder<'a> { + fn wait(self) -> ::To { + Box::new( + zenoh_runtime::ZRuntime::Application + .block_in_place(self.runtime.manager().get_transports_unicast()) + .into_iter() + .filter_map(|s| { + s.get_whatami() + .ok() + .and_then(|what| (what == WhatAmI::Peer).then_some(())) + .and_then(|_| s.get_zid().map(Into::into).ok()) + }), + ) + } +} + +impl<'a> IntoFuture for PeersZenohIdBuilder<'a> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index 1379dcc557..e8d7a6fc0a 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // +pub(crate) mod info; pub(crate) mod publisher; pub(crate) mod queryable; pub(crate) mod reply; diff --git a/zenoh/src/api/info.rs b/zenoh/src/api/info.rs index 498adf6658..e97cdcfe5a 100644 --- a/zenoh/src/api/info.rs +++ b/zenoh/src/api/info.rs @@ -13,146 +13,10 @@ // //! Tools to access information about the current zenoh [`Session`](crate::Session). -use std::future::{IntoFuture, Ready}; - -use zenoh_config::wrappers::ZenohId; -use zenoh_core::{Resolvable, Wait}; -use zenoh_protocol::core::WhatAmI; - -use crate::net::runtime::Runtime; - -/// A builder returned by [`SessionInfo::zid()`](SessionInfo::zid) that allows -/// to access the [`ZenohId`] of the current zenoh [`Session`](crate::Session). -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let zid = session.info().zid().await; -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -pub struct ZenohIdBuilder<'a> { - runtime: &'a Runtime, -} - -impl<'a> Resolvable for ZenohIdBuilder<'a> { - type To = ZenohId; -} - -impl<'a> Wait for ZenohIdBuilder<'a> { - fn wait(self) -> Self::To { - self.runtime.zid() - } -} - -impl<'a> IntoFuture for ZenohIdBuilder<'a> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -/// A builder returned by [`SessionInfo::routers_zid()`](SessionInfo::routers_zid) that allows -/// to access the [`ZenohId`] of the zenoh routers this process is currently connected to -/// or the [`ZenohId`] of the current router if this code is run from a router (plugin). -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let mut routers_zid = session.info().routers_zid().await; -/// while let Some(router_zid) = routers_zid.next() {} -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -pub struct RoutersZenohIdBuilder<'a> { - runtime: &'a Runtime, -} - -impl<'a> Resolvable for RoutersZenohIdBuilder<'a> { - type To = Box + Send + Sync>; -} - -impl<'a> Wait for RoutersZenohIdBuilder<'a> { - fn wait(self) -> Self::To { - Box::new( - zenoh_runtime::ZRuntime::Application - .block_in_place(self.runtime.manager().get_transports_unicast()) - .into_iter() - .filter_map(|s| { - s.get_whatami() - .ok() - .and_then(|what| (what == WhatAmI::Router).then_some(())) - .and_then(|_| s.get_zid().map(Into::into).ok()) - }), - ) - } -} - -impl<'a> IntoFuture for RoutersZenohIdBuilder<'a> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -/// A builder returned by [`SessionInfo::peers_zid()`](SessionInfo::peers_zid) that allows -/// to access the [`ZenohId`] of the zenoh peers this process is currently connected to. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let zid = session.info().zid().await; -/// let mut peers_zid = session.info().peers_zid().await; -/// while let Some(peer_zid) = peers_zid.next() {} -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -pub struct PeersZenohIdBuilder<'a> { - runtime: &'a Runtime, -} - -impl<'a> Resolvable for PeersZenohIdBuilder<'a> { - type To = Box + Send + Sync>; -} - -impl<'a> Wait for PeersZenohIdBuilder<'a> { - fn wait(self) -> ::To { - Box::new( - zenoh_runtime::ZRuntime::Application - .block_in_place(self.runtime.manager().get_transports_unicast()) - .into_iter() - .filter_map(|s| { - s.get_whatami() - .ok() - .and_then(|what| (what == WhatAmI::Peer).then_some(())) - .and_then(|_| s.get_zid().map(Into::into).ok()) - }), - ) - } -} - -impl<'a> IntoFuture for PeersZenohIdBuilder<'a> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - +use crate::{ + api::builders::info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder}, + net::runtime::Runtime, +}; /// Struct returned by [`Session::info()`](crate::Session::info) which allows /// to access information about the current zenoh [`Session`](crate::Session). /// @@ -183,9 +47,7 @@ impl SessionInfo { /// # } /// ``` pub fn zid(&self) -> ZenohIdBuilder<'_> { - ZenohIdBuilder { - runtime: &self.runtime, - } + ZenohIdBuilder::new(&self.runtime) } /// Return the [`ZenohId`] of the zenoh routers this process is currently connected to @@ -202,9 +64,7 @@ impl SessionInfo { /// # } /// ``` pub fn routers_zid(&self) -> RoutersZenohIdBuilder<'_> { - RoutersZenohIdBuilder { - runtime: &self.runtime, - } + RoutersZenohIdBuilder::new(&self.runtime) } /// Return the [`ZenohId`] of the zenoh peers this process is currently connected to. @@ -220,8 +80,6 @@ impl SessionInfo { /// # } /// ``` pub fn peers_zid(&self) -> PeersZenohIdBuilder<'_> { - PeersZenohIdBuilder { - runtime: &self.runtime, - } + PeersZenohIdBuilder::new(&self.runtime) } } diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 8173b49cb9..b8ec6677f4 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -191,10 +191,11 @@ pub mod session { pub use crate::api::builders::session::{init, InitBuilder}; pub use crate::api::{ builders::{ + info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder}, publisher::{SessionDeleteBuilder, SessionPutBuilder}, session::OpenBuilder, }, - info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, SessionInfo, ZenohIdBuilder}, + info::SessionInfo, query::SessionGetBuilder, session::{open, Session, SessionClosedError, Undeclarable}, }; From 902b45b812fe8f2bbd338b738cc1fcefd4242d2b Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 10:56:39 +0200 Subject: [PATCH 07/15] moved session get builder to builders --- zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/builders/query.rs | 357 ++++++++++++++++++++++++++++++++ zenoh/src/api/query.rs | 346 +------------------------------ zenoh/src/api/session.rs | 6 +- zenoh/src/lib.rs | 2 +- 5 files changed, 366 insertions(+), 346 deletions(-) create mode 100644 zenoh/src/api/builders/query.rs diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index e8d7a6fc0a..fa8e238c8f 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod info; pub(crate) mod publisher; +pub(crate) mod query; pub(crate) mod queryable; pub(crate) mod reply; pub(crate) mod sample; diff --git a/zenoh/src/api/builders/query.rs b/zenoh/src/api/builders/query.rs new file mode 100644 index 0000000000..bb8107b15a --- /dev/null +++ b/zenoh/src/api/builders/query.rs @@ -0,0 +1,357 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + future::{IntoFuture, Ready}, + sync::Arc, + time::Duration, +}; + +use zenoh_core::{Resolvable, Wait}; +use zenoh_protocol::{core::CongestionControl, network::request::ext::QueryTarget}; +use zenoh_result::ZResult; + +#[cfg(feature = "unstable")] +use crate::api::query::ReplyKeyExpr; +#[cfg(feature = "unstable")] +use crate::api::{sample::SourceInfo, selector::ZenohParameters}; +use crate::{ + api::{ + builders::sample::{EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait}, + bytes::ZBytes, + encoding::Encoding, + handlers::{locked, Callback, DefaultHandler, IntoHandler}, + publisher::Priority, + sample::{Locality, QoSBuilder}, + selector::Selector, + session::Session, + value::Value, + }, + bytes::OptionZBytes, + query::{QueryConsolidation, Reply}, +}; + +/// A builder for initializing a `query`. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use zenoh::{query::{ConsolidationMode, QueryTarget}}; +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let replies = session +/// .get("key/expression?value>1") +/// .target(QueryTarget::All) +/// .consolidation(ConsolidationMode::None) +/// .await +/// .unwrap(); +/// while let Ok(reply) = replies.recv_async().await { +/// println!("Received {:?}", reply.result()) +/// } +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct SessionGetBuilder<'a, 'b, Handler> { + pub(crate) session: &'a Session, + pub(crate) selector: ZResult>, + pub(crate) target: QueryTarget, + pub(crate) consolidation: QueryConsolidation, + pub(crate) qos: QoSBuilder, + pub(crate) destination: Locality, + pub(crate) timeout: Duration, + pub(crate) handler: Handler, + pub(crate) value: Option, + pub(crate) attachment: Option, + #[cfg(feature = "unstable")] + pub(crate) source_info: SourceInfo, +} + +#[zenoh_macros::internal_trait] +impl SampleBuilderTrait for SessionGetBuilder<'_, '_, Handler> { + #[zenoh_macros::unstable] + fn source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info, + ..self + } + } + + fn attachment>(self, attachment: T) -> Self { + let attachment: OptionZBytes = attachment.into(); + Self { + attachment: attachment.into(), + ..self + } + } +} + +#[zenoh_macros::internal_trait] +impl QoSBuilderTrait for SessionGetBuilder<'_, '_, DefaultHandler> { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let qos = self.qos.congestion_control(congestion_control); + Self { qos, ..self } + } + + fn priority(self, priority: Priority) -> Self { + let qos = self.qos.priority(priority); + Self { qos, ..self } + } + + fn express(self, is_express: bool) -> Self { + let qos = self.qos.express(is_express); + Self { qos, ..self } + } +} + +#[zenoh_macros::internal_trait] +impl EncodingBuilderTrait for SessionGetBuilder<'_, '_, Handler> { + fn encoding>(self, encoding: T) -> Self { + let mut value = self.value.unwrap_or_default(); + value.encoding = encoding.into(); + Self { + value: Some(value), + ..self + } + } +} + +impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { + /// Receive the replies for this query with a callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let queryable = session + /// .get("key/expression") + /// .callback(|reply| {println!("Received {:?}", reply.result());}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback(self, callback: F) -> SessionGetBuilder<'a, 'b, Callback> + where + F: Fn(Reply) + Send + Sync + 'static, + { + self.with(Callback::new(Arc::new(callback))) + } + + /// Receive the replies for this query with a mutable callback. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](crate::session::SessionGetBuilder::callback) method, we suggest you use it instead of `callback_mut`. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let mut n = 0; + /// let queryable = session + /// .get("key/expression") + /// .callback_mut(move |reply| {n += 1;}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback_mut(self, callback: F) -> SessionGetBuilder<'a, 'b, Callback> + where + F: FnMut(Reply) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Receive the replies for this query with a [`Handler`](crate::handlers::IntoHandler). + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let replies = session + /// .get("key/expression") + /// .with(flume::bounded(32)) + /// .await + /// .unwrap(); + /// while let Ok(reply) = replies.recv_async().await { + /// println!("Received {:?}", reply.result()); + /// } + /// # } + /// ``` + #[inline] + pub fn with(self, handler: Handler) -> SessionGetBuilder<'a, 'b, Handler> + where + Handler: IntoHandler, + { + let SessionGetBuilder { + session, + selector, + target, + consolidation, + qos, + destination, + timeout, + value, + attachment, + #[cfg(feature = "unstable")] + source_info, + handler: _, + } = self; + SessionGetBuilder { + session, + selector, + target, + consolidation, + qos, + destination, + timeout, + value, + attachment, + #[cfg(feature = "unstable")] + source_info, + handler, + } + } +} +impl<'a, 'b, Handler> SessionGetBuilder<'a, 'b, Handler> { + #[inline] + pub fn payload(mut self, payload: IntoZBytes) -> Self + where + IntoZBytes: Into, + { + let mut value = self.value.unwrap_or_default(); + value.payload = payload.into(); + self.value = Some(value); + self + } + + /// Change the target of the query. + #[inline] + pub fn target(self, target: QueryTarget) -> Self { + Self { target, ..self } + } + + /// Change the consolidation mode of the query. + #[inline] + pub fn consolidation>(self, consolidation: QC) -> Self { + Self { + consolidation: consolidation.into(), + ..self + } + } + + /// + /// + /// Restrict the matching queryables that will receive the query + /// to the ones that have the given [`Locality`](Locality). + #[zenoh_macros::unstable] + #[inline] + pub fn allowed_destination(self, destination: Locality) -> Self { + Self { + destination, + ..self + } + } + + /// Set query timeout. + #[inline] + pub fn timeout(self, timeout: Duration) -> Self { + Self { timeout, ..self } + } + + /// + /// + /// By default, `get` guarantees that it will only receive replies whose key expressions intersect + /// with the queried key expression. + /// + /// If allowed to through `accept_replies(ReplyKeyExpr::Any)`, queryables may also reply on key + /// expressions that don't intersect with the query's. + #[zenoh_macros::unstable] + pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self { + if accept == ReplyKeyExpr::Any { + if let Ok(Selector { + key_expr, + mut parameters, + }) = self.selector + { + parameters.to_mut().set_reply_key_expr_any(); + let selector = Ok(Selector { + key_expr, + parameters, + }); + return Self { selector, ..self }; + } + } + self + } +} + +impl Resolvable for SessionGetBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type To = ZResult; +} + +impl Wait for SessionGetBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + let (callback, receiver) = self.handler.into_handler(); + let Selector { + key_expr, + parameters, + } = self.selector?; + self.session + .0 + .query( + &key_expr, + ¶meters, + self.target, + self.consolidation, + self.qos.into(), + self.destination, + self.timeout, + self.value, + self.attachment, + #[cfg(feature = "unstable")] + self.source_info, + callback, + ) + .map(|_| receiver) + } +} + +impl IntoFuture for SessionGetBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 5d584320c0..a06f41495d 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -12,43 +12,23 @@ // ZettaScale Zenoh Team, // -use std::{ - collections::HashMap, - error::Error, - fmt::Display, - future::{IntoFuture, Ready}, - sync::Arc, - time::Duration, -}; +use std::{collections::HashMap, error::Error, fmt::Display}; #[cfg(feature = "unstable")] use zenoh_config::ZenohId; -use zenoh_core::{Resolvable, Wait}; use zenoh_keyexpr::OwnedKeyExpr; +use zenoh_protocol::core::Parameters; #[cfg(feature = "unstable")] use zenoh_protocol::core::ZenohIdProto; -use zenoh_protocol::core::{CongestionControl, Parameters}; /// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](Session::get). pub use zenoh_protocol::network::request::ext::QueryTarget; #[doc(inline)] pub use zenoh_protocol::zenoh::query::ConsolidationMode; -use zenoh_result::ZResult; -use super::{ - builders::sample::{EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait}, - bytes::ZBytes, - encoding::Encoding, - handlers::{locked, Callback, DefaultHandler, IntoHandler}, - key_expr::KeyExpr, - publisher::Priority, - sample::{Locality, QoSBuilder, Sample}, - selector::Selector, - session::Session, - value::Value, +use crate::api::{ + bytes::ZBytes, encoding::Encoding, handlers::Callback, key_expr::KeyExpr, sample::Sample, + selector::Selector, value::Value, }; -#[cfg(feature = "unstable")] -use super::{sample::SourceInfo, selector::ZenohParameters}; -use crate::bytes::OptionZBytes; /// The replies consolidation strategy to apply on replies to a [`get`](Session::get). #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -187,270 +167,6 @@ impl QueryState { Selector::borrowed(&self.key_expr, &self.parameters) } } - -/// A builder for initializing a `query`. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// use zenoh::{query::{ConsolidationMode, QueryTarget}}; -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let replies = session -/// .get("key/expression?value>1") -/// .target(QueryTarget::All) -/// .consolidation(ConsolidationMode::None) -/// .await -/// .unwrap(); -/// while let Ok(reply) = replies.recv_async().await { -/// println!("Received {:?}", reply.result()) -/// } -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct SessionGetBuilder<'a, 'b, Handler> { - pub(crate) session: &'a Session, - pub(crate) selector: ZResult>, - pub(crate) target: QueryTarget, - pub(crate) consolidation: QueryConsolidation, - pub(crate) qos: QoSBuilder, - pub(crate) destination: Locality, - pub(crate) timeout: Duration, - pub(crate) handler: Handler, - pub(crate) value: Option, - pub(crate) attachment: Option, - #[cfg(feature = "unstable")] - pub(crate) source_info: SourceInfo, -} - -#[zenoh_macros::internal_trait] -impl SampleBuilderTrait for SessionGetBuilder<'_, '_, Handler> { - #[zenoh_macros::unstable] - fn source_info(self, source_info: SourceInfo) -> Self { - Self { - source_info, - ..self - } - } - - fn attachment>(self, attachment: T) -> Self { - let attachment: OptionZBytes = attachment.into(); - Self { - attachment: attachment.into(), - ..self - } - } -} - -#[zenoh_macros::internal_trait] -impl QoSBuilderTrait for SessionGetBuilder<'_, '_, DefaultHandler> { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { - let qos = self.qos.congestion_control(congestion_control); - Self { qos, ..self } - } - - fn priority(self, priority: Priority) -> Self { - let qos = self.qos.priority(priority); - Self { qos, ..self } - } - - fn express(self, is_express: bool) -> Self { - let qos = self.qos.express(is_express); - Self { qos, ..self } - } -} - -#[zenoh_macros::internal_trait] -impl EncodingBuilderTrait for SessionGetBuilder<'_, '_, Handler> { - fn encoding>(self, encoding: T) -> Self { - let mut value = self.value.unwrap_or_default(); - value.encoding = encoding.into(); - Self { - value: Some(value), - ..self - } - } -} - -impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { - /// Receive the replies for this query with a callback. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let queryable = session - /// .get("key/expression") - /// .callback(|reply| {println!("Received {:?}", reply.result());}) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback(self, callback: F) -> SessionGetBuilder<'a, 'b, Callback> - where - F: Fn(Reply) + Send + Sync + 'static, - { - self.with(Callback::new(Arc::new(callback))) - } - - /// Receive the replies for this query with a mutable callback. - /// - /// Using this guarantees that your callback will never be called concurrently. - /// If your callback is also accepted by the [`callback`](crate::session::SessionGetBuilder::callback) method, we suggest you use it instead of `callback_mut`. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let mut n = 0; - /// let queryable = session - /// .get("key/expression") - /// .callback_mut(move |reply| {n += 1;}) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback_mut(self, callback: F) -> SessionGetBuilder<'a, 'b, Callback> - where - F: FnMut(Reply) + Send + Sync + 'static, - { - self.callback(locked(callback)) - } - - /// Receive the replies for this query with a [`Handler`](crate::handlers::IntoHandler). - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let replies = session - /// .get("key/expression") - /// .with(flume::bounded(32)) - /// .await - /// .unwrap(); - /// while let Ok(reply) = replies.recv_async().await { - /// println!("Received {:?}", reply.result()); - /// } - /// # } - /// ``` - #[inline] - pub fn with(self, handler: Handler) -> SessionGetBuilder<'a, 'b, Handler> - where - Handler: IntoHandler, - { - let SessionGetBuilder { - session, - selector, - target, - consolidation, - qos, - destination, - timeout, - value, - attachment, - #[cfg(feature = "unstable")] - source_info, - handler: _, - } = self; - SessionGetBuilder { - session, - selector, - target, - consolidation, - qos, - destination, - timeout, - value, - attachment, - #[cfg(feature = "unstable")] - source_info, - handler, - } - } -} -impl<'a, 'b, Handler> SessionGetBuilder<'a, 'b, Handler> { - #[inline] - pub fn payload(mut self, payload: IntoZBytes) -> Self - where - IntoZBytes: Into, - { - let mut value = self.value.unwrap_or_default(); - value.payload = payload.into(); - self.value = Some(value); - self - } - - /// Change the target of the query. - #[inline] - pub fn target(self, target: QueryTarget) -> Self { - Self { target, ..self } - } - - /// Change the consolidation mode of the query. - #[inline] - pub fn consolidation>(self, consolidation: QC) -> Self { - Self { - consolidation: consolidation.into(), - ..self - } - } - - /// - /// - /// Restrict the matching queryables that will receive the query - /// to the ones that have the given [`Locality`](Locality). - #[zenoh_macros::unstable] - #[inline] - pub fn allowed_destination(self, destination: Locality) -> Self { - Self { - destination, - ..self - } - } - - /// Set query timeout. - #[inline] - pub fn timeout(self, timeout: Duration) -> Self { - Self { timeout, ..self } - } - - /// - /// - /// By default, `get` guarantees that it will only receive replies whose key expressions intersect - /// with the queried key expression. - /// - /// If allowed to through `accept_replies(ReplyKeyExpr::Any)`, queryables may also reply on key - /// expressions that don't intersect with the query's. - #[zenoh_macros::unstable] - pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self { - if accept == ReplyKeyExpr::Any { - if let Ok(Selector { - key_expr, - mut parameters, - }) = self.selector - { - parameters.to_mut().set_reply_key_expr_any(); - let selector = Ok(Selector { - key_expr, - parameters, - }); - return Self { selector, ..self }; - } - } - self - } -} - /// The kind of accepted query replies. #[zenoh_macros::unstable] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] @@ -461,55 +177,3 @@ pub enum ReplyKeyExpr { /// Accept replies whose key expressions match the query key expression. MatchingQuery, } - -impl Resolvable for SessionGetBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type To = ZResult; -} - -impl Wait for SessionGetBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - fn wait(self) -> ::To { - let (callback, receiver) = self.handler.into_handler(); - let Selector { - key_expr, - parameters, - } = self.selector?; - self.session - .0 - .query( - &key_expr, - ¶meters, - self.target, - self.consolidation, - self.qos.into(), - self.destination, - self.timeout, - self.value, - self.attachment, - #[cfg(feature = "unstable")] - self.source_info, - callback, - ) - .map(|_| receiver) - } -} - -impl IntoFuture for SessionGetBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 1707a22766..c5ba24b98f 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -85,6 +85,7 @@ use crate::{ PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder, SessionDeleteBuilder, SessionPutBuilder, }, + query::SessionGetBuilder, queryable::QueryableBuilder, session::OpenBuilder, subscriber::SubscriberBuilder, @@ -95,10 +96,7 @@ use crate::{ info::SessionInfo, key_expr::{KeyExpr, KeyExprInner}, publisher::{Priority, PublisherState}, - query::{ - ConsolidationMode, QueryConsolidation, QueryState, QueryTarget, Reply, - SessionGetBuilder, - }, + query::{ConsolidationMode, QueryConsolidation, QueryState, QueryTarget, Reply}, queryable::{Query, QueryInner, QueryableState}, sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind}, selector::Selector, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index b8ec6677f4..cb8b946782 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -193,10 +193,10 @@ pub mod session { builders::{ info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder}, publisher::{SessionDeleteBuilder, SessionPutBuilder}, + query::SessionGetBuilder, session::OpenBuilder, }, info::SessionInfo, - query::SessionGetBuilder, session::{open, Session, SessionClosedError, Undeclarable}, }; } From 3cebe5eb6ad1490370564c20a9b0e9e321e626ae Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 11:09:37 +0200 Subject: [PATCH 08/15] matching listener builder moved to builders --- zenoh/src/api/builders/matching_listener.rs | 250 ++++++++++++++++++++ zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/publisher.rs | 235 +----------------- zenoh/src/lib.rs | 5 +- 4 files changed, 259 insertions(+), 232 deletions(-) create mode 100644 zenoh/src/api/builders/matching_listener.rs diff --git a/zenoh/src/api/builders/matching_listener.rs b/zenoh/src/api/builders/matching_listener.rs new file mode 100644 index 0000000000..c726b657c0 --- /dev/null +++ b/zenoh/src/api/builders/matching_listener.rs @@ -0,0 +1,250 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::future::{IntoFuture, Ready}; + +use zenoh_core::{Resolvable, Wait}; +use zenoh_result::ZResult; +#[cfg(feature = "unstable")] +use { + crate::api::{ + handlers::{Callback, DefaultHandler, IntoHandler}, + publisher::{MatchingListener, MatchingListenerInner, MatchingStatus, Publisher}, + }, + std::sync::Arc, +}; + +/// A builder for initializing a [`MatchingListener`]. +#[zenoh_macros::unstable] +#[derive(Debug)] +pub struct MatchingListenerBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { + pub(crate) publisher: &'a Publisher<'b>, + pub handler: Handler, +} + +#[zenoh_macros::unstable] +impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { + /// Receive the MatchingStatuses for this listener with a callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .callback(|matching_status| { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// }) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + #[zenoh_macros::unstable] + pub fn callback( + self, + callback: F, + ) -> MatchingListenerBuilder<'a, 'b, Callback> + where + F: Fn(MatchingStatus) + Send + Sync + 'static, + { + self.with(Callback::new(Arc::new(callback))) + } + + /// Receive the MatchingStatuses for this listener with a mutable callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let mut n = 0; + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .callback_mut(move |_matching_status| { n += 1; }) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + #[zenoh_macros::unstable] + pub fn callback_mut( + self, + callback: F, + ) -> MatchingListenerBuilder<'a, 'b, Callback> + where + F: FnMut(MatchingStatus) + Send + Sync + 'static, + { + self.callback(crate::api::handlers::locked(callback)) + } + + /// Receive the MatchingStatuses for this listener with a [`Handler`](IntoHandler). + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .with(flume::bounded(32)) + /// .await + /// .unwrap(); + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// # } + /// ``` + #[inline] + #[zenoh_macros::unstable] + pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, 'b, Handler> + where + Handler: IntoHandler, + { + let MatchingListenerBuilder { + publisher, + handler: _, + } = self; + MatchingListenerBuilder { publisher, handler } + } +} + +#[zenoh_macros::unstable] +impl<'a, 'b> MatchingListenerBuilder<'a, 'b, Callback> { + /// Register the listener callback to be run in background until the publisher is undeclared. + /// + /// Background builder doesn't return a `MatchingListener` object anymore. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// // no need to assign and keep a variable with a background listener + /// publisher + /// .matching_listener() + /// .callback(|matching_status| { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// }) + /// .background() + /// .await + /// .unwrap(); + /// # } + /// ``` + pub fn background(self) -> MatchingListenerBuilder<'a, 'b, Callback, true> { + MatchingListenerBuilder { + publisher: self.publisher, + handler: self.handler, + } + } +} + +#[zenoh_macros::unstable] +impl Resolvable for MatchingListenerBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type To = ZResult>; +} + +#[zenoh_macros::unstable] +impl Wait for MatchingListenerBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + #[zenoh_macros::unstable] + fn wait(self) -> ::To { + let (callback, handler) = self.handler.into_handler(); + let state = self + .publisher + .session + .declare_matches_listener_inner(self.publisher, callback)?; + zlock!(self.publisher.matching_listeners).insert(state.id); + Ok(MatchingListener { + inner: MatchingListenerInner { + session: self.publisher.session.clone(), + matching_listeners: self.publisher.matching_listeners.clone(), + id: state.id, + undeclare_on_drop: true, + }, + handler, + }) + } +} + +#[zenoh_macros::unstable] +impl IntoFuture for MatchingListenerBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + #[zenoh_macros::unstable] + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +#[zenoh_macros::unstable] +impl Resolvable for MatchingListenerBuilder<'_, '_, Callback, true> { + type To = ZResult<()>; +} + +#[zenoh_macros::unstable] +impl Wait for MatchingListenerBuilder<'_, '_, Callback, true> { + #[zenoh_macros::unstable] + fn wait(self) -> ::To { + let state = self + .publisher + .session + .declare_matches_listener_inner(self.publisher, self.handler)?; + zlock!(self.publisher.matching_listeners).insert(state.id); + Ok(()) + } +} + +#[zenoh_macros::unstable] +impl IntoFuture for MatchingListenerBuilder<'_, '_, Callback, true> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + #[zenoh_macros::unstable] + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index fa8e238c8f..4c32a070d1 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -13,6 +13,7 @@ // pub(crate) mod info; +pub(crate) mod matching_listener; pub(crate) mod publisher; pub(crate) mod query; pub(crate) mod queryable; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 5b0cd7d83b..6e4b89481a 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -28,7 +28,8 @@ use zenoh_result::{Error, ZResult}; #[cfg(feature = "unstable")] use { crate::api::{ - handlers::{Callback, DefaultHandler, IntoHandler}, + builders::matching_listener::MatchingListenerBuilder, + handlers::{Callback, DefaultHandler}, sample::SourceInfo, }, std::{collections::HashSet, sync::Arc, sync::Mutex}, @@ -37,7 +38,7 @@ use { zenoh_protocol::core::Reliability, }; -use super::{ +use crate::api::{ builders::publisher::{ PublicationBuilder, PublicationBuilderDelete, PublicationBuilderPut, PublisherDeleteBuilder, PublisherPutBuilder, @@ -46,9 +47,9 @@ use super::{ encoding::Encoding, key_expr::KeyExpr, sample::{Locality, Sample, SampleFields}, - session::UndeclarableSealed, + session::{UndeclarableSealed, WeakSession}, + Id, }; -use crate::api::{session::WeakSession, Id}; pub(crate) struct PublisherState { pub(crate) id: Id, @@ -539,232 +540,6 @@ impl MatchingStatus { self.matching } } - -/// A builder for initializing a [`MatchingListener`]. -#[zenoh_macros::unstable] -#[derive(Debug)] -pub struct MatchingListenerBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { - pub(crate) publisher: &'a Publisher<'b>, - pub handler: Handler, -} - -#[zenoh_macros::unstable] -impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { - /// Receive the MatchingStatuses for this listener with a callback. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap(); - /// let matching_listener = publisher - /// .matching_listener() - /// .callback(|matching_status| { - /// if matching_status.matching_subscribers() { - /// println!("Publisher has matching subscribers."); - /// } else { - /// println!("Publisher has NO MORE matching subscribers."); - /// } - /// }) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - #[zenoh_macros::unstable] - pub fn callback( - self, - callback: F, - ) -> MatchingListenerBuilder<'a, 'b, Callback> - where - F: Fn(MatchingStatus) + Send + Sync + 'static, - { - self.with(Callback::new(Arc::new(callback))) - } - - /// Receive the MatchingStatuses for this listener with a mutable callback. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let mut n = 0; - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap(); - /// let matching_listener = publisher - /// .matching_listener() - /// .callback_mut(move |_matching_status| { n += 1; }) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - #[zenoh_macros::unstable] - pub fn callback_mut( - self, - callback: F, - ) -> MatchingListenerBuilder<'a, 'b, Callback> - where - F: FnMut(MatchingStatus) + Send + Sync + 'static, - { - self.callback(crate::api::handlers::locked(callback)) - } - - /// Receive the MatchingStatuses for this listener with a [`Handler`](IntoHandler). - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap(); - /// let matching_listener = publisher - /// .matching_listener() - /// .with(flume::bounded(32)) - /// .await - /// .unwrap(); - /// while let Ok(matching_status) = matching_listener.recv_async().await { - /// if matching_status.matching_subscribers() { - /// println!("Publisher has matching subscribers."); - /// } else { - /// println!("Publisher has NO MORE matching subscribers."); - /// } - /// } - /// # } - /// ``` - #[inline] - #[zenoh_macros::unstable] - pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, 'b, Handler> - where - Handler: IntoHandler, - { - let MatchingListenerBuilder { - publisher, - handler: _, - } = self; - MatchingListenerBuilder { publisher, handler } - } -} - -#[zenoh_macros::unstable] -impl<'a, 'b> MatchingListenerBuilder<'a, 'b, Callback> { - /// Register the listener callback to be run in background until the publisher is undeclared. - /// - /// Background builder doesn't return a `MatchingListener` object anymore. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap(); - /// // no need to assign and keep a variable with a background listener - /// publisher - /// .matching_listener() - /// .callback(|matching_status| { - /// if matching_status.matching_subscribers() { - /// println!("Publisher has matching subscribers."); - /// } else { - /// println!("Publisher has NO MORE matching subscribers."); - /// } - /// }) - /// .background() - /// .await - /// .unwrap(); - /// # } - /// ``` - pub fn background(self) -> MatchingListenerBuilder<'a, 'b, Callback, true> { - MatchingListenerBuilder { - publisher: self.publisher, - handler: self.handler, - } - } -} - -#[zenoh_macros::unstable] -impl Resolvable for MatchingListenerBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type To = ZResult>; -} - -#[zenoh_macros::unstable] -impl Wait for MatchingListenerBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - #[zenoh_macros::unstable] - fn wait(self) -> ::To { - let (callback, handler) = self.handler.into_handler(); - let state = self - .publisher - .session - .declare_matches_listener_inner(self.publisher, callback)?; - zlock!(self.publisher.matching_listeners).insert(state.id); - Ok(MatchingListener { - inner: MatchingListenerInner { - session: self.publisher.session.clone(), - matching_listeners: self.publisher.matching_listeners.clone(), - id: state.id, - undeclare_on_drop: true, - }, - handler, - }) - } -} - -#[zenoh_macros::unstable] -impl IntoFuture for MatchingListenerBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type Output = ::To; - type IntoFuture = Ready<::To>; - - #[zenoh_macros::unstable] - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -#[zenoh_macros::unstable] -impl Resolvable for MatchingListenerBuilder<'_, '_, Callback, true> { - type To = ZResult<()>; -} - -#[zenoh_macros::unstable] -impl Wait for MatchingListenerBuilder<'_, '_, Callback, true> { - #[zenoh_macros::unstable] - fn wait(self) -> ::To { - let state = self - .publisher - .session - .declare_matches_listener_inner(self.publisher, self.handler)?; - zlock!(self.publisher.matching_listeners).insert(state.id); - Ok(()) - } -} - -#[zenoh_macros::unstable] -impl IntoFuture for MatchingListenerBuilder<'_, '_, Callback, true> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - #[zenoh_macros::unstable] - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - #[zenoh_macros::unstable] pub(crate) struct MatchingListenerState { pub(crate) id: Id, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index cb8b946782..ea673ef87e 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -226,8 +226,9 @@ pub mod bytes { /// Pub/sub primitives pub mod pubsub { #[zenoh_macros::unstable] - pub use crate::api::publisher::{ - MatchingListener, MatchingListenerBuilder, MatchingListenerUndeclaration, MatchingStatus, + pub use crate::api::{ + builders::matching_listener::MatchingListenerBuilder, + publisher::{MatchingListener, MatchingListenerUndeclaration, MatchingStatus}, }; pub use crate::api::{ builders::{ From da21e66cdee3d95b63c3f39461458e3d8c35e7dc Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 11:19:04 +0200 Subject: [PATCH 09/15] scouting builder moved to builder --- zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/builders/scouting.rs | 168 +++++++++++++++++++++++++++++ zenoh/src/api/scouting.rs | 162 ++-------------------------- zenoh/src/lib.rs | 5 +- 4 files changed, 183 insertions(+), 153 deletions(-) create mode 100644 zenoh/src/api/builders/scouting.rs diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index 4c32a070d1..a9cfcab630 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -19,5 +19,6 @@ pub(crate) mod query; pub(crate) mod queryable; pub(crate) mod reply; pub(crate) mod sample; +pub(crate) mod scouting; pub(crate) mod session; pub(crate) mod subscriber; diff --git a/zenoh/src/api/builders/scouting.rs b/zenoh/src/api/builders/scouting.rs new file mode 100644 index 0000000000..bfe229069a --- /dev/null +++ b/zenoh/src/api/builders/scouting.rs @@ -0,0 +1,168 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + future::{IntoFuture, Ready}, + sync::Arc, +}; + +use zenoh_config::wrappers::Hello; +use zenoh_core::{Resolvable, Wait}; +use zenoh_protocol::core::WhatAmIMatcher; +use zenoh_result::ZResult; + +use crate::api::{ + handlers::{locked, Callback, DefaultHandler, IntoHandler}, + scouting::{Scout, _scout}, +}; + +/// A builder for initializing a [`Scout`]. +/// +/// # Examples +/// ```no_run +/// # #[tokio::main] +/// # async fn main() { +/// use zenoh::config::WhatAmI; +/// +/// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) +/// .await +/// .unwrap(); +/// while let Ok(hello) = receiver.recv_async().await { +/// println!("{}", hello); +/// } +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct ScoutBuilder { + pub(crate) what: WhatAmIMatcher, + pub(crate) config: ZResult, + pub(crate) handler: Handler, +} + +impl ScoutBuilder { + /// Receive the [`Hello`] messages from this scout with a callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::config::WhatAmI; + /// + /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) + /// .callback(|hello| { println!("{}", hello); }) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback(self, callback: F) -> ScoutBuilder> + where + F: Fn(Hello) + Send + Sync + 'static, + { + self.with(Callback::new(Arc::new(callback))) + } + + /// Receive the [`Hello`] messages from this scout with a mutable callback. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](ScoutBuilder::callback) method, we suggest you use it instead of `callback_mut`. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::config::WhatAmI; + /// + /// let mut n = 0; + /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) + /// .callback_mut(move |_hello| { n += 1; }) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback_mut(self, callback: F) -> ScoutBuilder> + where + F: FnMut(Hello) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Receive the [`Hello`] messages from this scout with a [`Handler`](crate::handlers::IntoHandler). + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::config::WhatAmI; + /// + /// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) + /// .with(flume::bounded(32)) + /// .await + /// .unwrap(); + /// while let Ok(hello) = receiver.recv_async().await { + /// println!("{}", hello); + /// } + /// # } + /// ``` + #[inline] + pub fn with(self, handler: Handler) -> ScoutBuilder + where + Handler: IntoHandler, + { + let ScoutBuilder { + what, + config, + handler: _, + } = self; + ScoutBuilder { + what, + config, + handler, + } + } +} + +impl Resolvable for ScoutBuilder +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type To = ZResult>; +} + +impl Wait for ScoutBuilder +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + let (callback, receiver) = self.handler.into_handler(); + _scout(self.what, self.config?, callback).map(|scout| Scout { scout, receiver }) + } +} + +impl IntoFuture for ScoutBuilder +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/scouting.rs b/zenoh/src/api/scouting.rs index 8b7adbd791..d4e66c7561 100644 --- a/zenoh/src/api/scouting.rs +++ b/zenoh/src/api/scouting.rs @@ -11,168 +11,22 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{ - fmt, - future::{IntoFuture, Ready}, - net::SocketAddr, - ops::Deref, - sync::Arc, - time::Duration, -}; +use std::{fmt, net::SocketAddr, ops::Deref, time::Duration}; use tokio::net::UdpSocket; use zenoh_config::wrappers::Hello; -use zenoh_core::{Resolvable, Wait}; use zenoh_protocol::core::WhatAmIMatcher; use zenoh_result::ZResult; use zenoh_task::TerminatableTask; use crate::{ - api::handlers::{locked, Callback, DefaultHandler, IntoHandler}, + api::{ + builders::scouting::ScoutBuilder, + handlers::{Callback, DefaultHandler}, + }, net::runtime::{orchestrator::Loop, Runtime}, Config, }; - -/// A builder for initializing a [`Scout`]. -/// -/// # Examples -/// ```no_run -/// # #[tokio::main] -/// # async fn main() { -/// use zenoh::config::WhatAmI; -/// -/// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) -/// .await -/// .unwrap(); -/// while let Ok(hello) = receiver.recv_async().await { -/// println!("{}", hello); -/// } -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct ScoutBuilder { - pub(crate) what: WhatAmIMatcher, - pub(crate) config: ZResult, - pub(crate) handler: Handler, -} - -impl ScoutBuilder { - /// Receive the [`Hello`] messages from this scout with a callback. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::config::WhatAmI; - /// - /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) - /// .callback(|hello| { println!("{}", hello); }) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback(self, callback: F) -> ScoutBuilder> - where - F: Fn(Hello) + Send + Sync + 'static, - { - self.with(Callback::new(Arc::new(callback))) - } - - /// Receive the [`Hello`] messages from this scout with a mutable callback. - /// - /// Using this guarantees that your callback will never be called concurrently. - /// If your callback is also accepted by the [`callback`](ScoutBuilder::callback) method, we suggest you use it instead of `callback_mut`. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::config::WhatAmI; - /// - /// let mut n = 0; - /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) - /// .callback_mut(move |_hello| { n += 1; }) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback_mut(self, callback: F) -> ScoutBuilder> - where - F: FnMut(Hello) + Send + Sync + 'static, - { - self.callback(locked(callback)) - } - - /// Receive the [`Hello`] messages from this scout with a [`Handler`](crate::handlers::IntoHandler). - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::config::WhatAmI; - /// - /// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) - /// .with(flume::bounded(32)) - /// .await - /// .unwrap(); - /// while let Ok(hello) = receiver.recv_async().await { - /// println!("{}", hello); - /// } - /// # } - /// ``` - #[inline] - pub fn with(self, handler: Handler) -> ScoutBuilder - where - Handler: IntoHandler, - { - let ScoutBuilder { - what, - config, - handler: _, - } = self; - ScoutBuilder { - what, - config, - handler, - } - } -} - -impl Resolvable for ScoutBuilder -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type To = ZResult>; -} - -impl Wait for ScoutBuilder -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - fn wait(self) -> ::To { - let (callback, receiver) = self.handler.into_handler(); - _scout(self.what, self.config?, callback).map(|scout| Scout { scout, receiver }) - } -} - -impl IntoFuture for ScoutBuilder -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - /// A scout that returns [`Hello`] messages through a callback. /// /// # Examples @@ -282,7 +136,11 @@ impl Scout { } } -fn _scout(what: WhatAmIMatcher, config: Config, callback: Callback) -> ZResult { +pub(crate) fn _scout( + what: WhatAmIMatcher, + config: Config, + callback: Callback, +) -> ZResult { tracing::trace!("scout({}, {})", what, &config); let default_addr = SocketAddr::from(zenoh_config::defaults::scouting::multicast::address); let addr = config diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index ea673ef87e..0443560661 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -292,7 +292,10 @@ pub mod qos { pub mod scouting { pub use zenoh_config::wrappers::Hello; - pub use crate::api::scouting::{scout, Scout, ScoutBuilder}; + pub use crate::api::{ + builders::scouting::ScoutBuilder, + scouting::{scout, Scout}, + }; } /// Liveliness primitives From 2923f9eeadcde02e3c01f6c9310b73d8c2d2afaa Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 11:28:08 +0200 Subject: [PATCH 10/15] clippy fixes --- zenoh/src/api/builders/matching_listener.rs | 3 +++ zenoh/src/api/builders/queryable.rs | 6 +----- zenoh/src/api/builders/session.rs | 1 + 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/zenoh/src/api/builders/matching_listener.rs b/zenoh/src/api/builders/matching_listener.rs index c726b657c0..d81ba6e4c9 100644 --- a/zenoh/src/api/builders/matching_listener.rs +++ b/zenoh/src/api/builders/matching_listener.rs @@ -11,9 +11,12 @@ // Contributors: // ZettaScale Zenoh Team, // +#[cfg(feature = "unstable")] use std::future::{IntoFuture, Ready}; +#[cfg(feature = "unstable")] use zenoh_core::{Resolvable, Wait}; +#[cfg(feature = "unstable")] use zenoh_result::ZResult; #[cfg(feature = "unstable")] use { diff --git a/zenoh/src/api/builders/queryable.rs b/zenoh/src/api/builders/queryable.rs index 24da21f025..8d4befbef2 100644 --- a/zenoh/src/api/builders/queryable.rs +++ b/zenoh/src/api/builders/queryable.rs @@ -18,16 +18,12 @@ use std::{ use zenoh_core::{Resolvable, Wait}; use zenoh_result::ZResult; -#[zenoh_macros::unstable] -use { - crate::api::queryable::Query, crate::api::queryable::Queryable, - crate::api::queryable::QueryableInner, -}; use crate::{ api::{ handlers::{locked, DefaultHandler, IntoHandler}, key_expr::KeyExpr, + queryable::{Query, Queryable, QueryableInner}, sample::Locality, }, handlers::Callback, diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index cb4396bfa1..c79dd8d70b 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -15,6 +15,7 @@ use std::future::{IntoFuture, Ready}; use zenoh_core::{Resolvable, Wait}; +#[cfg(feature = "internal")] use zenoh_keyexpr::OwnedKeyExpr; use zenoh_result::ZResult; From c2124c94e88fdd5eb35f9c2e13a69d458e63c562 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 11:33:53 +0200 Subject: [PATCH 11/15] clippy fix --- zenoh/src/api/builders/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index c79dd8d70b..e907734446 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -20,7 +20,7 @@ use zenoh_keyexpr::OwnedKeyExpr; use zenoh_result::ZResult; use crate::api::session::Session; -#[cfg(feature = "unstable")] +#[cfg(feature = "internal")] use crate::net::runtime::Runtime; /// A builder returned by [`open`] used to open a zenoh [`Session`]. From 91b4fdc72ed13a3222d21597bb3f771d5660da6c Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 16:06:57 +0200 Subject: [PATCH 12/15] clippy fix --- zenoh/src/api/builders/session.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index e907734446..03e2a1d09e 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -13,11 +13,15 @@ // use std::future::{IntoFuture, Ready}; +#[cfg(feature = "shared-memory")] +use std::sync::Arc; use zenoh_core::{Resolvable, Wait}; #[cfg(feature = "internal")] use zenoh_keyexpr::OwnedKeyExpr; use zenoh_result::ZResult; +#[cfg(feature = "shared-memory")] +use zenoh_shm::api::client_storage::ShmClientStorage; use crate::api::session::Session; #[cfg(feature = "internal")] From 05b9a9a9a771076233d8af665852b0d892a87acd Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 16:16:18 +0200 Subject: [PATCH 13/15] replaced 'super::' to absolute `crate::api::` to align 'use' statements --- zenoh/src/api/admin.rs | 2 +- zenoh/src/api/builders/publisher.rs | 2 +- zenoh/src/api/key_expr.rs | 2 +- zenoh/src/api/liveliness.rs | 2 +- zenoh/src/api/loader.rs | 2 +- zenoh/src/api/plugins.rs | 2 +- zenoh/src/api/sample.rs | 2 +- zenoh/src/api/selector.rs | 2 +- zenoh/src/api/value.rs | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index b6071f97f8..1621b0199f 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -26,7 +26,7 @@ use zenoh_transport::{ TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; -use super::{ +use crate::api::{ encoding::Encoding, key_expr::KeyExpr, queryable::Query, diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index a8ccc5022f..59d2201d43 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -18,7 +18,7 @@ use zenoh_core::{Resolvable, Result as ZResult, Wait}; use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::CongestionControl, network::Mapping}; -use super::sample::TimestampBuilderTrait; +use crate::api::builders::sample::TimestampBuilderTrait; #[cfg(feature = "unstable")] use crate::api::sample::SourceInfo; use crate::{ diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index 2a26df215f..33156c59c6 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -25,7 +25,7 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; -use super::session::{Session, SessionInner, UndeclarableSealed}; +use crate::api::session::{Session, SessionInner, UndeclarableSealed}; use crate::net::primitives::Primitives; #[derive(Clone, Debug)] diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index fcfb2eb36b..af3aae138d 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -23,7 +23,7 @@ use tracing::error; use zenoh_config::unwrap_or_default; use zenoh_core::{Resolvable, Resolve, Result as ZResult, Wait}; -use super::{ +use crate::api::{ handlers::{locked, DefaultHandler, IntoHandler}, key_expr::KeyExpr, query::Reply, diff --git a/zenoh/src/api/loader.rs b/zenoh/src/api/loader.rs index 175e0c6816..74acd8621c 100644 --- a/zenoh/src/api/loader.rs +++ b/zenoh/src/api/loader.rs @@ -14,7 +14,7 @@ use zenoh_config::{Config, PluginLoad}; use zenoh_result::ZResult; -use super::plugins::{PluginsManager, PLUGIN_PREFIX}; +use crate::api::plugins::{PluginsManager, PLUGIN_PREFIX}; use crate::net::runtime::Runtime; pub(crate) fn load_plugin( diff --git a/zenoh/src/api/plugins.rs b/zenoh/src/api/plugins.rs index 2623ce2c6f..d4e5390a4b 100644 --- a/zenoh/src/api/plugins.rs +++ b/zenoh/src/api/plugins.rs @@ -21,7 +21,7 @@ use zenoh_plugin_trait::{ use zenoh_protocol::core::key_expr::keyexpr; use zenoh_result::ZResult; -use super::key_expr::KeyExpr; +use crate::api::key_expr::KeyExpr; use crate::net::runtime::Runtime; zconfigurable! { diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 20ccdf6617..b3c5e0ede5 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -24,7 +24,7 @@ use zenoh_protocol::{ network::declare::ext::QoSType, }; -use super::{ +use crate::api::{ builders::sample::QoSBuilderTrait, bytes::ZBytes, encoding::Encoding, key_expr::KeyExpr, publisher::Priority, value::Value, }; diff --git a/zenoh/src/api/selector.rs b/zenoh/src/api/selector.rs index 1ff8753406..45d230da3e 100644 --- a/zenoh/src/api/selector.rs +++ b/zenoh/src/api/selector.rs @@ -22,7 +22,7 @@ use zenoh_protocol::core::{ #[cfg(feature = "unstable")] use ::{zenoh_result::ZResult, zenoh_util::time_range::TimeRange}; -use super::{key_expr::KeyExpr, queryable::Query}; +use crate::api::{key_expr::KeyExpr, queryable::Query}; /// A selector is the combination of a [Key Expression](crate::key_expr::KeyExpr), which defines the /// set of keys that are relevant to an operation, and a set of parameters diff --git a/zenoh/src/api/value.rs b/zenoh/src/api/value.rs index 3bb2c7d467..f374634d01 100644 --- a/zenoh/src/api/value.rs +++ b/zenoh/src/api/value.rs @@ -13,7 +13,7 @@ // //! Value primitives. -use super::{bytes::ZBytes, encoding::Encoding}; +use crate::api::{bytes::ZBytes, encoding::Encoding}; /// A zenoh [`Value`] contains a `payload` and an [`Encoding`] that indicates how the payload's [`ZBytes`] should be interpreted. #[non_exhaustive] From d32ed2d880e085bb5a3e723b80655da5e79c4c42 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 16:17:39 +0200 Subject: [PATCH 14/15] cargo fmt --- zenoh/src/api/admin.rs | 17 ++++++++++------- zenoh/src/api/builders/publisher.rs | 5 +++-- zenoh/src/api/key_expr.rs | 6 ++++-- zenoh/src/api/liveliness.rs | 20 +++++++++++--------- zenoh/src/api/loader.rs | 6 ++++-- zenoh/src/api/plugins.rs | 3 +-- 6 files changed, 33 insertions(+), 24 deletions(-) diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index 1621b0199f..b0d0bed0f7 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -26,14 +26,17 @@ use zenoh_transport::{ TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; -use crate::api::{ - encoding::Encoding, - key_expr::KeyExpr, - queryable::Query, - sample::{DataInfo, Locality, SampleKind}, - subscriber::SubscriberKind, +use crate::{ + api::{ + encoding::Encoding, + key_expr::KeyExpr, + queryable::Query, + sample::{DataInfo, Locality, SampleKind}, + session::WeakSession, + subscriber::SubscriberKind, + }, + handlers::Callback, }; -use crate::{api::session::WeakSession, handlers::Callback}; lazy_static::lazy_static!( static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") }; diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 59d2201d43..ac6565cd27 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -18,12 +18,13 @@ use zenoh_core::{Resolvable, Result as ZResult, Wait}; use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::CongestionControl, network::Mapping}; -use crate::api::builders::sample::TimestampBuilderTrait; #[cfg(feature = "unstable")] use crate::api::sample::SourceInfo; use crate::{ api::{ - builders::sample::{EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait}, + builders::sample::{ + EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, + }, bytes::{OptionZBytes, ZBytes}, encoding::Encoding, key_expr::KeyExpr, diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index 33156c59c6..2a3c775bfc 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -25,8 +25,10 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; -use crate::api::session::{Session, SessionInner, UndeclarableSealed}; -use crate::net::primitives::Primitives; +use crate::{ + api::session::{Session, SessionInner, UndeclarableSealed}, + net::primitives::Primitives, +}; #[derive(Clone, Debug)] pub(crate) enum KeyExprInner<'a> { diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index af3aae138d..2c1fca5c73 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -23,16 +23,18 @@ use tracing::error; use zenoh_config::unwrap_or_default; use zenoh_core::{Resolvable, Resolve, Result as ZResult, Wait}; -use crate::api::{ - handlers::{locked, DefaultHandler, IntoHandler}, - key_expr::KeyExpr, - query::Reply, - sample::{Locality, Sample}, - session::{Session, UndeclarableSealed}, - subscriber::{Subscriber, SubscriberInner}, - Id, +use crate::{ + api::{ + handlers::{locked, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + query::Reply, + sample::{Locality, Sample}, + session::{Session, UndeclarableSealed, WeakSession}, + subscriber::{Subscriber, SubscriberInner}, + Id, + }, + handlers::Callback, }; -use crate::{api::session::WeakSession, handlers::Callback}; /// A structure with functions to declare a [`LivelinessToken`](LivelinessToken), /// query existing [`LivelinessTokens`](LivelinessToken) diff --git a/zenoh/src/api/loader.rs b/zenoh/src/api/loader.rs index 74acd8621c..951d25020e 100644 --- a/zenoh/src/api/loader.rs +++ b/zenoh/src/api/loader.rs @@ -14,8 +14,10 @@ use zenoh_config::{Config, PluginLoad}; use zenoh_result::ZResult; -use crate::api::plugins::{PluginsManager, PLUGIN_PREFIX}; -use crate::net::runtime::Runtime; +use crate::{ + api::plugins::{PluginsManager, PLUGIN_PREFIX}, + net::runtime::Runtime, +}; pub(crate) fn load_plugin( plugin_mgr: &mut PluginsManager, diff --git a/zenoh/src/api/plugins.rs b/zenoh/src/api/plugins.rs index d4e5390a4b..2f4a2ec856 100644 --- a/zenoh/src/api/plugins.rs +++ b/zenoh/src/api/plugins.rs @@ -21,8 +21,7 @@ use zenoh_plugin_trait::{ use zenoh_protocol::core::key_expr::keyexpr; use zenoh_result::ZResult; -use crate::api::key_expr::KeyExpr; -use crate::net::runtime::Runtime; +use crate::{api::key_expr::KeyExpr, net::runtime::Runtime}; zconfigurable! { pub static ref PLUGIN_PREFIX: String = "zenoh_plugin_".to_string(); From 46c6c05b2a1a79394d76fb66b9ba7f42be29ac92 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 13 Oct 2024 16:32:06 +0200 Subject: [PATCH 15/15] cargo doc fixes --- zenoh/src/api/builders/info.rs | 6 +++--- zenoh/src/api/builders/session.rs | 2 +- zenoh/src/api/info.rs | 8 ++++---- zenoh/src/api/query.rs | 8 ++++---- zenoh/src/api/queryable.rs | 8 ++++---- zenoh/src/api/subscriber.rs | 8 ++++---- 6 files changed, 20 insertions(+), 20 deletions(-) diff --git a/zenoh/src/api/builders/info.rs b/zenoh/src/api/builders/info.rs index 18e252c12e..e8396eab08 100644 --- a/zenoh/src/api/builders/info.rs +++ b/zenoh/src/api/builders/info.rs @@ -20,7 +20,7 @@ use zenoh_protocol::core::WhatAmI; use crate::net::runtime::Runtime; -/// A builder returned by [`SessionInfo::zid()`](SessionInfo::zid) that allows +/// A builder returned by [`SessionInfo::zid()`](crate::session::SessionInfo::zid) that allows /// to access the [`ZenohId`] of the current zenoh [`Session`](crate::Session). /// /// # Examples @@ -62,7 +62,7 @@ impl<'a> IntoFuture for ZenohIdBuilder<'a> { } } -/// A builder returned by [`SessionInfo::routers_zid()`](SessionInfo::routers_zid) that allows +/// A builder returned by [`SessionInfo::routers_zid()`](crate::session::SessionInfo::routers_zid) that allows /// to access the [`ZenohId`] of the zenoh routers this process is currently connected to /// or the [`ZenohId`] of the current router if this code is run from a router (plugin). /// @@ -116,7 +116,7 @@ impl<'a> IntoFuture for RoutersZenohIdBuilder<'a> { } } -/// A builder returned by [`SessionInfo::peers_zid()`](SessionInfo::peers_zid) that allows +/// A builder returned by [`SessionInfo::peers_zid()`](crate::session::SessionInfo::peers_zid) that allows /// to access the [`ZenohId`] of the zenoh peers this process is currently connected to. /// /// # Examples diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index 03e2a1d09e..0ada1f4a67 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -27,7 +27,7 @@ use crate::api::session::Session; #[cfg(feature = "internal")] use crate::net::runtime::Runtime; -/// A builder returned by [`open`] used to open a zenoh [`Session`]. +/// A builder returned by [`crate::open`] used to open a zenoh [`Session`]. /// /// # Examples /// ``` diff --git a/zenoh/src/api/info.rs b/zenoh/src/api/info.rs index e97cdcfe5a..db562c9610 100644 --- a/zenoh/src/api/info.rs +++ b/zenoh/src/api/info.rs @@ -35,7 +35,7 @@ pub struct SessionInfo { } impl SessionInfo { - /// Return the [`ZenohId`] of the current zenoh [`Session`](crate::Session). + /// Return the [`crate::session::ZenohId`] of the current zenoh [`Session`](crate::Session). /// /// # Examples /// ``` @@ -50,8 +50,8 @@ impl SessionInfo { ZenohIdBuilder::new(&self.runtime) } - /// Return the [`ZenohId`] of the zenoh routers this process is currently connected to - /// or the [`ZenohId`] of the current router if this code is run from a router (plugin). + /// Return the [`crate::session::ZenohId`] of the zenoh routers this process is currently connected to + /// or the [`crate::session::ZenohId`] of the current router if this code is run from a router (plugin). /// /// # Examples /// ``` @@ -67,7 +67,7 @@ impl SessionInfo { RoutersZenohIdBuilder::new(&self.runtime) } - /// Return the [`ZenohId`] of the zenoh peers this process is currently connected to. + /// Return the [`crate::session::ZenohId`] of the zenoh peers this process is currently connected to. /// /// # Examples /// ``` diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index a06f41495d..4633f6d437 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -20,7 +20,7 @@ use zenoh_keyexpr::OwnedKeyExpr; use zenoh_protocol::core::Parameters; #[cfg(feature = "unstable")] use zenoh_protocol::core::ZenohIdProto; -/// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](Session::get). +/// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](crate::Session::get). pub use zenoh_protocol::network::request::ext::QueryTarget; #[doc(inline)] pub use zenoh_protocol::zenoh::query::ConsolidationMode; @@ -30,7 +30,7 @@ use crate::api::{ selector::Selector, value::Value, }; -/// The replies consolidation strategy to apply on replies to a [`get`](Session::get). +/// The replies consolidation strategy to apply on replies to a [`get`](crate::Session::get). #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct QueryConsolidation { pub(crate) mode: ConsolidationMode, @@ -65,7 +65,7 @@ impl Default for QueryConsolidation { } } -/// Error returned by a [`get`](Session::get). +/// Error returned by a [`get`](crate::Session::get). #[derive(Clone, Debug, PartialEq, Eq, Default)] pub struct ReplyError { pub(crate) payload: ZBytes, @@ -108,7 +108,7 @@ impl From for ReplyError { } } -/// Struct returned by a [`get`](Session::get). +/// Struct returned by a [`get`](crate::Session::get). #[non_exhaustive] #[derive(Clone, Debug)] pub struct Reply { diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 613d1fa1d8..ad8ab59f85 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -447,15 +447,15 @@ impl Queryable { } /// Returns a reference to this queryable's handler. - /// An handler is anything that implements [`IntoHandler`]. - /// The default handler is [`DefaultHandler`]. + /// An handler is anything that implements [`crate::handlers::IntoHandler`]. + /// The default handler is [`crate::handlers::DefaultHandler`]. pub fn handler(&self) -> &Handler { &self.handler } /// Returns a mutable reference to this queryable's handler. - /// An handler is anything that implements [`IntoHandler`]. - /// The default handler is [`DefaultHandler`]. + /// An handler is anything that implements [`crate::handlers::IntoHandler`]. + /// The default handler is [`crate::handlers::DefaultHandler`]. pub fn handler_mut(&mut self) -> &mut Handler { &mut self.handler } diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 8c908f2a5b..099a1d174a 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -174,15 +174,15 @@ impl Subscriber { } /// Returns a reference to this subscriber's handler. - /// An handler is anything that implements [`IntoHandler`]. - /// The default handler is [`DefaultHandler`]. + /// An handler is anything that implements [`crate::handlers::IntoHandler`]. + /// The default handler is [`crate::handlers::DefaultHandler`]. pub fn handler(&self) -> &Handler { &self.handler } /// Returns a mutable reference to this subscriber's handler. - /// An handler is anything that implements [`IntoHandler`]. - /// The default handler is [`DefaultHandler`]. + /// An handler is anything that implements [`crate::handlers::IntoHandler`]. + /// The default handler is [`crate::handlers::DefaultHandler`]. pub fn handler_mut(&mut self) -> &mut Handler { &mut self.handler }