diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index c63d900094..3ec7b511d9 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -14,22 +14,25 @@ use super::routing::face::Face; use super::Runtime; use crate::key_expr::KeyExpr; use crate::plugins::sealed as plugins; +use crate::prelude::sync::Sample; +use crate::queryable::Query; +use crate::queryable::QueryInner; +use crate::value::Value; use async_std::task; -use futures::future::{BoxFuture, FutureExt}; use log::{error, trace}; use serde_json::json; use std::collections::HashMap; +use std::convert::TryFrom; use std::convert::TryInto; use std::sync::Arc; use std::sync::Mutex; use zenoh_buffers::{SplitBuffer, ZBuf}; use zenoh_config::ValidatedMap; -use zenoh_config::WhatAmI; +use zenoh_core::SyncResolve; use zenoh_protocol::{ core::{ - key_expr::OwnedKeyExpr, Channel, CongestionControl, ConsolidationMode, Encoding, - KnownEncoding, QueryTarget, QueryableInfo, SampleKind, SubInfo, WireExpr, ZInt, ZenohId, - EMPTY_EXPR_ID, + key_expr::OwnedKeyExpr, Channel, CongestionControl, ConsolidationMode, KnownEncoding, + QueryTarget, QueryableInfo, SampleKind, SubInfo, WireExpr, ZInt, ZenohId, EMPTY_EXPR_ID, }, zenoh::{DataInfo, QueryBody, RoutingContext}, }; @@ -43,17 +46,13 @@ pub struct AdminContext { version: String, } -type Handler = Box< - dyn for<'a> Fn(&'a AdminContext, &'a KeyExpr<'a>, &'a str) -> BoxFuture<'a, (ZBuf, Encoding)> - + Send - + Sync, ->; +type Handler = Arc; pub struct AdminSpace { zid: ZenohId, primitives: Mutex>>, mappings: Mutex>, - handlers: HashMap>, + handlers: HashMap, context: Arc, } @@ -68,27 +67,37 @@ impl AdminSpace { let zid_str = runtime.zid.to_string(); let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap(); - let mut handlers: HashMap<_, Arc> = HashMap::new(); + let mut handlers: HashMap<_, Handler> = HashMap::new(); + handlers.insert(root_key.clone(), Arc::new(router_data)); handlers.insert( - root_key.clone(), - Arc::new(Box::new(|context, key, args| { - router_data(context, key, args).boxed() - })), + format!("@/router/{zid_str}/linkstate/routers") + .try_into() + .unwrap(), + Arc::new(routers_linkstate_data), + ); + handlers.insert( + format!("@/router/{zid_str}/linkstate/peers") + .try_into() + .unwrap(), + Arc::new(peers_linkstate_data), ); handlers.insert( - [&root_key, "/linkstate/routers"] - .concat() + format!("@/router/{zid_str}/subscriber/**") .try_into() .unwrap(), - Arc::new(Box::new(|context, key, args| { - linkstate_data(context, WhatAmI::Router, key, args).boxed() - })), + Arc::new(subscribers_data), ); handlers.insert( - [&root_key, "/linkstate/peers"].concat().try_into().unwrap(), - Arc::new(Box::new(|context, key, args| { - linkstate_data(context, WhatAmI::Peer, key, args).boxed() - })), + format!("@/router/{zid_str}/queryable/**") + .try_into() + .unwrap(), + Arc::new(queryables_data), + ); + handlers.insert( + format!("@/router/{zid_str}/status/plugins/**") + .try_into() + .unwrap(), + Arc::new(plugins_status), ); let mut active_plugins = plugins_mgr @@ -367,7 +376,7 @@ impl Primitives for AdminSpace { qid: ZInt, target: QueryTarget, _consolidation: ConsolidationMode, - _body: Option, + body: Option, _routing_context: Option, ) { trace!( @@ -386,10 +395,7 @@ impl Primitives for AdminSpace { "Received GET on '{}' but adminspace.permissions.read=false in configuration", key_expr ); - // router is not re-entrant - task::spawn(async move { - primitives.send_reply_final(qid); - }); + primitives.send_reply_final(qid); return; } } @@ -398,75 +404,31 @@ impl Primitives for AdminSpace { Ok(key_expr) => key_expr.into_owned(), Err(e) => { log::error!("Unknown KeyExpr: {}", e); - // router is not re-entrant - task::spawn(async move { - primitives.send_reply_final(qid); - }); + primitives.send_reply_final(qid); return; } }; let zid = self.zid; - let plugin_key: OwnedKeyExpr = format!("@/router/{}/status/plugins/**", &zid) - .try_into() - .unwrap(); - let context = self.context.clone(); - let mut matching_handlers = vec![]; - let ask_plugins = plugin_key.intersects(&key_expr); + let parameters = parameters.to_owned(); + let query = Query { + inner: Arc::new(QueryInner { + key_expr: key_expr.clone(), + parameters, + value: body.map(|b| { + Value::from(b.payload).encoding(b.data_info.encoding.unwrap_or_default()) + }), + qid, + zid, + primitives, + }), + }; + for (key, handler) in &self.handlers { if key_expr.intersects(key) { - matching_handlers.push((key.clone(), handler.clone())); + handler(&self.context, query.clone()); } } - let parameters = parameters.to_owned(); - - // router is not re-entrant - task::spawn(async move { - let handler_tasks = futures::future::join_all(matching_handlers.into_iter().map( - |(key, handler)| async { - let handler = handler; - let (payload, encoding) = handler(&context, &key_expr, ¶meters).await; - let data_info = DataInfo { - encoding: Some(encoding), - ..Default::default() - }; - - primitives.send_reply_data( - qid, - zid, - String::from(key).into(), - Some(data_info), - payload, - ); - }, - )); - if ask_plugins { - futures::join!(handler_tasks, async { - let plugin_status = plugins_status(&context, &key_expr, ¶meters).await; - for status in plugin_status { - let plugins::Response { key, mut value } = status; - zenoh_config::sift_privates(&mut value); - let payload: Vec = serde_json::to_vec(&value).unwrap(); - let data_info = DataInfo { - encoding: Some(KnownEncoding::AppJson.into()), - ..Default::default() - }; - - primitives.send_reply_data( - qid, - zid, - key.into(), - Some(data_info), - payload.into(), - ); - } - }); - } else { - handler_tasks.await; - } - - primitives.send_reply_final(qid); - }); } fn send_reply_data( @@ -512,11 +474,9 @@ impl Primitives for AdminSpace { } } -pub async fn router_data( - context: &AdminContext, - _key: &KeyExpr<'_>, - #[allow(unused_variables)] selector: &str, -) -> (ZBuf, Encoding) { +fn router_data(context: &AdminContext, query: Query) { + let reply_key: OwnedKeyExpr = format!("@/router/{}", context.zid_str).try_into().unwrap(); + let transport_mgr = context.runtime.manager().clone(); // plugins info @@ -575,79 +535,171 @@ pub async fn router_data( "plugins": plugins, }); log::trace!("AdminSpace router_data: {:?}", json); - ( - ZBuf::from(json.to_string().as_bytes().to_vec()), - KnownEncoding::AppJson.into(), - ) + if let Err(e) = query + .reply(Ok(Sample::new( + reply_key, + Value::from(json.to_string().as_bytes().to_vec()) + .encoding(KnownEncoding::AppJson.into()), + ))) + .res() + { + log::error!("Error sending AdminSpace reply: {:?}", e); + } } -pub async fn linkstate_data( - context: &AdminContext, - net_type: WhatAmI, - _key: &KeyExpr<'_>, - _args: &str, -) -> (ZBuf, Encoding) { +fn routers_linkstate_data(context: &AdminContext, query: Query) { + let reply_key: OwnedKeyExpr = format!("@/router/{}/linkstate/routers", context.zid_str) + .try_into() + .unwrap(); + let tables = zread!(context.runtime.router.tables.tables); - let net = match net_type { - WhatAmI::Router => tables.routers_net.as_ref(), - _ => tables.peers_net.as_ref(), - }; - ( - ZBuf::from( - net.map(|net| net.dot()) - .unwrap_or_else(|| "graph {}".to_string()) - .as_bytes() - .to_vec(), - ), - KnownEncoding::TextPlain.into(), - ) + if let Err(e) = query + .reply(Ok(Sample::new( + reply_key, + Value::from( + tables + .routers_net + .as_ref() + .map(|net| net.dot()) + .unwrap_or_else(|| "graph {}".to_string()) + .as_bytes() + .to_vec(), + ) + .encoding(KnownEncoding::TextPlain.into()), + ))) + .res() + { + log::error!("Error sending AdminSpace reply: {:?}", e); + } +} + +fn peers_linkstate_data(context: &AdminContext, query: Query) { + let reply_key: OwnedKeyExpr = format!("@/router/{}/linkstate/peers", context.zid_str) + .try_into() + .unwrap(); + + let tables = zread!(context.runtime.router.tables.tables); + + if let Err(e) = query + .reply(Ok(Sample::new( + reply_key, + Value::from( + tables + .peers_net + .as_ref() + .map(|net| net.dot()) + .unwrap_or_else(|| "graph {}".to_string()) + .as_bytes() + .to_vec(), + ) + .encoding(KnownEncoding::TextPlain.into()), + ))) + .res() + { + log::error!("Error sending AdminSpace reply: {:?}", e); + } +} + +fn subscribers_data(context: &AdminContext, query: Query) { + let tables = zread!(context.runtime.router.tables.tables); + for sub in tables.router_subs.iter() { + let key = KeyExpr::try_from(format!( + "@/router/{}/subscriber/{}", + context.zid_str, + sub.expr() + )) + .unwrap(); + if query.key_expr().intersects(&key) { + if let Err(e) = query.reply(Ok(Sample::new(key, Value::empty()))).res() { + log::error!("Error sending AdminSpace reply: {:?}", e); + } + } + } +} + +fn queryables_data(context: &AdminContext, query: Query) { + let tables = zread!(context.runtime.router.tables.tables); + for qabl in tables.router_qabls.iter() { + let key = KeyExpr::try_from(format!( + "@/router/{}/queryable/{}", + context.zid_str, + qabl.expr() + )) + .unwrap(); + if query.key_expr().intersects(&key) { + if let Err(e) = query.reply(Ok(Sample::new(key, Value::empty()))).res() { + log::error!("Error sending AdminSpace reply: {:?}", e); + } + } + } } -pub async fn plugins_status( - context: &AdminContext, - key: &KeyExpr<'_>, - args: &str, -) -> Vec { - let selector = key.clone().with_parameters(args); +fn plugins_status(context: &AdminContext, query: Query) { + let selector = query.selector(); let guard = zlock!(context.plugins_mgr); let mut root_key = format!("@/router/{}/status/plugins/", &context.zid_str); - let mut responses = Vec::new(); + for (name, (path, plugin)) in guard.running_plugins() { with_extended_string(&mut root_key, &[name], |plugin_key| { with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| { - if key.intersects(plugin_path_key.as_str().try_into().unwrap()) { - responses.push(plugins::Response { - key: plugin_path_key.clone(), - value: path.into(), - }) + if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) { + if query.key_expr().intersects(&key_expr) { + if let Err(e) = query + .reply(Ok(Sample::new( + key_expr, + Value::from(path).encoding(KnownEncoding::AppJson.into()), + ))) + .res() + { + log::error!("Error sending AdminSpace reply: {:?}", e); + } + } + } else { + log::error!("Error: invalid plugin path key {}", plugin_path_key); } }); let matches_plugin = |plugin_status_space: &mut String| { - key.intersects(plugin_status_space.as_str().try_into().unwrap()) + query + .key_expr() + .intersects(plugin_status_space.as_str().try_into().unwrap()) }; if !with_extended_string(plugin_key, &["/**"], matches_plugin) { return; } match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - plugin.adminspace_getter(&selector, plugin_key) - })) { - Ok(Ok(response)) => responses.extend(response), - Ok(Err(e)) => { - log::error!("Plugin {} bailed from responding to {}: {}", name, key, e) + plugin.adminspace_getter(&selector, plugin_key) + })) { + Ok(Ok(responses)) => { + for response in responses { + if let Ok(key_expr) = KeyExpr::try_from(response.key) { + if let Err(e) = query.reply(Ok(Sample::new( + key_expr, + Value::from(response.value).encoding(KnownEncoding::AppJson.into()), + ))) + .res() + { + log::error!("Error sending AdminSpace reply: {:?}", e); + } + } else { + log::error!("Error: plugin {} replied with an invalid key", plugin_key); } - Err(e) => match e - .downcast_ref::() - .map(|s| s.as_str()) - .or_else(|| e.downcast_ref::<&str>().copied()) - { - Some(e) => log::error!("Plugin {} panicked while responding to {}: {}", name, key, e), - None => log::error!("Plugin {} panicked while responding to {}. The panic message couldn't be recovered.", name, key), - }, } + } + Ok(Err(e)) => { + log::error!("Plugin {} bailed from responding to {}: {}", name, query.key_expr(), e) + } + Err(e) => match e + .downcast_ref::() + .map(|s| s.as_str()) + .or_else(|| e.downcast_ref::<&str>().copied()) + { + Some(e) => log::error!("Plugin {} panicked while responding to {}: {}", name, query.key_expr(), e), + None => log::error!("Plugin {} panicked while responding to {}. The panic message couldn't be recovered.", name, query.key_expr()), + }, + } }); } - responses } fn with_extended_string R>( diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index bf1350c165..13337802b2 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -21,28 +21,38 @@ use crate::query::ReplyKeyExpr; use crate::SessionRef; use crate::Undeclarable; -use futures::FutureExt; use std::fmt; -use std::future::{Future, Ready}; +use std::future::Ready; use std::ops::Deref; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; use zenoh_protocol::core::WireExpr; use zenoh_result::ZResult; +use zenoh_transport::Primitives; -/// Structs received by a [`Queryable`](Queryable). -pub struct Query { +pub(crate) struct QueryInner { /// The key expression of this Query. pub(crate) key_expr: KeyExpr<'static>, /// This Query's selector parameters. pub(crate) parameters: String, /// This Query's body. pub(crate) value: Option, - /// The sender to use to send replies to this query. - /// When this sender is dropped, the reply is finalized. - pub(crate) replies_sender: flume::Sender, + + pub(crate) qid: ZInt, + pub(crate) zid: ZenohId, + pub(crate) primitives: Arc, +} + +impl Drop for QueryInner { + fn drop(&mut self) { + self.primitives.send_reply_final(self.qid); + } +} + +/// Structs received by a [`Queryable`](Queryable). +#[derive(Clone)] +pub struct Query { + pub(crate) inner: Arc, } impl Query { @@ -50,27 +60,27 @@ impl Query { #[inline(always)] pub fn selector(&self) -> Selector<'_> { Selector { - key_expr: self.key_expr.clone(), - parameters: (&self.parameters).into(), + key_expr: self.inner.key_expr.clone(), + parameters: (&self.inner.parameters).into(), } } /// The key selector part of this Query. #[inline(always)] pub fn key_expr(&self) -> &KeyExpr<'static> { - &self.key_expr + &self.inner.key_expr } /// This Query's selector parameters. #[inline(always)] pub fn parameters(&self) -> &str { - &self.parameters + &self.inner.parameters } /// This Query's value. #[inline(always)] pub fn value(&self) -> Option<&Value> { - self.value.as_ref() + self.inner.value.as_ref() } /// Sends a reply to this Query. @@ -108,8 +118,8 @@ impl Query { impl fmt::Debug for Query { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Query") - .field("key_selector", &self.key_expr) - .field("parameters", &self.parameters) + .field("key_selector", &self.inner.key_expr) + .field("parameters", &self.inner.parameters) .finish() } } @@ -119,7 +129,7 @@ impl fmt::Display for Query { f.debug_struct("Query") .field( "selector", - &format!("{}{}", &self.key_expr, &self.parameters), + &format!("{}{}", &self.inner.key_expr, &self.inner.parameters), ) .finish() } @@ -145,53 +155,29 @@ impl SyncResolve for ReplyBuilder<'_> { { bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", sample.key_expr, self.query.key_expr()) } - self.query - .replies_sender - .send(sample) - .map_err(|e| zerror!("{}", e).into()) + let (key_expr, payload, data_info) = sample.split(); + self.query.inner.primitives.send_reply_data( + self.query.inner.qid, + self.query.inner.zid, + WireExpr { + scope: 0, + suffix: std::borrow::Cow::Borrowed(key_expr.as_str()), + }, + Some(data_info), + payload, + ); + Ok(()) } Err(_) => Err(zerror!("Replying errors is not yet supported!").into()), } } } -/// The future returned by a [`ReplyBuilder`] when using async. -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ReplyFuture<'a>( - Result, Option>, -); - -impl Future for ReplyFuture<'_> { - type Output = ZResult<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match &mut self.get_mut().0 { - Ok(sender) => sender.poll_unpin(cx).map_err(|e| zerror!(e).into()), - Err(e) => Poll::Ready(Err(e - .take() - .unwrap_or_else(|| zerror!("Overpolling of ReplyFuture detected").into()))), - } - } -} - impl<'a> AsyncResolve for ReplyBuilder<'a> { - type Future = ReplyFuture<'a>; + type Future = Ready; fn res_async(self) -> Self::Future { - ReplyFuture(match self.result { - Ok(sample) => { - if !self.query._accepts_any_replies().unwrap_or(false) - && !self.query.key_expr().intersects(&sample.key_expr) - { - Err(Some(zerror!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", sample.key_expr, self.query.key_expr()).into())) - } else { - Ok(self.query.replies_sender.send_async(sample)) - } - } - Err(_) => Err(Some( - zerror!("Replying errors is not yet supported!").into(), - )), - }) + std::future::ready(self.res_sync()) } } diff --git a/zenoh/src/selector.rs b/zenoh/src/selector.rs index 034488ca43..025645612a 100644 --- a/zenoh/src/selector.rs +++ b/zenoh/src/selector.rs @@ -494,8 +494,8 @@ impl<'a> TryFrom<&'a String> for Selector<'a> { impl<'a> From<&'a Query> for Selector<'a> { fn from(q: &'a Query) -> Self { Selector { - key_expr: q.key_expr.clone(), - parameters: (&q.parameters).into(), + key_expr: q.inner.key_expr.clone(), + parameters: (&q.inner.parameters).into(), } } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index df52994789..ed696919f1 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -37,8 +37,6 @@ use crate::SampleKind; use crate::Selector; use crate::Value; use async_std::task; -use flume::bounded; -use futures::StreamExt; use log::{error, trace, warn}; use std::collections::HashMap; use std::convert::TryFrom; @@ -1562,11 +1560,11 @@ impl Session { _consolidation: ConsolidationMode, body: Option, ) { - let (primitives, key_expr, senders) = { + let (primitives, key_expr, callbacks) = { let state = zread!(self.state); match state.wireexpr_to_keyexpr(key_expr, local) { Ok(key_expr) => { - let senders = state + let callbacks = state .queryables .values() .filter( @@ -1592,7 +1590,7 @@ impl Session { ( state.primitives.as_ref().unwrap().clone(), key_expr.into_owned(), - senders, + callbacks, ) } Err(err) => { @@ -1603,54 +1601,29 @@ impl Session { }; let parameters = parameters.to_owned(); - let (rep_sender, rep_receiver) = bounded::(*API_REPLY_EMISSION_CHANNEL_SIZE); let zid = self.runtime.zid; // @TODO build/use prebuilt specific zid - if local { - let this = self.clone(); - task::spawn(async move { - while let Some(sample) = rep_receiver.stream().next().await { - let (key_expr, payload, data_info) = sample.split(); - this.send_reply_data( - qid, - zid, - key_expr.to_wire(&this).to_owned(), - Some(data_info), - payload, - ); - } - this.send_reply_final(qid); - }); - } else { - let this = self.clone(); - task::spawn(async move { - while let Some(sample) = rep_receiver.stream().next().await { - let (key_expr, payload, data_info) = sample.split(); - primitives.send_reply_data( - qid, - zid, - key_expr.to_wire(&this).to_owned(), - Some(data_info), - payload, - ); - } - primitives.send_reply_final(qid); - }); - } - - for req_sender in senders.iter() { - req_sender(Query { - key_expr: key_expr.clone().into_owned(), - parameters: parameters.clone(), - replies_sender: rep_sender.clone(), - value: body.as_ref().map(|b| Value { - payload: b.payload.clone(), - encoding: b.data_info.encoding.as_ref().cloned().unwrap_or_default(), + let query = Query { + inner: Arc::new(QueryInner { + key_expr, + parameters, + value: body.map(|b| Value { + payload: b.payload, + encoding: b.data_info.encoding.unwrap_or_default(), }), - }); + qid, + zid, + primitives: if local { + Arc::new(self.clone()) + } else { + primitives + }, + }), + }; + for callback in callbacks.iter() { + callback(query.clone()); } - drop(rep_sender); // all senders need to be dropped for the channel to close } }