Skip to content

Commit

Permalink
mark querier related features as unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Dec 2, 2024
1 parent 2882f8f commit 1ba9acc
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 14 deletions.
5 changes: 5 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ required-features = ["unstable", "shared-memory"]
name = "z_pull"
path = "examples/z_pull.rs"

[[example]]
name = "z_querier"
path = "examples/z_querier.rs"
required-features = ["unstable"]

[[example]]
name = "z_queryable"
path = "examples/z_queryable.rs"
Expand Down
18 changes: 18 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,24 @@
z_get -s 'demo/**'
```

### z_querier

Continuously sends query messages for a selector.
The queryables with a matching path or selector (for instance [z_queryable](#z_queryable) and [z_storage](#z_storage))
will receive these queries and reply with paths/values that will be received by the querier.

Typical usage:

```bash
z_querier
```

or

```bash
z_querier -s 'demo/**'
```

### z_queryable

Declares a queryable function with a path.
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/api/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub(crate) mod info;
pub(crate) mod matching_listener;
pub(crate) mod publisher;
#[cfg(feature = "unstable")]
pub(crate) mod querier;
pub(crate) mod query;
pub(crate) mod queryable;
Expand Down
8 changes: 7 additions & 1 deletion zenoh/src/api/builders/querier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{
future::{IntoFuture, Ready},
sync::Arc,
Expand Down Expand Up @@ -71,6 +70,7 @@ use crate::{
/// }
/// # }
/// ```
#[zenoh_macros::unstable]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct QuerierBuilder<'a, 'b> {
Expand Down Expand Up @@ -215,6 +215,7 @@ impl IntoFuture for QuerierBuilder<'_, '_> {
/// }
/// # }
/// ```
#[zenoh_macros::unstable]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct QuerierGetBuilder<'a, 'b, Handler> {
Expand Down Expand Up @@ -280,6 +281,7 @@ impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
/// .unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
#[inline]
pub fn callback<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
where
Expand Down Expand Up @@ -313,6 +315,7 @@ impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
/// .unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
#[inline]
pub fn callback_mut<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
where
Expand Down Expand Up @@ -345,6 +348,7 @@ impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
/// }
/// # }
/// ```
#[zenoh_macros::unstable]
#[inline]
pub fn with<Handler>(self, handler: Handler) -> QuerierGetBuilder<'a, 'b, Handler>
where
Expand Down Expand Up @@ -373,6 +377,7 @@ impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
impl<'b, Handler> QuerierGetBuilder<'_, 'b, Handler> {
/// Set the query payload.
#[inline]
#[zenoh_macros::unstable]
pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
where
IntoZBytes: Into<ZBytes>,
Expand All @@ -385,6 +390,7 @@ impl<'b, Handler> QuerierGetBuilder<'_, 'b, Handler> {

/// Set the query selector parameters.
#[inline]
#[zenoh_macros::unstable]
pub fn parameters<P>(mut self, parameters: P) -> Self
where
P: Into<Parameters<'b>>,
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/api/matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ pub(crate) struct MatchingListenerInner {
}

/// A listener that sends notifications when the [`MatchingStatus`] of a
/// publisher changes.
/// corresponding Zenoh entity changes.
///
/// Callback matching listeners will run in background until the publisher is undeclared,
/// Callback matching listeners will run in background until the corresponding Zenoh entity is undeclared,
/// or until it is undeclared.
/// On the other hand, matching listener with a handler are automatically undeclared when dropped.
///
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub(crate) mod matching;
#[cfg(feature = "plugins")]
pub(crate) mod plugins;
pub(crate) mod publisher;
#[cfg(feature = "unstable")]
pub(crate) mod querier;
pub(crate) mod query;
pub(crate) mod queryable;
Expand Down
8 changes: 7 additions & 1 deletion zenoh/src/api/querier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub(crate) struct QuerierState {
/// let replies = querier.get().await.unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct Querier<'a> {
pub(crate) session: WeakSession,
Expand Down Expand Up @@ -119,25 +120,28 @@ impl<'a> Querier<'a> {
}

#[inline]
#[zenoh_macros::unstable]
pub fn key_expr(&self) -> &KeyExpr<'a> {
&self.key_expr
}

/// Get the `congestion_control` applied when routing the data.
#[inline]
#[zenoh_macros::unstable]
pub fn congestion_control(&self) -> CongestionControl {
self.qos.congestion_control()
}

/// Get the priority of the written data.
#[inline]
#[zenoh_macros::unstable]
pub fn priority(&self) -> Priority {
self.qos.priority()
}

/// Get type of queryables that can reply to this querier
#[zenoh_macros::unstable]
#[inline]
#[zenoh_macros::unstable]
pub fn accept_replies(&self) -> ReplyKeyExpr {
self.accept_replies
}
Expand All @@ -155,6 +159,7 @@ impl<'a> Querier<'a> {
/// # }
/// ```
#[inline]
#[zenoh_macros::unstable]
pub fn get(&self) -> QuerierGetBuilder<'_, '_, DefaultHandler> {
QuerierGetBuilder {
querier: self,
Expand All @@ -179,6 +184,7 @@ impl<'a> Querier<'a> {
/// querier.undeclare().await.unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
UndeclarableSealed::undeclare_inner(self, ())
}
Expand Down
10 changes: 8 additions & 2 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ use zenoh_task::TaskController;
use crate::api::selector::ZenohParameters;
#[cfg(feature = "unstable")]
use crate::api::{
builders::querier::QuerierBuilder,
liveliness::Liveliness,
matching::{MatchingListenerState, MatchingStatus, MatchingStatusType},
querier::QuerierState,
query::{LivelinessQueryState, ReplyKeyExpr},
sample::SourceInfo,
};
Expand All @@ -84,7 +86,6 @@ use crate::{
PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder,
SessionDeleteBuilder, SessionPutBuilder,
},
querier::QuerierBuilder,
query::SessionGetBuilder,
queryable::QueryableBuilder,
session::OpenBuilder,
Expand All @@ -96,7 +97,6 @@ use crate::{
info::SessionInfo,
key_expr::{KeyExpr, KeyExprInner},
publisher::{Priority, PublisherState},
querier::QuerierState,
query::{ConsolidationMode, QueryConsolidation, QueryState, QueryTarget, Reply},
queryable::{Query, QueryInner, QueryableState},
sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind},
Expand Down Expand Up @@ -131,6 +131,7 @@ pub(crate) struct SessionState {
#[cfg(feature = "unstable")]
pub(crate) remote_subscribers: HashMap<SubscriberId, KeyExpr<'static>>,
pub(crate) publishers: HashMap<Id, PublisherState>,
#[cfg(feature = "unstable")]
pub(crate) queriers: HashMap<Id, QuerierState>,
#[cfg(feature = "unstable")]
pub(crate) remote_tokens: HashMap<TokenId, KeyExpr<'static>>,
Expand Down Expand Up @@ -165,6 +166,7 @@ impl SessionState {
#[cfg(feature = "unstable")]
remote_subscribers: HashMap::new(),
publishers: HashMap::new(),
#[cfg(feature = "unstable")]
queriers: HashMap::new(),
#[cfg(feature = "unstable")]
remote_tokens: HashMap::new(),
Expand Down Expand Up @@ -304,6 +306,7 @@ impl SessionState {
}
}

#[cfg(feature = "unstable")]
fn register_querier<'a>(
&mut self,
id: EntityId,
Expand Down Expand Up @@ -972,6 +975,7 @@ impl Session {
/// let replies = querier.get().await.unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
pub fn declare_querier<'b, TryIntoKeyExpr>(
&self,
key_expr: TryIntoKeyExpr,
Expand Down Expand Up @@ -1426,6 +1430,7 @@ impl SessionInner {
}
}

#[cfg(feature = "unstable")]
pub(crate) fn declare_querier_inner(
&self,
key_expr: KeyExpr,
Expand All @@ -1451,6 +1456,7 @@ impl SessionInner {
Ok(id)
}

#[cfg(feature = "unstable")]
pub(crate) fn undeclare_querier_inner(&self, pid: Id) -> ZResult<()> {
let mut state = zwrite!(self.state);
let Ok(primitives) = state.primitives() else {
Expand Down
12 changes: 7 additions & 5 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ pub mod session {
builders::{
info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder},
publisher::{SessionDeleteBuilder, SessionPutBuilder},
querier::QuerierBuilder,
query::SessionGetBuilder,
session::OpenBuilder,
},
Expand Down Expand Up @@ -275,19 +274,22 @@ pub mod query {

#[zenoh_macros::internal]
pub use crate::api::queryable::ReplySample;
#[zenoh_macros::unstable]
pub use crate::api::{
builders::querier::{QuerierBuilder, QuerierGetBuilder},
querier::Querier,
query::ReplyKeyExpr,
selector::ZenohParameters,
};
pub use crate::api::{
builders::{
querier::QuerierGetBuilder,
queryable::QueryableBuilder,
reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder},
},
querier::Querier,
query::{ConsolidationMode, QueryConsolidation, QueryTarget, Reply, ReplyError},
queryable::{Query, Queryable, QueryableUndeclaration},
selector::Selector,
};
#[zenoh_macros::unstable]
pub use crate::api::{query::ReplyKeyExpr, selector::ZenohParameters};
}

#[zenoh_macros::unstable]
Expand Down
11 changes: 8 additions & 3 deletions zenoh/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ use std::{
use zenoh::internal::runtime::{Runtime, RuntimeBuilder};
#[cfg(feature = "unstable")]
use zenoh::qos::Reliability;
use zenoh::{
key_expr::KeyExpr, qos::CongestionControl, query::Querier, sample::SampleKind, Session,
};
#[cfg(feature = "unstable")]
use zenoh::query::Querier;
use zenoh::{key_expr::KeyExpr, qos::CongestionControl, sample::SampleKind, Session};

use zenoh_core::ztimeout;
#[cfg(not(feature = "unstable"))]
use zenoh_protocol::core::Reliability;
Expand Down Expand Up @@ -177,10 +178,12 @@ impl HasGet for SessionGetter<'_, '_> {
}
}

#[cfg(feature = "unstable")]
struct QuerierGetter<'a> {
querier: Querier<'a>,
}

#[cfg(feature = "unstable")]
impl HasGet for QuerierGetter<'_> {
async fn get(&self, params: &str) -> zenoh::handlers::FifoChannelHandler<zenoh::query::Reply> {
ztimeout!(self.querier.get().parameters(params)).unwrap()
Expand Down Expand Up @@ -312,6 +315,7 @@ async fn test_session_getrep(peer01: &Session, peer02: &Session, reliability: Re
))
}

#[cfg(feature = "unstable")]
async fn test_session_qrrep(peer01: &Session, peer02: &Session, reliability: Reliability) {
let key_expr = "test/session";
println!("[QQ][00c] Declaring Querier on peer02 session");
Expand All @@ -333,6 +337,7 @@ async fn zenoh_session_unicast() {
let (peer01, peer02) = open_session_unicast(&["tcp/127.0.0.1:17447"]).await;
test_session_pubsub(&peer01, &peer02, Reliability::Reliable).await;
test_session_getrep(&peer01, &peer02, Reliability::Reliable).await;
#[cfg(feature = "unstable")]
test_session_qrrep(&peer01, &peer02, Reliability::Reliable).await;
close_session(peer01, peer02).await;
}
Expand Down

0 comments on commit 1ba9acc

Please sign in to comment.