From 3489a1fce0704b8dd587916877cbd72b4d95980e Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 26 Nov 2024 16:33:49 +0100 Subject: [PATCH 01/27] Initial implementation of publisher builder config overwrites --- Cargo.lock | 1 + DEFAULT_CONFIG.json5 | 16 ++++ commons/zenoh-config/Cargo.toml | 2 + commons/zenoh-config/src/builders.rs | 127 +++++++++++++++++++++++++++ commons/zenoh-config/src/lib.rs | 10 +++ zenoh/Cargo.toml | 2 +- zenoh/src/api/builders/session.rs | 10 +++ zenoh/src/api/publisher.rs | 15 ++++ zenoh/src/api/sample.rs | 12 ++- zenoh/src/api/session.rs | 61 ++++++++++--- 10 files changed, 244 insertions(+), 12 deletions(-) create mode 100644 commons/zenoh-config/src/builders.rs diff --git a/Cargo.lock b/Cargo.lock index 317af86fff..75c9e14218 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5545,6 +5545,7 @@ dependencies = [ "uhlc", "validated_struct", "zenoh-core", + "zenoh-keyexpr", "zenoh-macros", "zenoh-protocol", "zenoh-result", diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 7a3e614635..7ed6518ece 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -184,6 +184,22 @@ }, }, + // /// Overwrite default builders for specific key expressions + // builders: { + // /// A list of publisher builder configurations for specific key expressions. + // publishers: [ + // // key_expressions + // key_exprs: ["demo/**", "example/key"], + // // builder configuration + // congestion_control: "Block", + // encoding: "zenoh/example", + // priority: "data_high", + // express: true, + // reliability: "best_effort", + // allowed_destination: "remote", + // ], + // }, + // /// The declarations aggregation strategy. // aggregation: { // /// A list of key-expressions for which all included subscribers will be aggregated into. diff --git a/commons/zenoh-config/Cargo.toml b/commons/zenoh-config/Cargo.toml index 43472626d4..8c24ac6539 100644 --- a/commons/zenoh-config/Cargo.toml +++ b/commons/zenoh-config/Cargo.toml @@ -25,6 +25,7 @@ description = "Internal crate for zenoh." [features] internal = [] +unstable = [] [dependencies] tracing = { workspace = true } @@ -35,6 +36,7 @@ serde_json = { workspace = true } serde_yaml = { workspace = true } validated_struct = { workspace = true, features = ["json5", "json_get"] } zenoh-core = { workspace = true } +zenoh-keyexpr = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-util = { workspace = true } diff --git a/commons/zenoh-config/src/builders.rs b/commons/zenoh-config/src/builders.rs new file mode 100644 index 0000000000..07888ea91c --- /dev/null +++ b/commons/zenoh-config/src/builders.rs @@ -0,0 +1,127 @@ +use std::collections::HashSet; + +use serde::{Deserialize, Deserializer, Serialize}; +pub use validated_struct::{GetError, ValidatedMap}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree}; +use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability}; +pub use zenoh_protocol::core::{ + whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor, +}; + +#[derive(Debug, Deserialize, Default, Serialize, Clone)] +#[serde(remote = "Self")] +pub struct PublisherBuildersConf(pub(crate) Vec); + +impl<'de> Deserialize<'de> for PublisherBuildersConf { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let builders = PublisherBuildersConf::deserialize(deserializer)?; + // check for invariant: each key_expr should be unique + let mut key_set = HashSet::new(); + for builder in &builders.0 { + for key_expr in &builder.key_exprs { + if !key_set.insert(key_expr) { + return Err(format!( + "duplicated key_expr '{key_expr}' found in publisher builders config" + )) + .map_err(serde::de::Error::custom); + } + } + } + Ok(builders) + } +} + +impl Serialize for PublisherBuildersConf { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + PublisherBuildersConf::serialize(self, serializer) + } +} + +impl From for KeBoxTree { + fn from(value: PublisherBuildersConf) -> KeBoxTree { + let mut tree = KeBoxTree::new(); + for conf in value.0 { + for key_expr in conf.key_exprs { + // key_expr unicity is checked at deserialization + tree.insert(&key_expr, conf.builder_conf.clone()); + } + } + tree + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub(crate) struct PublisherBuildersInnerConf { + pub key_exprs: Vec, + #[serde(flatten)] + pub builder_conf: PublisherBuilderOptionsConf, +} + +#[derive(Debug, Default, Deserialize, Serialize, Clone)] +pub struct PublisherBuilderOptionsConf { + pub congestion_control: Option, + pub encoding: Option, // Encoding has From<&str> + pub priority: Option, + pub express: Option, + #[cfg(feature = "unstable")] + pub reliability: Option, + pub allowed_destination: Option, +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "lowercase")] +pub enum PublisherCongestionControlConf { + Drop, + Block, +} + +impl From for CongestionControl { + fn from(value: PublisherCongestionControlConf) -> Self { + match value { + PublisherCongestionControlConf::Drop => CongestionControl::Drop, + PublisherCongestionControlConf::Block => CongestionControl::Block, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherPriorityConf { + RealTime = 1, + InteractiveHigh = 2, + InteractiveLow = 3, + DataHigh = 4, + Data = 5, + DataLow = 6, + Background = 7, +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherReliabilityConf { + BestEffort, + Reliable, +} + +impl From for Reliability { + fn from(value: PublisherReliabilityConf) -> Reliability { + match value { + PublisherReliabilityConf::BestEffort => Reliability::BestEffort, + PublisherReliabilityConf::Reliable => Reliability::Reliable, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherLocalityConf { + SessionLocal, + Remote, + Any, +} diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index d25ccc63c3..ed14e9b67b 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -19,6 +19,7 @@ //! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) //! //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. +pub mod builders; pub mod defaults; mod include; pub mod wrappers; @@ -29,6 +30,7 @@ use std::{ any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, ops, path::Path, sync::Weak, }; +use builders::PublisherBuildersConf; use include::recursive_include; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; use serde::{Deserialize, Serialize}; @@ -360,6 +362,14 @@ validated_struct::validator! { /// A list of key-expressions for which all included publishers will be aggregated into. publishers: Vec, }, + + /// Overwrite default builders for specific key expressions + pub builders: #[derive(Default)] + BuildersConf { + /// A list of publisher builder configurations for specific key expressions. + publishers: PublisherBuildersConf, + }, + pub transport: #[derive(Default)] TransportConf { pub unicast: TransportUnicastConf { diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 94f6d6eb48..947a13f453 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -64,7 +64,7 @@ transport_udp = ["zenoh-transport/transport_udp"] transport_unixsock-stream = ["zenoh-transport/transport_unixsock-stream"] transport_ws = ["zenoh-transport/transport_ws"] transport_vsock = ["zenoh-transport/transport_vsock"] -unstable = ["internal_config", "zenoh-keyexpr/unstable"] +unstable = ["internal_config", "zenoh-keyexpr/unstable", "zenoh-config/unstable"] internal_config = [] [dependencies] diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index 0ada1f4a67..3ef35040de 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -16,6 +16,8 @@ use std::future::{IntoFuture, Ready}; #[cfg(feature = "shared-memory")] use std::sync::Arc; +#[cfg(feature = "internal")] +use zenoh_config::builders::PublisherBuildersConf; use zenoh_core::{Resolvable, Wait}; #[cfg(feature = "internal")] use zenoh_keyexpr::OwnedKeyExpr; @@ -122,6 +124,7 @@ pub fn init(runtime: Runtime) -> InitBuilder { runtime, aggregated_subscribers: vec![], aggregated_publishers: vec![], + publisher_builders: PublisherBuildersConf::default(), } } @@ -133,6 +136,7 @@ pub struct InitBuilder { runtime: Runtime, aggregated_subscribers: Vec, aggregated_publishers: Vec, + publisher_builders: PublisherBuildersConf, } #[zenoh_macros::internal] @@ -148,6 +152,11 @@ impl InitBuilder { self.aggregated_publishers = exprs; self } + + pub fn publisher_builders(mut self, builder_conf: PublisherBuildersConf) -> Self { + self.publisher_builders = builder_conf; + self + } } #[zenoh_macros::internal] @@ -162,6 +171,7 @@ impl Wait for InitBuilder { self.runtime, self.aggregated_subscribers, self.aggregated_publishers, + self.publisher_builders, false, ) .wait()) diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 2b996504db..ec4e5d22e6 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -22,6 +22,7 @@ use std::{ use futures::Sink; use tracing::error; +use zenoh_config::builders::PublisherPriorityConf; use zenoh_core::{Resolvable, Resolve, Wait}; use zenoh_protocol::core::CongestionControl; use zenoh_result::{Error, ZResult}; @@ -469,6 +470,20 @@ impl TryFrom for Priority { } } +impl From for Priority { + fn from(value: PublisherPriorityConf) -> Self { + match value { + PublisherPriorityConf::RealTime => Priority::RealTime, + PublisherPriorityConf::InteractiveHigh => Priority::InteractiveHigh, + PublisherPriorityConf::InteractiveLow => Priority::InteractiveLow, + PublisherPriorityConf::DataHigh => Priority::DataHigh, + PublisherPriorityConf::Data => Priority::Data, + PublisherPriorityConf::DataLow => Priority::DataLow, + PublisherPriorityConf::Background => Priority::Background, + } + } +} + type ProtocolPriority = zenoh_protocol::core::Priority; impl From for ProtocolPriority { fn from(prio: Priority) -> Self { diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 75d6c32b63..16e7eb69f4 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -16,7 +16,7 @@ use std::{convert::TryFrom, fmt}; use serde::{Deserialize, Serialize}; -use zenoh_config::wrappers::EntityGlobalId; +use zenoh_config::{builders::PublisherLocalityConf, wrappers::EntityGlobalId}; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; use zenoh_protocol::{ @@ -50,6 +50,16 @@ pub(crate) enum Locality { Any, } +impl From for Locality { + fn from(value: PublisherLocalityConf) -> Self { + match value { + PublisherLocalityConf::SessionLocal => Locality::SessionLocal, + PublisherLocalityConf::Remote => Locality::Remote, + PublisherLocalityConf::Any => Locality::Any, + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Default)] pub(crate) struct DataInfo { pub kind: SampleKind, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 0c01bffdbd..2c02fb2548 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -31,8 +31,13 @@ use uhlc::Timestamp; use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; -use zenoh_config::{unwrap_or_default, wrappers::ZenohId}; +use zenoh_config::{ + builders::{PublisherBuilderOptionsConf, PublisherBuildersConf}, + unwrap_or_default, + wrappers::ZenohId, +}; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, Wait}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode, KeBoxTree}; #[cfg(feature = "unstable")] use zenoh_protocol::network::{ declare::{DeclareToken, SubscriberId, TokenId, UndeclareToken}, @@ -145,12 +150,14 @@ pub(crate) struct SessionState { pub(crate) liveliness_queries: HashMap, pub(crate) aggregated_subscribers: Vec, pub(crate) aggregated_publishers: Vec, + pub(crate) publisher_builders_tree: KeBoxTree, } impl SessionState { pub(crate) fn new( aggregated_subscribers: Vec, aggregated_publishers: Vec, + publisher_builders_tree: KeBoxTree, ) -> SessionState { SessionState { primitives: None, @@ -178,6 +185,7 @@ impl SessionState { liveliness_queries: HashMap::new(), aggregated_subscribers, aggregated_publishers, + publisher_builders_tree, } } } @@ -540,6 +548,7 @@ impl Session { runtime: Runtime, aggregated_subscribers: Vec, aggregated_publishers: Vec, + publisher_builders: PublisherBuildersConf, owns_runtime: bool, ) -> impl Resolve { ResolveClosure::new(move || { @@ -547,6 +556,7 @@ impl Session { let state = RwLock::new(SessionState::new( aggregated_subscribers, aggregated_publishers, + publisher_builders.into(), )); let session = Session(Arc::new(SessionInner { weak_counter: Mutex::new(0), @@ -826,17 +836,46 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublisherBuilder { + let maybe_key_expr = key_expr.try_into().map_err(Into::into); + let mut builder_overwrites = PublisherBuilderOptionsConf::default(); + if let Ok(key_expr) = &maybe_key_expr { + // get overwritten builder + let state = zread!(self.0.state); + for node in state.publisher_builders_tree.nodes_including(key_expr) { + // Take the first one yielded by the iterator that has overwrites + if let Some(overwrites) = node.weight() { + builder_overwrites = overwrites.clone(); + break; + } + } + } + + return PublisherBuilder { session: self, - key_expr: key_expr.try_into().map_err(Into::into), - encoding: Encoding::default(), - congestion_control: CongestionControl::DEFAULT, - priority: Priority::DEFAULT, - is_express: false, + key_expr: maybe_key_expr, + encoding: builder_overwrites + .encoding + .map(|encoding| encoding.into()) + .unwrap_or(Encoding::default()), + congestion_control: builder_overwrites + .congestion_control + .map(|cc| cc.into()) + .unwrap_or(CongestionControl::DEFAULT), + priority: builder_overwrites + .priority + .map(|p| p.into()) + .unwrap_or(Priority::DEFAULT), + is_express: builder_overwrites.express.unwrap_or(false), #[cfg(feature = "unstable")] - reliability: Reliability::DEFAULT, - destination: Locality::default(), - } + reliability: builder_overwrites + .reliability + .map(|r| r.into()) + .unwrap_or(Reliability::DEFAULT), + destination: builder_overwrites + .allowed_destination + .map(|d| d.into()) + .unwrap_or(Locality::default()), + }; } /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. @@ -1053,6 +1092,7 @@ impl Session { tracing::debug!("Config: {:?}", &config); let aggregated_subscribers = config.0.aggregation().subscribers().clone(); let aggregated_publishers = config.0.aggregation().publishers().clone(); + let publisher_builders = config.0.builders.publishers().clone(); #[allow(unused_mut)] // Required for shared-memory let mut runtime = RuntimeBuilder::new(config); #[cfg(feature = "shared-memory")] @@ -1065,6 +1105,7 @@ impl Session { runtime.clone(), aggregated_subscribers, aggregated_publishers, + publisher_builders, true, ) .await; From c8083f729156d20cde342b96b559abd8fcc603f5 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 26 Nov 2024 17:51:41 +0100 Subject: [PATCH 02/27] Add publisher builder overwrite test --- zenoh/tests/qos.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 4995d05e70..d27b8045dc 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -62,3 +62,67 @@ async fn qos_pubsub() { assert_eq!(sample.congestion_control(), CongestionControl::Block); assert!(!sample.express()); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn qos_pubsub_overwrite_builder() { + let builder_config_overwrite = zenoh::Config::from_json5( + r#" + { + builders: { + publishers: [ + { + key_exprs: ["test/qos/overwritten", "test/not_applicable/**"], + priority: "real_time", + encoding: "zenoh/string", + congestion_control: "drop", + express: false, + }, + { + key_exprs: ["test/not_applicable"], + priority: "data_high", + encoding: "zenoh/bytes", + congestion_control: "drop", + express: false, + }, + ] + } + } + "#, + ) + .unwrap(); + let session1 = ztimeout!(zenoh::open(builder_config_overwrite)).unwrap(); + let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + + let overwrite_config_publisher = ztimeout!(session1 + .declare_publisher("test/qos/overwritten") + .congestion_control(CongestionControl::Block) + .express(true)) + .unwrap(); + + let no_overwrite_config_publisher = ztimeout!(session1 + .declare_publisher("test/qos/no_overwrite") + .encoding(Encoding::TEXT_PLAIN) + .priority(Priority::DataLow) + .congestion_control(CongestionControl::Drop) + .express(false)) + .unwrap(); + + let subscriber = ztimeout!(session2.declare_subscriber("test/qos/**")).unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(overwrite_config_publisher.put("qos")).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.priority(), Priority::RealTime); + assert_eq!(sample.encoding(), &Encoding::ZENOH_STRING); + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert!(sample.express()); + + ztimeout!(no_overwrite_config_publisher.put("qos")).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.encoding(), &Encoding::TEXT_PLAIN); + assert_eq!(sample.priority(), Priority::DataLow); + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert!(!sample.express()); +} From e45bd6f99664ff612b306dfbefed11ceda9c6d41 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 26 Nov 2024 18:11:00 +0100 Subject: [PATCH 03/27] Remove unnecessary return --- zenoh/src/api/session.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 2c02fb2548..5470fe9b34 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -850,7 +850,7 @@ impl Session { } } - return PublisherBuilder { + PublisherBuilder { session: self, key_expr: maybe_key_expr, encoding: builder_overwrites @@ -875,7 +875,7 @@ impl Session { .allowed_destination .map(|d| d.into()) .unwrap_or(Locality::default()), - }; + } } /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. From 1d695dbe5afe916091dfb44585750c624c317c93 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Wed, 27 Nov 2024 15:46:21 +0100 Subject: [PATCH 04/27] Fix default config spacing --- DEFAULT_CONFIG.json5 | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 7ed6518ece..51d6a7e430 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -186,18 +186,20 @@ // /// Overwrite default builders for specific key expressions // builders: { - // /// A list of publisher builder configurations for specific key expressions. - // publishers: [ - // // key_expressions + // /// A list of publisher builder configurations for specific key expressions. + // publishers: [ + // { + // /// key_expressions // key_exprs: ["demo/**", "example/key"], - // // builder configuration + // /// builder configuration // congestion_control: "Block", // encoding: "zenoh/example", // priority: "data_high", // express: true, // reliability: "best_effort", // allowed_destination: "remote", - // ], + // } + // ], // }, // /// The declarations aggregation strategy. From 0d9d145f44f68795b9f47d6ffba58667fd9950fd Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Wed, 27 Nov 2024 16:14:41 +0100 Subject: [PATCH 05/27] Change config format to publishers/default_builders --- DEFAULT_CONFIG.json5 | 30 ++++++++++++++++------------ commons/zenoh-config/src/builders.rs | 5 ++--- commons/zenoh-config/src/lib.rs | 8 ++++---- zenoh/src/api/session.rs | 2 +- 4 files changed, 24 insertions(+), 21 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 51d6a7e430..65b268bd55 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -184,21 +184,25 @@ }, }, - // /// Overwrite default builders for specific key expressions - // builders: { - // /// A list of publisher builder configurations for specific key expressions. - // publishers: [ + // /// Configuration options for Zenoh publishers + // publishers: { + // /// Overwrite default builders for certain key expressions + // default_builders: [ // { - // /// key_expressions + // /// Publisher key expressions that are included by these key expressions + // /// will have their default publisher builder overwritten by the given config. // key_exprs: ["demo/**", "example/key"], - // /// builder configuration - // congestion_control: "Block", - // encoding: "zenoh/example", - // priority: "data_high", - // express: true, - // reliability: "best_effort", - // allowed_destination: "remote", - // } + // /// Configurations that will be applied on the publisher builder + // /// Note: these can still be overwritten in the application code via the PublisherBuilder API + // config: { + // congestion_control: "Block", + // encoding: "zenoh/example", + // priority: "data_high", + // express: true, + // reliability: "best_effort", + // allowed_destination: "remote", + // }, + // }, // ], // }, diff --git a/commons/zenoh-config/src/builders.rs b/commons/zenoh-config/src/builders.rs index 07888ea91c..c78e61d96e 100644 --- a/commons/zenoh-config/src/builders.rs +++ b/commons/zenoh-config/src/builders.rs @@ -49,7 +49,7 @@ impl From for KeBoxTree { for conf in value.0 { for key_expr in conf.key_exprs { // key_expr unicity is checked at deserialization - tree.insert(&key_expr, conf.builder_conf.clone()); + tree.insert(&key_expr, conf.config.clone()); } } tree @@ -59,8 +59,7 @@ impl From for KeBoxTree { #[derive(Debug, Deserialize, Serialize, Clone)] pub(crate) struct PublisherBuildersInnerConf { pub key_exprs: Vec, - #[serde(flatten)] - pub builder_conf: PublisherBuilderOptionsConf, + pub config: PublisherBuilderOptionsConf, } #[derive(Debug, Default, Deserialize, Serialize, Clone)] diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index ed14e9b67b..e65fa5703d 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -364,10 +364,10 @@ validated_struct::validator! { }, /// Overwrite default builders for specific key expressions - pub builders: #[derive(Default)] - BuildersConf { - /// A list of publisher builder configurations for specific key expressions. - publishers: PublisherBuildersConf, + pub publishers: #[derive(Default)] + PublishersConfig { + /// A list of publisher builder configurations for key expressions. + default_builders: PublisherBuildersConf, }, pub transport: #[derive(Default)] diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 5470fe9b34..8e8d2de5ac 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1092,7 +1092,7 @@ impl Session { tracing::debug!("Config: {:?}", &config); let aggregated_subscribers = config.0.aggregation().subscribers().clone(); let aggregated_publishers = config.0.aggregation().publishers().clone(); - let publisher_builders = config.0.builders.publishers().clone(); + let publisher_builders = config.0.publishers().default_builders().clone(); #[allow(unused_mut)] // Required for shared-memory let mut runtime = RuntimeBuilder::new(config); #[cfg(feature = "shared-memory")] From 3fc0492ecfb9e624c8e4a2e1c1531367e3b71a0d Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Wed, 27 Nov 2024 16:22:36 +0100 Subject: [PATCH 06/27] Update QoS test config --- zenoh/tests/qos.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index d27b8045dc..2334ad9cf9 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -68,21 +68,25 @@ async fn qos_pubsub_overwrite_builder() { let builder_config_overwrite = zenoh::Config::from_json5( r#" { - builders: { - publishers: [ + publishers: { + default_builders: [ { key_exprs: ["test/qos/overwritten", "test/not_applicable/**"], - priority: "real_time", - encoding: "zenoh/string", - congestion_control: "drop", - express: false, + config: { + priority: "real_time", + encoding: "zenoh/string", + congestion_control: "drop", + express: false, + }, }, { key_exprs: ["test/not_applicable"], - priority: "data_high", - encoding: "zenoh/bytes", - congestion_control: "drop", - express: false, + config: { + priority: "data_high", + encoding: "zenoh/bytes", + congestion_control: "drop", + express: false, + }, }, ] } From e7f7fa64c9d2a8082d4424a51632bf582ed7499e Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Wed, 27 Nov 2024 17:28:25 +0100 Subject: [PATCH 07/27] Move new PublisherBuilder logic --- zenoh/src/api/builders/publisher.rs | 50 +++++++++++++++++++++++++++++ zenoh/src/api/session.rs | 43 ++----------------------- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index ac6565cd27..d8afa4e0da 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -13,7 +13,9 @@ // use std::future::{IntoFuture, Ready}; +use zenoh_config::builders::PublisherBuilderOptionsConf; use zenoh_core::{Resolvable, Result as ZResult, Wait}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode}; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::CongestionControl, network::Mapping}; @@ -341,6 +343,54 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { } impl<'a, 'b> PublisherBuilder<'a, 'b> { + pub fn new<'c, TryIntoKeyExpr>(session: &'a Session, key_expr: TryIntoKeyExpr) -> Self + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + 'c: 'b, + { + let maybe_key_expr = key_expr.try_into().map_err(Into::into); + let mut builder_overwrites = PublisherBuilderOptionsConf::default(); + if let Ok(key_expr) = &maybe_key_expr { + // get overwritten builder + let state = zread!(session.0.state); + for node in state.publisher_builders_tree.nodes_including(key_expr) { + // Take the first one yielded by the iterator that has overwrites + if let Some(overwrites) = node.weight() { + builder_overwrites = overwrites.clone(); + break; + } + } + } + + Self { + session, + key_expr: maybe_key_expr, + encoding: builder_overwrites + .encoding + .map(|encoding| encoding.into()) + .unwrap_or(Encoding::default()), + congestion_control: builder_overwrites + .congestion_control + .map(|cc| cc.into()) + .unwrap_or(CongestionControl::DEFAULT), + priority: builder_overwrites + .priority + .map(|p| p.into()) + .unwrap_or(Priority::DEFAULT), + is_express: builder_overwrites.express.unwrap_or(false), + #[cfg(feature = "unstable")] + reliability: builder_overwrites + .reliability + .map(|r| r.into()) + .unwrap_or(Reliability::DEFAULT), + destination: builder_overwrites + .allowed_destination + .map(|d| d.into()) + .unwrap_or(Locality::default()), + } + } + /// Changes the [`crate::sample::Locality`] applied when routing the data. /// /// This restricts the matching subscribers that will receive the published data to the ones diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 8e8d2de5ac..3506974aa6 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -37,7 +37,7 @@ use zenoh_config::{ wrappers::ZenohId, }; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, Wait}; -use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode, KeBoxTree}; +use zenoh_keyexpr::keyexpr_tree::KeBoxTree; #[cfg(feature = "unstable")] use zenoh_protocol::network::{ declare::{DeclareToken, SubscriberId, TokenId, UndeclareToken}, @@ -836,46 +836,7 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - let maybe_key_expr = key_expr.try_into().map_err(Into::into); - let mut builder_overwrites = PublisherBuilderOptionsConf::default(); - if let Ok(key_expr) = &maybe_key_expr { - // get overwritten builder - let state = zread!(self.0.state); - for node in state.publisher_builders_tree.nodes_including(key_expr) { - // Take the first one yielded by the iterator that has overwrites - if let Some(overwrites) = node.weight() { - builder_overwrites = overwrites.clone(); - break; - } - } - } - - PublisherBuilder { - session: self, - key_expr: maybe_key_expr, - encoding: builder_overwrites - .encoding - .map(|encoding| encoding.into()) - .unwrap_or(Encoding::default()), - congestion_control: builder_overwrites - .congestion_control - .map(|cc| cc.into()) - .unwrap_or(CongestionControl::DEFAULT), - priority: builder_overwrites - .priority - .map(|p| p.into()) - .unwrap_or(Priority::DEFAULT), - is_express: builder_overwrites.express.unwrap_or(false), - #[cfg(feature = "unstable")] - reliability: builder_overwrites - .reliability - .map(|r| r.into()) - .unwrap_or(Reliability::DEFAULT), - destination: builder_overwrites - .allowed_destination - .map(|d| d.into()) - .unwrap_or(Locality::default()), - } + PublisherBuilder::new(self, key_expr) } /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. From 9eca04039515ea7ea6569a97a9287436cd8ddb67 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Wed, 27 Nov 2024 17:45:33 +0100 Subject: [PATCH 08/27] Log warning when multiple builder configs can apply for the same keyexpr --- zenoh/src/api/builders/publisher.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index d8afa4e0da..a6598eb7b1 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -13,6 +13,7 @@ // use std::future::{IntoFuture, Ready}; +use itertools::Itertools; use zenoh_config::builders::PublisherBuilderOptionsConf; use zenoh_core::{Resolvable, Result as ZResult, Wait}; use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode}; @@ -354,10 +355,22 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { if let Ok(key_expr) = &maybe_key_expr { // get overwritten builder let state = zread!(session.0.state); - for node in state.publisher_builders_tree.nodes_including(key_expr) { + let nodes_including = state + .publisher_builders_tree + .nodes_including(key_expr) + .collect_vec(); + for node in &nodes_including { // Take the first one yielded by the iterator that has overwrites if let Some(overwrites) = node.weight() { builder_overwrites = overwrites.clone(); + // log warning if multiple keyexprs include it + if nodes_including.len() > 1 { + tracing::warn!( + "Publisher declared on `{}` which is included by multiple key_exprs in builders config. Using builder config for `{}`", + key_expr, + node.keyexpr(), + ); + } break; } } From 95c8cd3191bd0ac7fcaa3fb073d06704f2bbe34a Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Thu, 28 Nov 2024 14:15:07 +0100 Subject: [PATCH 09/27] Rename builders module to publishers --- commons/zenoh-config/src/lib.rs | 4 ++-- commons/zenoh-config/src/{builders.rs => publishers.rs} | 0 zenoh/src/api/builders/publisher.rs | 2 +- zenoh/src/api/builders/session.rs | 2 +- zenoh/src/api/publisher.rs | 2 +- zenoh/src/api/sample.rs | 2 +- zenoh/src/api/session.rs | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) rename commons/zenoh-config/src/{builders.rs => publishers.rs} (100%) diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index e65fa5703d..38dd011871 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -19,9 +19,9 @@ //! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) //! //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. -pub mod builders; pub mod defaults; mod include; +pub mod publishers; pub mod wrappers; #[allow(unused_imports)] @@ -30,8 +30,8 @@ use std::{ any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, ops, path::Path, sync::Weak, }; -use builders::PublisherBuildersConf; use include::recursive_include; +use publishers::PublisherBuildersConf; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; diff --git a/commons/zenoh-config/src/builders.rs b/commons/zenoh-config/src/publishers.rs similarity index 100% rename from commons/zenoh-config/src/builders.rs rename to commons/zenoh-config/src/publishers.rs diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index a6598eb7b1..1428edb7e8 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -14,7 +14,7 @@ use std::future::{IntoFuture, Ready}; use itertools::Itertools; -use zenoh_config::builders::PublisherBuilderOptionsConf; +use zenoh_config::publishers::PublisherBuilderOptionsConf; use zenoh_core::{Resolvable, Result as ZResult, Wait}; use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode}; #[cfg(feature = "unstable")] diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index 3ef35040de..04481324c9 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -17,7 +17,7 @@ use std::future::{IntoFuture, Ready}; use std::sync::Arc; #[cfg(feature = "internal")] -use zenoh_config::builders::PublisherBuildersConf; +use zenoh_config::publishers::PublisherBuildersConf; use zenoh_core::{Resolvable, Wait}; #[cfg(feature = "internal")] use zenoh_keyexpr::OwnedKeyExpr; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index ec4e5d22e6..85bfb5ad0c 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -22,7 +22,7 @@ use std::{ use futures::Sink; use tracing::error; -use zenoh_config::builders::PublisherPriorityConf; +use zenoh_config::publishers::PublisherPriorityConf; use zenoh_core::{Resolvable, Resolve, Wait}; use zenoh_protocol::core::CongestionControl; use zenoh_result::{Error, ZResult}; diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 16e7eb69f4..28648c378e 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -16,7 +16,7 @@ use std::{convert::TryFrom, fmt}; use serde::{Deserialize, Serialize}; -use zenoh_config::{builders::PublisherLocalityConf, wrappers::EntityGlobalId}; +use zenoh_config::{publishers::PublisherLocalityConf, wrappers::EntityGlobalId}; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; use zenoh_protocol::{ diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 3506974aa6..b846de62e9 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -32,7 +32,7 @@ use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; use zenoh_config::{ - builders::{PublisherBuilderOptionsConf, PublisherBuildersConf}, + publishers::{PublisherBuilderOptionsConf, PublisherBuildersConf}, unwrap_or_default, wrappers::ZenohId, }; From ea15fd89ba41bd4dfb2e5f8a5756e8ad573dca65 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 29 Nov 2024 17:29:42 +0100 Subject: [PATCH 10/27] Change config format to qos/put --- DEFAULT_CONFIG.json5 | 13 ++++++----- commons/zenoh-config/src/lib.rs | 12 +++++------ commons/zenoh-config/src/publishers.rs | 30 +++++++++++--------------- zenoh/src/api/builders/publisher.rs | 4 ++-- zenoh/src/api/builders/session.rs | 8 +++---- zenoh/src/api/session.rs | 10 ++++----- 6 files changed, 36 insertions(+), 41 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 65b268bd55..118bbe8881 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -184,16 +184,15 @@ }, }, - // /// Configuration options for Zenoh publishers - // publishers: { - // /// Overwrite default builders for certain key expressions - // default_builders: [ + // /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config) + // qos: { + // /// Overwrite QoS options for PUT messages + // put: [ // { - // /// Publisher key expressions that are included by these key expressions - // /// will have their default publisher builder overwritten by the given config. + // /// PUT messages on key expressions that are included by these key expressions + // /// will have their QoS options overwritten by the given config. // key_exprs: ["demo/**", "example/key"], // /// Configurations that will be applied on the publisher builder - // /// Note: these can still be overwritten in the application code via the PublisherBuilder API // config: { // congestion_control: "Block", // encoding: "zenoh/example", diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 38dd011871..4404f4305a 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -31,7 +31,7 @@ use std::{ }; use include::recursive_include; -use publishers::PublisherBuildersConf; +use publishers::PublisherQoSConfList; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; @@ -363,11 +363,11 @@ validated_struct::validator! { publishers: Vec, }, - /// Overwrite default builders for specific key expressions - pub publishers: #[derive(Default)] - PublishersConfig { - /// A list of publisher builder configurations for key expressions. - default_builders: PublisherBuildersConf, + /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config) + pub qos: #[derive(Default)] + QoSConfig { + /// A list of QoS configurations for PUT messages by key expressions + put: PublisherQoSConfList, }, pub transport: #[derive(Default)] diff --git a/commons/zenoh-config/src/publishers.rs b/commons/zenoh-config/src/publishers.rs index c78e61d96e..c992b96f15 100644 --- a/commons/zenoh-config/src/publishers.rs +++ b/commons/zenoh-config/src/publishers.rs @@ -1,27 +1,23 @@ use std::collections::HashSet; use serde::{Deserialize, Deserializer, Serialize}; -pub use validated_struct::{GetError, ValidatedMap}; use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree}; use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability}; -pub use zenoh_protocol::core::{ - whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor, -}; #[derive(Debug, Deserialize, Default, Serialize, Clone)] #[serde(remote = "Self")] -pub struct PublisherBuildersConf(pub(crate) Vec); +pub struct PublisherQoSConfList(pub(crate) Vec); -impl<'de> Deserialize<'de> for PublisherBuildersConf { +impl<'de> Deserialize<'de> for PublisherQoSConfList { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { - let builders = PublisherBuildersConf::deserialize(deserializer)?; + let put_qos_list = PublisherQoSConfList::deserialize(deserializer)?; // check for invariant: each key_expr should be unique let mut key_set = HashSet::new(); - for builder in &builders.0 { - for key_expr in &builder.key_exprs { + for put_qos in &put_qos_list.0 { + for key_expr in &put_qos.key_exprs { if !key_set.insert(key_expr) { return Err(format!( "duplicated key_expr '{key_expr}' found in publisher builders config" @@ -30,21 +26,21 @@ impl<'de> Deserialize<'de> for PublisherBuildersConf { } } } - Ok(builders) + Ok(put_qos_list) } } -impl Serialize for PublisherBuildersConf { +impl Serialize for PublisherQoSConfList { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { - PublisherBuildersConf::serialize(self, serializer) + PublisherQoSConfList::serialize(self, serializer) } } -impl From for KeBoxTree { - fn from(value: PublisherBuildersConf) -> KeBoxTree { +impl From for KeBoxTree { + fn from(value: PublisherQoSConfList) -> KeBoxTree { let mut tree = KeBoxTree::new(); for conf in value.0 { for key_expr in conf.key_exprs { @@ -57,13 +53,13 @@ impl From for KeBoxTree { } #[derive(Debug, Deserialize, Serialize, Clone)] -pub(crate) struct PublisherBuildersInnerConf { +pub(crate) struct PublisherQoSConf { pub key_exprs: Vec, - pub config: PublisherBuilderOptionsConf, + pub config: PublisherQoSConfig, } #[derive(Debug, Default, Deserialize, Serialize, Clone)] -pub struct PublisherBuilderOptionsConf { +pub struct PublisherQoSConfig { pub congestion_control: Option, pub encoding: Option, // Encoding has From<&str> pub priority: Option, diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 1428edb7e8..4684858fae 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -14,7 +14,7 @@ use std::future::{IntoFuture, Ready}; use itertools::Itertools; -use zenoh_config::publishers::PublisherBuilderOptionsConf; +use zenoh_config::publishers::PublisherQoSConfig; use zenoh_core::{Resolvable, Result as ZResult, Wait}; use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode}; #[cfg(feature = "unstable")] @@ -351,7 +351,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { 'c: 'b, { let maybe_key_expr = key_expr.try_into().map_err(Into::into); - let mut builder_overwrites = PublisherBuilderOptionsConf::default(); + let mut builder_overwrites = PublisherQoSConfig::default(); if let Ok(key_expr) = &maybe_key_expr { // get overwritten builder let state = zread!(session.0.state); diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index 04481324c9..b894ae79c9 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -17,7 +17,7 @@ use std::future::{IntoFuture, Ready}; use std::sync::Arc; #[cfg(feature = "internal")] -use zenoh_config::publishers::PublisherBuildersConf; +use zenoh_config::publishers::PublisherQoSConfList; use zenoh_core::{Resolvable, Wait}; #[cfg(feature = "internal")] use zenoh_keyexpr::OwnedKeyExpr; @@ -124,7 +124,7 @@ pub fn init(runtime: Runtime) -> InitBuilder { runtime, aggregated_subscribers: vec![], aggregated_publishers: vec![], - publisher_builders: PublisherBuildersConf::default(), + publisher_builders: PublisherQoSConfList::default(), } } @@ -136,7 +136,7 @@ pub struct InitBuilder { runtime: Runtime, aggregated_subscribers: Vec, aggregated_publishers: Vec, - publisher_builders: PublisherBuildersConf, + publisher_builders: PublisherQoSConfList, } #[zenoh_macros::internal] @@ -153,7 +153,7 @@ impl InitBuilder { self } - pub fn publisher_builders(mut self, builder_conf: PublisherBuildersConf) -> Self { + pub fn publisher_builders(mut self, builder_conf: PublisherQoSConfList) -> Self { self.publisher_builders = builder_conf; self } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index b846de62e9..ff215f64db 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -32,7 +32,7 @@ use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; use zenoh_config::{ - publishers::{PublisherBuilderOptionsConf, PublisherBuildersConf}, + publishers::{PublisherQoSConfList, PublisherQoSConfig}, unwrap_or_default, wrappers::ZenohId, }; @@ -150,14 +150,14 @@ pub(crate) struct SessionState { pub(crate) liveliness_queries: HashMap, pub(crate) aggregated_subscribers: Vec, pub(crate) aggregated_publishers: Vec, - pub(crate) publisher_builders_tree: KeBoxTree, + pub(crate) publisher_builders_tree: KeBoxTree, } impl SessionState { pub(crate) fn new( aggregated_subscribers: Vec, aggregated_publishers: Vec, - publisher_builders_tree: KeBoxTree, + publisher_builders_tree: KeBoxTree, ) -> SessionState { SessionState { primitives: None, @@ -548,7 +548,7 @@ impl Session { runtime: Runtime, aggregated_subscribers: Vec, aggregated_publishers: Vec, - publisher_builders: PublisherBuildersConf, + publisher_builders: PublisherQoSConfList, owns_runtime: bool, ) -> impl Resolve { ResolveClosure::new(move || { @@ -1053,7 +1053,7 @@ impl Session { tracing::debug!("Config: {:?}", &config); let aggregated_subscribers = config.0.aggregation().subscribers().clone(); let aggregated_publishers = config.0.aggregation().publishers().clone(); - let publisher_builders = config.0.publishers().default_builders().clone(); + let publisher_builders = config.0.qos().put().clone(); #[allow(unused_mut)] // Required for shared-memory let mut runtime = RuntimeBuilder::new(config); #[cfg(feature = "shared-memory")] From eb8645f85d9af08ca7dbdc43858ba67fc6defe71 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 29 Nov 2024 17:32:24 +0100 Subject: [PATCH 11/27] Rename publishers module to qos --- commons/zenoh-config/src/lib.rs | 4 ++-- commons/zenoh-config/src/{publishers.rs => qos.rs} | 0 zenoh/src/api/builders/publisher.rs | 2 +- zenoh/src/api/builders/session.rs | 2 +- zenoh/src/api/publisher.rs | 2 +- zenoh/src/api/sample.rs | 2 +- zenoh/src/api/session.rs | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) rename commons/zenoh-config/src/{publishers.rs => qos.rs} (100%) diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 4404f4305a..bec830a30e 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -21,7 +21,7 @@ //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. pub mod defaults; mod include; -pub mod publishers; +pub mod qos; pub mod wrappers; #[allow(unused_imports)] @@ -31,7 +31,7 @@ use std::{ }; use include::recursive_include; -use publishers::PublisherQoSConfList; +use qos::PublisherQoSConfList; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; diff --git a/commons/zenoh-config/src/publishers.rs b/commons/zenoh-config/src/qos.rs similarity index 100% rename from commons/zenoh-config/src/publishers.rs rename to commons/zenoh-config/src/qos.rs diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 4684858fae..6f1e939fc3 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -14,7 +14,7 @@ use std::future::{IntoFuture, Ready}; use itertools::Itertools; -use zenoh_config::publishers::PublisherQoSConfig; +use zenoh_config::qos::PublisherQoSConfig; use zenoh_core::{Resolvable, Result as ZResult, Wait}; use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode}; #[cfg(feature = "unstable")] diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index b894ae79c9..4851d1712e 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -17,7 +17,7 @@ use std::future::{IntoFuture, Ready}; use std::sync::Arc; #[cfg(feature = "internal")] -use zenoh_config::publishers::PublisherQoSConfList; +use zenoh_config::qos::PublisherQoSConfList; use zenoh_core::{Resolvable, Wait}; #[cfg(feature = "internal")] use zenoh_keyexpr::OwnedKeyExpr; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 85bfb5ad0c..cb10b9a588 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -22,7 +22,7 @@ use std::{ use futures::Sink; use tracing::error; -use zenoh_config::publishers::PublisherPriorityConf; +use zenoh_config::qos::PublisherPriorityConf; use zenoh_core::{Resolvable, Resolve, Wait}; use zenoh_protocol::core::CongestionControl; use zenoh_result::{Error, ZResult}; diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 28648c378e..a9fe7c30af 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -16,7 +16,7 @@ use std::{convert::TryFrom, fmt}; use serde::{Deserialize, Serialize}; -use zenoh_config::{publishers::PublisherLocalityConf, wrappers::EntityGlobalId}; +use zenoh_config::{qos::PublisherLocalityConf, wrappers::EntityGlobalId}; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; use zenoh_protocol::{ diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index ff215f64db..4306d53405 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -32,7 +32,7 @@ use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; use zenoh_config::{ - publishers::{PublisherQoSConfList, PublisherQoSConfig}, + qos::{PublisherQoSConfList, PublisherQoSConfig}, unwrap_or_default, wrappers::ZenohId, }; From bd1bf11d90f9e8976e154f652782698075c02589 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 29 Nov 2024 18:02:19 +0100 Subject: [PATCH 12/27] Get QoS config from runtime instead of drilling through session API --- zenoh/src/api/builders/publisher.rs | 2 +- zenoh/src/api/builders/session.rs | 10 ---------- zenoh/src/api/session.rs | 20 ++++++++------------ 3 files changed, 9 insertions(+), 23 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 6f1e939fc3..6e83919251 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -356,7 +356,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { // get overwritten builder let state = zread!(session.0.state); let nodes_including = state - .publisher_builders_tree + .publisher_qos_tree .nodes_including(key_expr) .collect_vec(); for node in &nodes_including { diff --git a/zenoh/src/api/builders/session.rs b/zenoh/src/api/builders/session.rs index 4851d1712e..0ada1f4a67 100644 --- a/zenoh/src/api/builders/session.rs +++ b/zenoh/src/api/builders/session.rs @@ -16,8 +16,6 @@ use std::future::{IntoFuture, Ready}; #[cfg(feature = "shared-memory")] use std::sync::Arc; -#[cfg(feature = "internal")] -use zenoh_config::qos::PublisherQoSConfList; use zenoh_core::{Resolvable, Wait}; #[cfg(feature = "internal")] use zenoh_keyexpr::OwnedKeyExpr; @@ -124,7 +122,6 @@ pub fn init(runtime: Runtime) -> InitBuilder { runtime, aggregated_subscribers: vec![], aggregated_publishers: vec![], - publisher_builders: PublisherQoSConfList::default(), } } @@ -136,7 +133,6 @@ pub struct InitBuilder { runtime: Runtime, aggregated_subscribers: Vec, aggregated_publishers: Vec, - publisher_builders: PublisherQoSConfList, } #[zenoh_macros::internal] @@ -152,11 +148,6 @@ impl InitBuilder { self.aggregated_publishers = exprs; self } - - pub fn publisher_builders(mut self, builder_conf: PublisherQoSConfList) -> Self { - self.publisher_builders = builder_conf; - self - } } #[zenoh_macros::internal] @@ -171,7 +162,6 @@ impl Wait for InitBuilder { self.runtime, self.aggregated_subscribers, self.aggregated_publishers, - self.publisher_builders, false, ) .wait()) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4306d53405..cee4006631 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -31,11 +31,7 @@ use uhlc::Timestamp; use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; -use zenoh_config::{ - qos::{PublisherQoSConfList, PublisherQoSConfig}, - unwrap_or_default, - wrappers::ZenohId, -}; +use zenoh_config::{qos::PublisherQoSConfig, unwrap_or_default, wrappers::ZenohId}; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, Wait}; use zenoh_keyexpr::keyexpr_tree::KeBoxTree; #[cfg(feature = "unstable")] @@ -150,14 +146,14 @@ pub(crate) struct SessionState { pub(crate) liveliness_queries: HashMap, pub(crate) aggregated_subscribers: Vec, pub(crate) aggregated_publishers: Vec, - pub(crate) publisher_builders_tree: KeBoxTree, + pub(crate) publisher_qos_tree: KeBoxTree, } impl SessionState { pub(crate) fn new( aggregated_subscribers: Vec, aggregated_publishers: Vec, - publisher_builders_tree: KeBoxTree, + publisher_qos_tree: KeBoxTree, ) -> SessionState { SessionState { primitives: None, @@ -185,7 +181,7 @@ impl SessionState { liveliness_queries: HashMap::new(), aggregated_subscribers, aggregated_publishers, - publisher_builders_tree, + publisher_qos_tree, } } } @@ -548,15 +544,17 @@ impl Session { runtime: Runtime, aggregated_subscribers: Vec, aggregated_publishers: Vec, - publisher_builders: PublisherQoSConfList, owns_runtime: bool, ) -> impl Resolve { ResolveClosure::new(move || { let router = runtime.router(); + let config = runtime.config().lock(); + let publisher_qos = config.0.qos().put().clone(); + drop(config); let state = RwLock::new(SessionState::new( aggregated_subscribers, aggregated_publishers, - publisher_builders.into(), + publisher_qos.into(), )); let session = Session(Arc::new(SessionInner { weak_counter: Mutex::new(0), @@ -1053,7 +1051,6 @@ impl Session { tracing::debug!("Config: {:?}", &config); let aggregated_subscribers = config.0.aggregation().subscribers().clone(); let aggregated_publishers = config.0.aggregation().publishers().clone(); - let publisher_builders = config.0.qos().put().clone(); #[allow(unused_mut)] // Required for shared-memory let mut runtime = RuntimeBuilder::new(config); #[cfg(feature = "shared-memory")] @@ -1066,7 +1063,6 @@ impl Session { runtime.clone(), aggregated_subscribers, aggregated_publishers, - publisher_builders, true, ) .await; From dafadcc10fbdcf94a0dcdda3dc0a7e8293744507 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 29 Nov 2024 18:40:38 +0100 Subject: [PATCH 13/27] Update QoS config test --- zenoh/tests/qos.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 2334ad9cf9..9558af761a 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -64,12 +64,12 @@ async fn qos_pubsub() { } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn qos_pubsub_overwrite_builder() { +async fn qos_pubsub_overwrite_config() { let builder_config_overwrite = zenoh::Config::from_json5( r#" { - publishers: { - default_builders: [ + qos: { + put: [ { key_exprs: ["test/qos/overwritten", "test/not_applicable/**"], config: { From 2d869aed53a19ccc6633329d06800ceb0197203c Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 29 Nov 2024 18:55:50 +0100 Subject: [PATCH 14/27] Remove unnecessary lifetime --- zenoh/src/api/builders/publisher.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 5978db4507..ebcf1ef01a 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -344,11 +344,10 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { } impl<'a, 'b> PublisherBuilder<'a, 'b> { - pub fn new<'c, TryIntoKeyExpr>(session: &'a Session, key_expr: TryIntoKeyExpr) -> Self + pub fn new(session: &'a Session, key_expr: TryIntoKeyExpr) -> Self where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - 'c: 'b, + TryIntoKeyExpr: TryInto>, + >>::Error: Into, { let maybe_key_expr = key_expr.try_into().map_err(Into::into); let mut builder_overwrites = PublisherQoSConfig::default(); From 9ca7467c091f77fb413537417a309cec377cd24b Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 13:10:59 +0100 Subject: [PATCH 15/27] Overwrite PublisherBuilder API calls with config --- zenoh/src/api/builders/publisher.rs | 46 +++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index ebcf1ef01a..4ae73d24b2 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -153,9 +153,12 @@ impl PublicationBuilder, T> { #[zenoh_macros::internal_trait] impl EncodingBuilderTrait for PublisherBuilder<'_, '_> { fn encoding>(self, encoding: T) -> Self { - Self { - encoding: encoding.into(), - ..self + match self.config_overwrite { + true => self, + false => Self { + encoding: encoding.into(), + ..self + }, } } } @@ -295,6 +298,7 @@ pub struct PublisherBuilder<'a, 'b> { #[cfg(feature = "unstable")] pub(crate) reliability: Reliability, pub(crate) destination: Locality, + pub(crate) config_overwrite: bool, } impl Clone for PublisherBuilder<'_, '_> { @@ -312,6 +316,7 @@ impl Clone for PublisherBuilder<'_, '_> { #[cfg(feature = "unstable")] reliability: self.reliability, destination: self.destination, + config_overwrite: self.config_overwrite, } } } @@ -321,16 +326,22 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// Changes the [`crate::qos::CongestionControl`] to apply when routing the data. #[inline] fn congestion_control(self, congestion_control: CongestionControl) -> Self { - Self { - congestion_control, - ..self + match self.config_overwrite { + true => self, + false => Self { + congestion_control, + ..self + }, } } /// Changes the [`crate::qos::Priority`] of the written data. #[inline] fn priority(self, priority: Priority) -> Self { - Self { priority, ..self } + match self.config_overwrite { + true => self, + false => Self { priority, ..self }, + } } /// Changes the Express policy to apply when routing the data. @@ -339,7 +350,10 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// This usually has a positive impact on latency but negative impact on throughput. #[inline] fn express(self, is_express: bool) -> Self { - Self { is_express, ..self } + match self.config_overwrite { + true => self, + false => Self { is_express, ..self }, + } } } @@ -351,6 +365,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { { let maybe_key_expr = key_expr.try_into().map_err(Into::into); let mut builder_overwrites = PublisherQoSConfig::default(); + let mut config_overwrite = false; if let Ok(key_expr) = &maybe_key_expr { // get overwritten builder let state = zread!(session.0.state); @@ -370,6 +385,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { node.keyexpr(), ); } + config_overwrite = true; break; } } @@ -400,6 +416,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { .allowed_destination .map(|d| d.into()) .unwrap_or(Locality::default()), + config_overwrite, } } @@ -410,7 +427,9 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { #[zenoh_macros::unstable] #[inline] pub fn allowed_destination(mut self, destination: Locality) -> Self { - self.destination = destination; + if !self.config_overwrite { + self.destination = destination; + } self } @@ -422,9 +441,12 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { #[zenoh_macros::unstable] #[inline] pub fn reliability(self, reliability: Reliability) -> Self { - Self { - reliability, - ..self + match self.config_overwrite { + true => self, + false => Self { + reliability, + ..self + }, } } } From e37407ca811e759d2649ae3bd9b1d907378811eb Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 13:20:34 +0100 Subject: [PATCH 16/27] Update QoS config test --- zenoh/tests/qos.rs | 58 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 9558af761a..6409c24f5b 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -63,8 +63,11 @@ async fn qos_pubsub() { assert!(!sample.express()); } +#[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn qos_pubsub_overwrite_config() { + use zenoh::qos::Reliability; + let builder_config_overwrite = zenoh::Config::from_json5( r#" { @@ -73,19 +76,23 @@ async fn qos_pubsub_overwrite_config() { { key_exprs: ["test/qos/overwritten", "test/not_applicable/**"], config: { - priority: "real_time", - encoding: "zenoh/string", congestion_control: "drop", + encoding: "zenoh/string", + priority: "real_time", express: false, + reliability: "best_effort", + allowed_destination: "any", }, }, { key_exprs: ["test/not_applicable"], config: { - priority: "data_high", - encoding: "zenoh/bytes", congestion_control: "drop", + encoding: "zenoh/bytes", + priority: "data_high", express: false, + reliability: "best_effort", + allowed_destination: "any", }, }, ] @@ -100,15 +107,20 @@ async fn qos_pubsub_overwrite_config() { let overwrite_config_publisher = ztimeout!(session1 .declare_publisher("test/qos/overwritten") .congestion_control(CongestionControl::Block) - .express(true)) + .encoding(Encoding::TEXT_PLAIN) + .priority(Priority::DataLow) + .express(true) + .reliability(zenoh::qos::Reliability::Reliable) + .allowed_destination(zenoh::sample::Locality::SessionLocal)) .unwrap(); let no_overwrite_config_publisher = ztimeout!(session1 .declare_publisher("test/qos/no_overwrite") + .congestion_control(CongestionControl::Block) .encoding(Encoding::TEXT_PLAIN) .priority(Priority::DataLow) - .congestion_control(CongestionControl::Drop) - .express(false)) + .express(true) + .reliability(zenoh::qos::Reliability::Reliable)) .unwrap(); let subscriber = ztimeout!(session2.declare_subscriber("test/qos/**")).unwrap(); @@ -117,16 +129,38 @@ async fn qos_pubsub_overwrite_config() { ztimeout!(overwrite_config_publisher.put("qos")).unwrap(); let sample = ztimeout!(subscriber.recv_async()).unwrap(); - assert_eq!(sample.priority(), Priority::RealTime); + assert_eq!(sample.congestion_control(), CongestionControl::Drop); assert_eq!(sample.encoding(), &Encoding::ZENOH_STRING); - assert_eq!(sample.congestion_control(), CongestionControl::Block); - assert!(sample.express()); + assert_eq!(sample.priority(), Priority::RealTime); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + ztimeout!(overwrite_config_publisher.delete()).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + // Delete encoding is hardcoded to ZENOH_BYTES + assert_eq!(sample.encoding(), &Encoding::ZENOH_BYTES); + assert_eq!(sample.priority(), Priority::RealTime); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); ztimeout!(no_overwrite_config_publisher.put("qos")).unwrap(); let sample = ztimeout!(subscriber.recv_async()).unwrap(); + assert_eq!(sample.congestion_control(), CongestionControl::Block); assert_eq!(sample.encoding(), &Encoding::TEXT_PLAIN); assert_eq!(sample.priority(), Priority::DataLow); - assert_eq!(sample.congestion_control(), CongestionControl::Drop); - assert!(!sample.express()); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + ztimeout!(no_overwrite_config_publisher.delete()).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + // Delete encoding is hardcoded to ZENOH_BYTES + assert_eq!(sample.encoding(), &Encoding::ZENOH_BYTES); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); } From 2ec8a0f5dda7427783bd912b59dd52d27bd784d1 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 14:54:37 +0100 Subject: [PATCH 17/27] Remove encoding from QoS config --- DEFAULT_CONFIG.json5 | 1 - commons/zenoh-config/src/qos.rs | 1 - zenoh/src/api/builders/publisher.rs | 5 +---- zenoh/tests/qos.rs | 10 ---------- 4 files changed, 1 insertion(+), 16 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 9bca19a97a..edf8914dcc 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -195,7 +195,6 @@ // /// Configurations that will be applied on the publisher builder // config: { // congestion_control: "Block", - // encoding: "zenoh/example", // priority: "data_high", // express: true, // reliability: "best_effort", diff --git a/commons/zenoh-config/src/qos.rs b/commons/zenoh-config/src/qos.rs index c992b96f15..f4707f9f25 100644 --- a/commons/zenoh-config/src/qos.rs +++ b/commons/zenoh-config/src/qos.rs @@ -61,7 +61,6 @@ pub(crate) struct PublisherQoSConf { #[derive(Debug, Default, Deserialize, Serialize, Clone)] pub struct PublisherQoSConfig { pub congestion_control: Option, - pub encoding: Option, // Encoding has From<&str> pub priority: Option, pub express: Option, #[cfg(feature = "unstable")] diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 4ae73d24b2..989511ecb0 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -394,10 +394,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { Self { session, key_expr: maybe_key_expr, - encoding: builder_overwrites - .encoding - .map(|encoding| encoding.into()) - .unwrap_or(Encoding::default()), + encoding: Encoding::default(), congestion_control: builder_overwrites .congestion_control .map(|cc| cc.into()) diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 6409c24f5b..d1fe8684c2 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -77,7 +77,6 @@ async fn qos_pubsub_overwrite_config() { key_exprs: ["test/qos/overwritten", "test/not_applicable/**"], config: { congestion_control: "drop", - encoding: "zenoh/string", priority: "real_time", express: false, reliability: "best_effort", @@ -88,7 +87,6 @@ async fn qos_pubsub_overwrite_config() { key_exprs: ["test/not_applicable"], config: { congestion_control: "drop", - encoding: "zenoh/bytes", priority: "data_high", express: false, reliability: "best_effort", @@ -107,7 +105,6 @@ async fn qos_pubsub_overwrite_config() { let overwrite_config_publisher = ztimeout!(session1 .declare_publisher("test/qos/overwritten") .congestion_control(CongestionControl::Block) - .encoding(Encoding::TEXT_PLAIN) .priority(Priority::DataLow) .express(true) .reliability(zenoh::qos::Reliability::Reliable) @@ -117,7 +114,6 @@ async fn qos_pubsub_overwrite_config() { let no_overwrite_config_publisher = ztimeout!(session1 .declare_publisher("test/qos/no_overwrite") .congestion_control(CongestionControl::Block) - .encoding(Encoding::TEXT_PLAIN) .priority(Priority::DataLow) .express(true) .reliability(zenoh::qos::Reliability::Reliable)) @@ -130,7 +126,6 @@ async fn qos_pubsub_overwrite_config() { let sample = ztimeout!(subscriber.recv_async()).unwrap(); assert_eq!(sample.congestion_control(), CongestionControl::Drop); - assert_eq!(sample.encoding(), &Encoding::ZENOH_STRING); assert_eq!(sample.priority(), Priority::RealTime); assert!(!sample.express()); assert_eq!(sample.reliability(), Reliability::BestEffort); @@ -139,8 +134,6 @@ async fn qos_pubsub_overwrite_config() { let sample = ztimeout!(subscriber.recv_async()).unwrap(); assert_eq!(sample.congestion_control(), CongestionControl::Drop); - // Delete encoding is hardcoded to ZENOH_BYTES - assert_eq!(sample.encoding(), &Encoding::ZENOH_BYTES); assert_eq!(sample.priority(), Priority::RealTime); assert!(!sample.express()); assert_eq!(sample.reliability(), Reliability::BestEffort); @@ -149,7 +142,6 @@ async fn qos_pubsub_overwrite_config() { let sample = ztimeout!(subscriber.recv_async()).unwrap(); assert_eq!(sample.congestion_control(), CongestionControl::Block); - assert_eq!(sample.encoding(), &Encoding::TEXT_PLAIN); assert_eq!(sample.priority(), Priority::DataLow); assert!(sample.express()); assert_eq!(sample.reliability(), Reliability::Reliable); @@ -158,8 +150,6 @@ async fn qos_pubsub_overwrite_config() { let sample = ztimeout!(subscriber.recv_async()).unwrap(); assert_eq!(sample.congestion_control(), CongestionControl::Block); - // Delete encoding is hardcoded to ZENOH_BYTES - assert_eq!(sample.encoding(), &Encoding::ZENOH_BYTES); assert_eq!(sample.priority(), Priority::DataLow); assert!(sample.express()); assert_eq!(sample.reliability(), Reliability::Reliable); From 097d1944e2bfbfb761b4b51851c563523ffa2e0e Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 17:46:20 +0100 Subject: [PATCH 18/27] Allow API to change non-overwritten QoS config --- zenoh/src/api/builders/publisher.rs | 62 ++++++++++++++++++----------- zenoh/tests/qos.rs | 6 +-- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 989511ecb0..bc28a45e0e 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -153,12 +153,9 @@ impl PublicationBuilder, T> { #[zenoh_macros::internal_trait] impl EncodingBuilderTrait for PublisherBuilder<'_, '_> { fn encoding>(self, encoding: T) -> Self { - match self.config_overwrite { - true => self, - false => Self { - encoding: encoding.into(), - ..self - }, + Self { + encoding: encoding.into(), + ..self } } } @@ -270,6 +267,18 @@ impl IntoFuture for PublicationBuilder, PublicationBuil } } +/// Marks which [`PublisherBuilder`] QoS configurations were overwritten by Zenoh config. +/// Associated [`PublisherBuilder`] methods calls will not modify overwritten configurations +#[derive(Clone, Copy, Debug)] +pub(crate) struct PublicationOverwrittenQoS { + pub(crate) congestion_control: bool, + pub(crate) priority: bool, + pub(crate) express: bool, + #[cfg(feature = "unstable")] + pub(crate) reliability: bool, + pub(crate) destination: bool, +} + /// A builder for initializing a [`Publisher`]. /// /// # Examples @@ -298,7 +307,7 @@ pub struct PublisherBuilder<'a, 'b> { #[cfg(feature = "unstable")] pub(crate) reliability: Reliability, pub(crate) destination: Locality, - pub(crate) config_overwrite: bool, + pub(crate) qos_overwrites: PublicationOverwrittenQoS, } impl Clone for PublisherBuilder<'_, '_> { @@ -316,7 +325,7 @@ impl Clone for PublisherBuilder<'_, '_> { #[cfg(feature = "unstable")] reliability: self.reliability, destination: self.destination, - config_overwrite: self.config_overwrite, + qos_overwrites: self.qos_overwrites, } } } @@ -326,7 +335,7 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// Changes the [`crate::qos::CongestionControl`] to apply when routing the data. #[inline] fn congestion_control(self, congestion_control: CongestionControl) -> Self { - match self.config_overwrite { + match self.qos_overwrites.congestion_control { true => self, false => Self { congestion_control, @@ -338,7 +347,7 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// Changes the [`crate::qos::Priority`] of the written data. #[inline] fn priority(self, priority: Priority) -> Self { - match self.config_overwrite { + match self.qos_overwrites.priority { true => self, false => Self { priority, ..self }, } @@ -350,7 +359,7 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// This usually has a positive impact on latency but negative impact on throughput. #[inline] fn express(self, is_express: bool) -> Self { - match self.config_overwrite { + match self.qos_overwrites.express { true => self, false => Self { is_express, ..self }, } @@ -364,8 +373,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { >>::Error: Into, { let maybe_key_expr = key_expr.try_into().map_err(Into::into); - let mut builder_overwrites = PublisherQoSConfig::default(); - let mut config_overwrite = false; + let mut qos_overwrites = PublisherQoSConfig::default(); if let Ok(key_expr) = &maybe_key_expr { // get overwritten builder let state = zread!(session.0.state); @@ -376,16 +384,15 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { for node in &nodes_including { // Take the first one yielded by the iterator that has overwrites if let Some(overwrites) = node.weight() { - builder_overwrites = overwrites.clone(); + qos_overwrites = overwrites.clone(); // log warning if multiple keyexprs include it if nodes_including.len() > 1 { tracing::warn!( - "Publisher declared on `{}` which is included by multiple key_exprs in builders config. Using builder config for `{}`", + "Publisher declared on `{}` which is included by multiple key_exprs in qos config. Using qos config for `{}`", key_expr, node.keyexpr(), ); } - config_overwrite = true; break; } } @@ -395,25 +402,32 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { session, key_expr: maybe_key_expr, encoding: Encoding::default(), - congestion_control: builder_overwrites + congestion_control: qos_overwrites .congestion_control .map(|cc| cc.into()) .unwrap_or(CongestionControl::DEFAULT), - priority: builder_overwrites + priority: qos_overwrites .priority .map(|p| p.into()) .unwrap_or(Priority::DEFAULT), - is_express: builder_overwrites.express.unwrap_or(false), + is_express: qos_overwrites.express.unwrap_or(false), #[cfg(feature = "unstable")] - reliability: builder_overwrites + reliability: qos_overwrites .reliability .map(|r| r.into()) .unwrap_or(Reliability::DEFAULT), - destination: builder_overwrites + destination: qos_overwrites .allowed_destination .map(|d| d.into()) .unwrap_or(Locality::default()), - config_overwrite, + qos_overwrites: PublicationOverwrittenQoS { + congestion_control: qos_overwrites.congestion_control.is_some(), + priority: qos_overwrites.priority.is_some(), + express: qos_overwrites.express.is_some(), + #[cfg(feature = "unstable")] + reliability: qos_overwrites.reliability.is_some(), + destination: qos_overwrites.allowed_destination.is_some(), + }, } } @@ -424,7 +438,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { #[zenoh_macros::unstable] #[inline] pub fn allowed_destination(mut self, destination: Locality) -> Self { - if !self.config_overwrite { + if !self.qos_overwrites.destination { self.destination = destination; } self @@ -438,7 +452,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { #[zenoh_macros::unstable] #[inline] pub fn reliability(self, reliability: Reliability) -> Self { - match self.config_overwrite { + match self.qos_overwrites.reliability { true => self, false => Self { reliability, diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index d1fe8684c2..d9da51099c 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -77,7 +77,6 @@ async fn qos_pubsub_overwrite_config() { key_exprs: ["test/qos/overwritten", "test/not_applicable/**"], config: { congestion_control: "drop", - priority: "real_time", express: false, reliability: "best_effort", allowed_destination: "any", @@ -87,7 +86,6 @@ async fn qos_pubsub_overwrite_config() { key_exprs: ["test/not_applicable"], config: { congestion_control: "drop", - priority: "data_high", express: false, reliability: "best_effort", allowed_destination: "any", @@ -126,7 +124,7 @@ async fn qos_pubsub_overwrite_config() { let sample = ztimeout!(subscriber.recv_async()).unwrap(); assert_eq!(sample.congestion_control(), CongestionControl::Drop); - assert_eq!(sample.priority(), Priority::RealTime); + assert_eq!(sample.priority(), Priority::DataLow); assert!(!sample.express()); assert_eq!(sample.reliability(), Reliability::BestEffort); @@ -134,7 +132,7 @@ async fn qos_pubsub_overwrite_config() { let sample = ztimeout!(subscriber.recv_async()).unwrap(); assert_eq!(sample.congestion_control(), CongestionControl::Drop); - assert_eq!(sample.priority(), Priority::RealTime); + assert_eq!(sample.priority(), Priority::DataLow); assert!(!sample.express()); assert_eq!(sample.reliability(), Reliability::BestEffort); From 028c8fa602a174fe5fbaa6bdced1b9f3bf330788 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 18:06:35 +0100 Subject: [PATCH 19/27] Rename config qos/put to qos/publication --- DEFAULT_CONFIG.json5 | 11 ++++++----- commons/zenoh-config/src/lib.rs | 4 ++-- zenoh/src/api/session.rs | 2 +- zenoh/tests/qos.rs | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index edf8914dcc..7d8eb79118 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -184,15 +184,16 @@ }, }, - // /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config) + // /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values) // qos: { - // /// Overwrite QoS options for PUT messages - // put: [ + // /// Overwrite QoS options for PUT and DELETE messages + // publication: [ // { - // /// PUT messages on key expressions that are included by these key expressions + // /// PUT and DELETE messages on key expressions that are included by these key expressions // /// will have their QoS options overwritten by the given config. // key_exprs: ["demo/**", "example/key"], - // /// Configurations that will be applied on the publisher builder + // /// Configurations that will be applied on the publisher. + // /// Options that are supplied here will overwrite the configuration given in Zenoh API // config: { // congestion_control: "Block", // priority: "data_high", diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 284ce31a4c..87c1b9b90d 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -366,8 +366,8 @@ validated_struct::validator! { /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config) pub qos: #[derive(Default)] QoSConfig { - /// A list of QoS configurations for PUT messages by key expressions - put: PublisherQoSConfList, + /// A list of QoS configurations for PUT and DELETE messages by key expressions + publication: PublisherQoSConfList, }, pub transport: #[derive(Default)] diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index c061e15dea..46b56da815 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -545,7 +545,7 @@ impl Session { ResolveClosure::new(move || { let router = runtime.router(); let config = runtime.config().lock(); - let publisher_qos = config.0.qos().put().clone(); + let publisher_qos = config.0.qos().publication().clone(); drop(config); let state = RwLock::new(SessionState::new( aggregated_subscribers, diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index d9da51099c..3c911fd4bc 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -72,7 +72,7 @@ async fn qos_pubsub_overwrite_config() { r#" { qos: { - put: [ + publication: [ { key_exprs: ["test/qos/overwritten", "test/not_applicable/**"], config: { From 58ec7bd8dcf3091b1a943f371a4be7cc2071719e Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 18:14:10 +0100 Subject: [PATCH 20/27] Enforce 1-to-1 mapping between config and zenoh enums at compile time --- commons/zenoh-config/src/qos.rs | 28 +++++++++++++++++++++++----- zenoh/src/api/publisher.rs | 28 +++++++++++++++++++++------- zenoh/src/api/sample.rs | 16 +++++++++++++--- 3 files changed, 57 insertions(+), 15 deletions(-) diff --git a/commons/zenoh-config/src/qos.rs b/commons/zenoh-config/src/qos.rs index f4707f9f25..089ad4006f 100644 --- a/commons/zenoh-config/src/qos.rs +++ b/commons/zenoh-config/src/qos.rs @@ -78,8 +78,17 @@ pub enum PublisherCongestionControlConf { impl From for CongestionControl { fn from(value: PublisherCongestionControlConf) -> Self { match value { - PublisherCongestionControlConf::Drop => CongestionControl::Drop, - PublisherCongestionControlConf::Block => CongestionControl::Block, + PublisherCongestionControlConf::Drop => Self::Drop, + PublisherCongestionControlConf::Block => Self::Block, + } + } +} + +impl From for PublisherCongestionControlConf { + fn from(value: CongestionControl) -> Self { + match value { + CongestionControl::Drop => Self::Drop, + CongestionControl::Block => Self::Block, } } } @@ -104,10 +113,19 @@ pub enum PublisherReliabilityConf { } impl From for Reliability { - fn from(value: PublisherReliabilityConf) -> Reliability { + fn from(value: PublisherReliabilityConf) -> Self { + match value { + PublisherReliabilityConf::BestEffort => Self::BestEffort, + PublisherReliabilityConf::Reliable => Self::Reliable, + } + } +} + +impl From for PublisherReliabilityConf { + fn from(value: Reliability) -> Self { match value { - PublisherReliabilityConf::BestEffort => Reliability::BestEffort, - PublisherReliabilityConf::Reliable => Reliability::Reliable, + Reliability::BestEffort => Self::BestEffort, + Reliability::Reliable => Self::Reliable, } } } diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 903951f2c6..1701d9244e 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -473,13 +473,27 @@ impl TryFrom for Priority { impl From for Priority { fn from(value: PublisherPriorityConf) -> Self { match value { - PublisherPriorityConf::RealTime => Priority::RealTime, - PublisherPriorityConf::InteractiveHigh => Priority::InteractiveHigh, - PublisherPriorityConf::InteractiveLow => Priority::InteractiveLow, - PublisherPriorityConf::DataHigh => Priority::DataHigh, - PublisherPriorityConf::Data => Priority::Data, - PublisherPriorityConf::DataLow => Priority::DataLow, - PublisherPriorityConf::Background => Priority::Background, + PublisherPriorityConf::RealTime => Self::RealTime, + PublisherPriorityConf::InteractiveHigh => Self::InteractiveHigh, + PublisherPriorityConf::InteractiveLow => Self::InteractiveLow, + PublisherPriorityConf::DataHigh => Self::DataHigh, + PublisherPriorityConf::Data => Self::Data, + PublisherPriorityConf::DataLow => Self::DataLow, + PublisherPriorityConf::Background => Self::Background, + } + } +} + +impl From for PublisherPriorityConf { + fn from(value: Priority) -> Self { + match value { + Priority::RealTime => Self::RealTime, + Priority::InteractiveHigh => Self::InteractiveHigh, + Priority::InteractiveLow => Self::InteractiveLow, + Priority::DataHigh => Self::DataHigh, + Priority::Data => Self::Data, + Priority::DataLow => Self::DataLow, + Priority::Background => Self::Background, } } } diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index a9fe7c30af..27b43b1b89 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -53,9 +53,19 @@ pub(crate) enum Locality { impl From for Locality { fn from(value: PublisherLocalityConf) -> Self { match value { - PublisherLocalityConf::SessionLocal => Locality::SessionLocal, - PublisherLocalityConf::Remote => Locality::Remote, - PublisherLocalityConf::Any => Locality::Any, + PublisherLocalityConf::SessionLocal => Self::SessionLocal, + PublisherLocalityConf::Remote => Self::Remote, + PublisherLocalityConf::Any => Self::Any, + } + } +} + +impl From for PublisherLocalityConf { + fn from(value: Locality) -> Self { + match value { + Locality::SessionLocal => Self::SessionLocal, + Locality::Remote => Self::Remote, + Locality::Any => Self::Any, } } } From c0ef9d15a3e26b1baccd882f28a2bf9f25a8ff28 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 18:41:14 +0100 Subject: [PATCH 21/27] Add session API tests --- zenoh/tests/qos.rs | 90 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 80 insertions(+), 10 deletions(-) diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 3c911fd4bc..14a3f2b657 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -66,9 +66,9 @@ async fn qos_pubsub() { #[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn qos_pubsub_overwrite_config() { - use zenoh::qos::Reliability; + use zenoh::{qos::Reliability, sample::Locality}; - let builder_config_overwrite = zenoh::Config::from_json5( + let qos_config_overwrite = zenoh::Config::from_json5( r#" { qos: { @@ -94,19 +94,88 @@ async fn qos_pubsub_overwrite_config() { ] } } - "#, + "#, ) .unwrap(); - let session1 = ztimeout!(zenoh::open(builder_config_overwrite)).unwrap(); + let session1 = ztimeout!(zenoh::open(qos_config_overwrite)).unwrap(); let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + let subscriber = ztimeout!(session2.declare_subscriber("test/qos/**")).unwrap(); + tokio::time::sleep(SLEEP).await; + + // Session API tests + + // Session API - overwritten PUT + ztimeout!(session1 + .put("test/qos/overwritten", "qos") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // Session API - overwritten DELETE + ztimeout!(session1 + .delete("test/qos/overwritten") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // Session API - non-overwritten PUT + ztimeout!(session1 + .put("test/qos/no_overwrite", "qos") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + // Session API - non-overwritten DELETE + ztimeout!(session1 + .delete("test/qos/no_overwrite") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + // Publisher API tests + let overwrite_config_publisher = ztimeout!(session1 .declare_publisher("test/qos/overwritten") .congestion_control(CongestionControl::Block) .priority(Priority::DataLow) .express(true) - .reliability(zenoh::qos::Reliability::Reliable) - .allowed_destination(zenoh::sample::Locality::SessionLocal)) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) .unwrap(); let no_overwrite_config_publisher = ztimeout!(session1 @@ -114,12 +183,10 @@ async fn qos_pubsub_overwrite_config() { .congestion_control(CongestionControl::Block) .priority(Priority::DataLow) .express(true) - .reliability(zenoh::qos::Reliability::Reliable)) + .reliability(Reliability::Reliable)) .unwrap(); - let subscriber = ztimeout!(session2.declare_subscriber("test/qos/**")).unwrap(); - tokio::time::sleep(SLEEP).await; - + // PublisherBuilder API - overwritten PUT ztimeout!(overwrite_config_publisher.put("qos")).unwrap(); let sample = ztimeout!(subscriber.recv_async()).unwrap(); @@ -128,6 +195,7 @@ async fn qos_pubsub_overwrite_config() { assert!(!sample.express()); assert_eq!(sample.reliability(), Reliability::BestEffort); + // PublisherBuilder API - overwritten DELETE ztimeout!(overwrite_config_publisher.delete()).unwrap(); let sample = ztimeout!(subscriber.recv_async()).unwrap(); @@ -136,6 +204,7 @@ async fn qos_pubsub_overwrite_config() { assert!(!sample.express()); assert_eq!(sample.reliability(), Reliability::BestEffort); + // PublisherBuilder API - non-overwritten PUT ztimeout!(no_overwrite_config_publisher.put("qos")).unwrap(); let sample = ztimeout!(subscriber.recv_async()).unwrap(); @@ -144,6 +213,7 @@ async fn qos_pubsub_overwrite_config() { assert!(sample.express()); assert_eq!(sample.reliability(), Reliability::Reliable); + // PublisherBuilder API - non-overwritten DELETE ztimeout!(no_overwrite_config_publisher.delete()).unwrap(); let sample = ztimeout!(subscriber.recv_async()).unwrap(); From 77b9929b2583fd638f9e8f39173b7bc74d4a6994 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 18:49:48 +0100 Subject: [PATCH 22/27] Fix case-sensitive parameter --- DEFAULT_CONFIG.json5 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 7d8eb79118..6422759952 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -195,7 +195,7 @@ // /// Configurations that will be applied on the publisher. // /// Options that are supplied here will overwrite the configuration given in Zenoh API // config: { - // congestion_control: "Block", + // congestion_control: "block", // priority: "data_high", // express: true, // reliability: "best_effort", From 02fcd2e803ff8d1093bfd9de5abd979e46f798da Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Dec 2024 18:58:39 +0100 Subject: [PATCH 23/27] Mark destination QoS config as unstable --- commons/zenoh-config/src/qos.rs | 1 + zenoh/src/api/builders/publisher.rs | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/commons/zenoh-config/src/qos.rs b/commons/zenoh-config/src/qos.rs index 089ad4006f..32623ca345 100644 --- a/commons/zenoh-config/src/qos.rs +++ b/commons/zenoh-config/src/qos.rs @@ -65,6 +65,7 @@ pub struct PublisherQoSConfig { pub express: Option, #[cfg(feature = "unstable")] pub reliability: Option, + #[cfg(feature = "unstable")] pub allowed_destination: Option, } diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index bc28a45e0e..edc4c476a4 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -276,6 +276,7 @@ pub(crate) struct PublicationOverwrittenQoS { pub(crate) express: bool, #[cfg(feature = "unstable")] pub(crate) reliability: bool, + #[cfg(feature = "unstable")] pub(crate) destination: bool, } @@ -416,16 +417,20 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { .reliability .map(|r| r.into()) .unwrap_or(Reliability::DEFAULT), + #[cfg(feature = "unstable")] destination: qos_overwrites .allowed_destination .map(|d| d.into()) .unwrap_or(Locality::default()), + #[cfg(not(feature = "unstable"))] + destination: Locality::default(), qos_overwrites: PublicationOverwrittenQoS { congestion_control: qos_overwrites.congestion_control.is_some(), priority: qos_overwrites.priority.is_some(), express: qos_overwrites.express.is_some(), #[cfg(feature = "unstable")] reliability: qos_overwrites.reliability.is_some(), + #[cfg(feature = "unstable")] destination: qos_overwrites.allowed_destination.is_some(), }, } From 15273a7e42bfa7d67875a0a90c7c7906e5599d13 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 3 Dec 2024 11:08:19 +0100 Subject: [PATCH 24/27] Apply QoS overwrites at resolution of PublisherBuilder and PublicationBuilder --- zenoh/src/api/builders/publisher.rs | 94 ++++++++--------------------- zenoh/src/api/session.rs | 12 +++- 2 files changed, 37 insertions(+), 69 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index edc4c476a4..163c734a22 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -207,7 +207,8 @@ impl Resolvable for PublicationBuilder { impl Wait for PublicationBuilder, PublicationBuilderPut> { #[inline] - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self.publisher = self.publisher.apply_qos_overwrites(); self.publisher.session.0.resolve_put( &self.publisher.key_expr?, self.kind.payload, @@ -229,7 +230,8 @@ impl Wait for PublicationBuilder, PublicationBuilderPut impl Wait for PublicationBuilder, PublicationBuilderDelete> { #[inline] - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self.publisher = self.publisher.apply_qos_overwrites(); self.publisher.session.0.resolve_put( &self.publisher.key_expr?, ZBytes::new(), @@ -267,19 +269,6 @@ impl IntoFuture for PublicationBuilder, PublicationBuil } } -/// Marks which [`PublisherBuilder`] QoS configurations were overwritten by Zenoh config. -/// Associated [`PublisherBuilder`] methods calls will not modify overwritten configurations -#[derive(Clone, Copy, Debug)] -pub(crate) struct PublicationOverwrittenQoS { - pub(crate) congestion_control: bool, - pub(crate) priority: bool, - pub(crate) express: bool, - #[cfg(feature = "unstable")] - pub(crate) reliability: bool, - #[cfg(feature = "unstable")] - pub(crate) destination: bool, -} - /// A builder for initializing a [`Publisher`]. /// /// # Examples @@ -308,7 +297,6 @@ pub struct PublisherBuilder<'a, 'b> { #[cfg(feature = "unstable")] pub(crate) reliability: Reliability, pub(crate) destination: Locality, - pub(crate) qos_overwrites: PublicationOverwrittenQoS, } impl Clone for PublisherBuilder<'_, '_> { @@ -326,7 +314,6 @@ impl Clone for PublisherBuilder<'_, '_> { #[cfg(feature = "unstable")] reliability: self.reliability, destination: self.destination, - qos_overwrites: self.qos_overwrites, } } } @@ -336,22 +323,16 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// Changes the [`crate::qos::CongestionControl`] to apply when routing the data. #[inline] fn congestion_control(self, congestion_control: CongestionControl) -> Self { - match self.qos_overwrites.congestion_control { - true => self, - false => Self { - congestion_control, - ..self - }, + Self { + congestion_control, + ..self } } /// Changes the [`crate::qos::Priority`] of the written data. #[inline] fn priority(self, priority: Priority) -> Self { - match self.qos_overwrites.priority { - true => self, - false => Self { priority, ..self }, - } + Self { priority, ..self } } /// Changes the Express policy to apply when routing the data. @@ -360,24 +341,18 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// This usually has a positive impact on latency but negative impact on throughput. #[inline] fn express(self, is_express: bool) -> Self { - match self.qos_overwrites.express { - true => self, - false => Self { is_express, ..self }, - } + Self { is_express, ..self } } } impl<'a, 'b> PublisherBuilder<'a, 'b> { - pub fn new(session: &'a Session, key_expr: TryIntoKeyExpr) -> Self - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - let maybe_key_expr = key_expr.try_into().map_err(Into::into); + /// Looks up if any configured QoS overwrites apply on the builder's key expression. + /// Returns a new builder with the overwritten QoS parameters. + pub(crate) fn apply_qos_overwrites(self) -> Self { let mut qos_overwrites = PublisherQoSConfig::default(); - if let Ok(key_expr) = &maybe_key_expr { + if let Ok(key_expr) = &self.key_expr { // get overwritten builder - let state = zread!(session.0.state); + let state = zread!(self.session.0.state); let nodes_including = state .publisher_qos_tree .nodes_including(key_expr) @@ -400,39 +375,26 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { } Self { - session, - key_expr: maybe_key_expr, - encoding: Encoding::default(), congestion_control: qos_overwrites .congestion_control .map(|cc| cc.into()) - .unwrap_or(CongestionControl::DEFAULT), + .unwrap_or(self.congestion_control), priority: qos_overwrites .priority .map(|p| p.into()) - .unwrap_or(Priority::DEFAULT), - is_express: qos_overwrites.express.unwrap_or(false), + .unwrap_or(self.priority), + is_express: qos_overwrites.express.unwrap_or(self.is_express), #[cfg(feature = "unstable")] reliability: qos_overwrites .reliability .map(|r| r.into()) - .unwrap_or(Reliability::DEFAULT), + .unwrap_or(self.reliability), #[cfg(feature = "unstable")] destination: qos_overwrites .allowed_destination .map(|d| d.into()) - .unwrap_or(Locality::default()), - #[cfg(not(feature = "unstable"))] - destination: Locality::default(), - qos_overwrites: PublicationOverwrittenQoS { - congestion_control: qos_overwrites.congestion_control.is_some(), - priority: qos_overwrites.priority.is_some(), - express: qos_overwrites.express.is_some(), - #[cfg(feature = "unstable")] - reliability: qos_overwrites.reliability.is_some(), - #[cfg(feature = "unstable")] - destination: qos_overwrites.allowed_destination.is_some(), - }, + .unwrap_or(self.destination), + ..self } } @@ -443,9 +405,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { #[zenoh_macros::unstable] #[inline] pub fn allowed_destination(mut self, destination: Locality) -> Self { - if !self.qos_overwrites.destination { - self.destination = destination; - } + self.destination = destination; self } @@ -457,12 +417,9 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { #[zenoh_macros::unstable] #[inline] pub fn reliability(self, reliability: Reliability) -> Self { - match self.qos_overwrites.reliability { - true => self, - false => Self { - reliability, - ..self - }, + Self { + reliability, + ..self } } } @@ -472,7 +429,8 @@ impl<'b> Resolvable for PublisherBuilder<'_, 'b> { } impl Wait for PublisherBuilder<'_, '_> { - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self = self.apply_qos_overwrites(); let mut key_expr = self.key_expr?; if !key_expr.is_fully_optimized(&self.session.0) { let session_id = self.session.0.id; diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 46b56da815..09dc267c2a 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -830,7 +830,17 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublisherBuilder::new(self, key_expr) + PublisherBuilder { + session: self, + key_expr: key_expr.try_into().map_err(Into::into), + encoding: Encoding::default(), + congestion_control: CongestionControl::DEFAULT, + priority: Priority::DEFAULT, + is_express: false, + #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, + destination: Locality::default(), + } } /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. From f74fb188484f624949d377b1c006fb478ddb78ce Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 3 Dec 2024 11:31:24 +0100 Subject: [PATCH 25/27] Remove impl lifetimes --- zenoh/src/api/builders/publisher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 1a73278d56..54070ad607 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -345,7 +345,7 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { } } -impl<'a, 'b> PublisherBuilder<'a, 'b> { +impl PublisherBuilder<'_, '_> { /// Looks up if any configured QoS overwrites apply on the builder's key expression. /// Returns a new builder with the overwritten QoS parameters. pub(crate) fn apply_qos_overwrites(self) -> Self { From 1d6af814eae920a493c0a80d7bac67f60c8cc056 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 3 Dec 2024 11:57:53 +0100 Subject: [PATCH 26/27] Remove unicity check of keyexpr in qos config --- commons/zenoh-config/src/qos.rs | 38 ++------------------------------- 1 file changed, 2 insertions(+), 36 deletions(-) diff --git a/commons/zenoh-config/src/qos.rs b/commons/zenoh-config/src/qos.rs index 32623ca345..6052546f19 100644 --- a/commons/zenoh-config/src/qos.rs +++ b/commons/zenoh-config/src/qos.rs @@ -1,50 +1,16 @@ -use std::collections::HashSet; - -use serde::{Deserialize, Deserializer, Serialize}; +use serde::{Deserialize, Serialize}; use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree}; use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability}; #[derive(Debug, Deserialize, Default, Serialize, Clone)] -#[serde(remote = "Self")] pub struct PublisherQoSConfList(pub(crate) Vec); -impl<'de> Deserialize<'de> for PublisherQoSConfList { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let put_qos_list = PublisherQoSConfList::deserialize(deserializer)?; - // check for invariant: each key_expr should be unique - let mut key_set = HashSet::new(); - for put_qos in &put_qos_list.0 { - for key_expr in &put_qos.key_exprs { - if !key_set.insert(key_expr) { - return Err(format!( - "duplicated key_expr '{key_expr}' found in publisher builders config" - )) - .map_err(serde::de::Error::custom); - } - } - } - Ok(put_qos_list) - } -} - -impl Serialize for PublisherQoSConfList { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - PublisherQoSConfList::serialize(self, serializer) - } -} - impl From for KeBoxTree { fn from(value: PublisherQoSConfList) -> KeBoxTree { let mut tree = KeBoxTree::new(); for conf in value.0 { for key_expr in conf.key_exprs { - // key_expr unicity is checked at deserialization + // NOTE: we don't check key_expr unicity tree.insert(&key_expr, conf.config.clone()); } } From a04790291a58bfc316e9cc46ba00a28d1fbb6038 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Tue, 3 Dec 2024 16:01:59 +0100 Subject: [PATCH 27/27] Add copyright header --- commons/zenoh-config/src/qos.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/commons/zenoh-config/src/qos.rs b/commons/zenoh-config/src/qos.rs index 6052546f19..526c02d175 100644 --- a/commons/zenoh-config/src/qos.rs +++ b/commons/zenoh-config/src/qos.rs @@ -1,3 +1,16 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// use serde::{Deserialize, Serialize}; use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree}; use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability};