diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 3845917240..fb92ae614d 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -17,11 +17,11 @@ use crate::net::routing::dispatcher::face::FaceState; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; -use crate::net::routing::hat::HatPubSubTrait; +use crate::net::routing::hat::{HatPubSubTrait, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use std::borrow::Cow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use zenoh_protocol::core::key_expr::OwnedKeyExpr; use zenoh_protocol::{ @@ -274,11 +274,19 @@ impl HatPubSubTrait for HatCode { forget_client_subscription(tables, face, res); } - fn get_subscriptions(&self, tables: &Tables) -> Vec> { - let mut subs = HashSet::new(); + fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + // Compute the list of known suscriptions (keys) + let mut subs = HashMap::new(); for src_face in tables.faces.values() { for sub in &face_hat!(src_face).remote_subs { - subs.insert(sub.clone()); + // Insert the key in the list of known suscriptions + let srcs = subs.entry(sub.clone()).or_insert_with(Sources::empty); + // Append src_face as a suscription source in the proper list + match src_face.whatami { + WhatAmI::Router => srcs.routers.push(src_face.zid), + WhatAmI::Peer => srcs.peers.push(src_face.zid), + WhatAmI::Client => srcs.clients.push(src_face.zid), + } } } Vec::from_iter(subs) diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 609d6e0b04..3576148aaf 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -17,12 +17,12 @@ use crate::net::routing::dispatcher::face::FaceState; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; -use crate::net::routing::hat::HatQueriesTrait; +use crate::net::routing::hat::{HatQueriesTrait, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; use std::borrow::Cow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use zenoh_buffers::ZBuf; use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER}; @@ -272,11 +272,19 @@ impl HatQueriesTrait for HatCode { forget_client_queryable(tables, face, res); } - fn get_queryables(&self, tables: &Tables) -> Vec> { - let mut qabls = HashSet::new(); + fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + // Compute the list of known queryables (keys) + let mut qabls = HashMap::new(); for src_face in tables.faces.values() { for qabl in &face_hat!(src_face).remote_qabls { - qabls.insert(qabl.clone()); + // Insert the key in the list of known queryables + let srcs = qabls.entry(qabl.clone()).or_insert_with(Sources::empty); + // Append src_face as a queryable source in the proper list + match src_face.whatami { + WhatAmI::Router => srcs.routers.push(src_face.zid), + WhatAmI::Peer => srcs.peers.push(src_face.zid), + WhatAmI::Client => srcs.clients.push(src_face.zid), + } } } Vec::from_iter(qabls) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 02b86de6b0..f0f8b77111 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::pubsub::*; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; -use crate::net::routing::hat::HatPubSubTrait; +use crate::net::routing::hat::{HatPubSubTrait, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use petgraph::graph::NodeIndex; @@ -605,8 +605,31 @@ impl HatPubSubTrait for HatCode { } } - fn get_subscriptions(&self, tables: &Tables) -> Vec> { - hat!(tables).peer_subs.iter().cloned().collect() + fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + // Compute the list of known suscriptions (keys) + hat!(tables) + .peer_subs + .iter() + .map(|s| { + ( + s.clone(), + // Compute the list of routers, peers and clients that are known + // sources of those subscriptions + Sources { + routers: vec![], + peers: Vec::from_iter(res_hat!(s).peer_subs.iter().cloned()), + clients: s + .session_ctxs + .values() + .filter_map(|f| { + (f.face.whatami == WhatAmI::Client && f.subs.is_some()) + .then_some(f.face.zid) + }) + .collect(), + }, + ) + }) + .collect() } fn compute_data_route( diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index ba9b7bc02d..fa48a66ee5 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::queries::*; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; -use crate::net::routing::hat::HatQueriesTrait; +use crate::net::routing::hat::{HatQueriesTrait, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; @@ -670,8 +670,31 @@ impl HatQueriesTrait for HatCode { } } - fn get_queryables(&self, tables: &Tables) -> Vec> { - hat!(tables).peer_qabls.iter().cloned().collect() + fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + // Compute the list of known queryables (keys) + hat!(tables) + .peer_qabls + .iter() + .map(|s| { + ( + s.clone(), + // Compute the list of routers, peers and clients that are known + // sources of those queryables + Sources { + routers: vec![], + peers: Vec::from_iter(res_hat!(s).peer_qabls.keys().cloned()), + clients: s + .session_ctxs + .values() + .filter_map(|f| { + (f.face.whatami == WhatAmI::Client && f.qabl.is_some()) + .then_some(f.face.zid) + }) + .collect(), + }, + ) + }) + .collect() } fn compute_query_route( diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 4fbf9c9e5d..2752a80959 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -27,7 +27,7 @@ use super::{ use crate::runtime::Runtime; use std::{any::Any, sync::Arc}; use zenoh_buffers::ZBuf; -use zenoh_config::{unwrap_or_default, Config, WhatAmI}; +use zenoh_config::{unwrap_or_default, Config, WhatAmI, ZenohId}; use zenoh_protocol::{ core::WireExpr, network::{ @@ -47,6 +47,23 @@ zconfigurable! { pub static ref TREES_COMPUTATION_DELAY_MS: u64 = 100; } +#[derive(serde::Serialize)] +pub(crate) struct Sources { + routers: Vec, + peers: Vec, + clients: Vec, +} + +impl Sources { + pub(crate) fn empty() -> Self { + Self { + routers: vec![], + peers: vec![], + clients: vec![], + } + } +} + pub(crate) trait HatTrait: HatBaseTrait + HatPubSubTrait + HatQueriesTrait {} pub(crate) trait HatBaseTrait { @@ -129,7 +146,7 @@ pub(crate) trait HatPubSubTrait { node_id: NodeId, ); - fn get_subscriptions(&self, tables: &Tables) -> Vec>; + fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)>; fn compute_data_route( &self, @@ -159,7 +176,7 @@ pub(crate) trait HatQueriesTrait { node_id: NodeId, ); - fn get_queryables(&self, tables: &Tables) -> Vec>; + fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)>; fn compute_query_route( &self, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 432b8e137e..bbaf0f5bac 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -17,11 +17,11 @@ use crate::net::routing::dispatcher::face::FaceState; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; -use crate::net::routing::hat::HatPubSubTrait; +use crate::net::routing::hat::{HatPubSubTrait, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use std::borrow::Cow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use zenoh_protocol::core::key_expr::OwnedKeyExpr; use zenoh_protocol::{ @@ -275,11 +275,19 @@ impl HatPubSubTrait for HatCode { forget_client_subscription(tables, face, res); } - fn get_subscriptions(&self, tables: &Tables) -> Vec> { - let mut subs = HashSet::new(); + fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + // Compute the list of known suscriptions (keys) + let mut subs = HashMap::new(); for src_face in tables.faces.values() { for sub in &face_hat!(src_face).remote_subs { - subs.insert(sub.clone()); + // Insert the key in the list of known suscriptions + let srcs = subs.entry(sub.clone()).or_insert_with(Sources::empty); + // Append src_face as a suscription source in the proper list + match src_face.whatami { + WhatAmI::Router => srcs.routers.push(src_face.zid), + WhatAmI::Peer => srcs.peers.push(src_face.zid), + WhatAmI::Client => srcs.clients.push(src_face.zid), + } } } Vec::from_iter(subs) diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 0937e22a65..aeaee21409 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -17,12 +17,12 @@ use crate::net::routing::dispatcher::face::FaceState; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; -use crate::net::routing::hat::HatQueriesTrait; +use crate::net::routing::hat::{HatQueriesTrait, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; use std::borrow::Cow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use zenoh_buffers::ZBuf; use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER}; @@ -272,11 +272,19 @@ impl HatQueriesTrait for HatCode { forget_client_queryable(tables, face, res); } - fn get_queryables(&self, tables: &Tables) -> Vec> { - let mut qabls = HashSet::new(); + fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + // Compute the list of known queryables (keys) + let mut qabls = HashMap::new(); for src_face in tables.faces.values() { for qabl in &face_hat!(src_face).remote_qabls { - qabls.insert(qabl.clone()); + // Insert the key in the list of known queryables + let srcs = qabls.entry(qabl.clone()).or_insert_with(Sources::empty); + // Append src_face as a queryable source in the proper list + match src_face.whatami { + WhatAmI::Router => srcs.routers.push(src_face.zid), + WhatAmI::Peer => srcs.peers.push(src_face.zid), + WhatAmI::Client => srcs.clients.push(src_face.zid), + } } } Vec::from_iter(qabls) diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 6bf91a0605..b7d00227c0 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::pubsub::*; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; -use crate::net::routing::hat::HatPubSubTrait; +use crate::net::routing::hat::{HatPubSubTrait, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use petgraph::graph::NodeIndex; @@ -925,8 +925,41 @@ impl HatPubSubTrait for HatCode { } } - fn get_subscriptions(&self, tables: &Tables) -> Vec> { - hat!(tables).router_subs.iter().cloned().collect() + fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + // Compute the list of known suscriptions (keys) + hat!(tables) + .router_subs + .iter() + .map(|s| { + ( + s.clone(), + // Compute the list of routers, peers and clients that are known + // sources of those subscriptions + Sources { + routers: Vec::from_iter(res_hat!(s).router_subs.iter().cloned()), + peers: if hat!(tables).full_net(WhatAmI::Peer) { + Vec::from_iter(res_hat!(s).peer_subs.iter().cloned()) + } else { + s.session_ctxs + .values() + .filter_map(|f| { + (f.face.whatami == WhatAmI::Peer && f.subs.is_some()) + .then_some(f.face.zid) + }) + .collect() + }, + clients: s + .session_ctxs + .values() + .filter_map(|f| { + (f.face.whatami == WhatAmI::Client && f.subs.is_some()) + .then_some(f.face.zid) + }) + .collect(), + }, + ) + }) + .collect() } fn compute_data_route( diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 2451b8c2b6..28ff0800db 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::queries::*; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; -use crate::net::routing::hat::HatQueriesTrait; +use crate::net::routing::hat::{HatQueriesTrait, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; @@ -1073,8 +1073,41 @@ impl HatQueriesTrait for HatCode { } } - fn get_queryables(&self, tables: &Tables) -> Vec> { - hat!(tables).router_qabls.iter().cloned().collect() + fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + // Compute the list of known queryables (keys) + hat!(tables) + .router_qabls + .iter() + .map(|s| { + ( + s.clone(), + // Compute the list of routers, peers and clients that are known + // sources of those queryables + Sources { + routers: Vec::from_iter(res_hat!(s).router_qabls.keys().cloned()), + peers: if hat!(tables).full_net(WhatAmI::Peer) { + Vec::from_iter(res_hat!(s).peer_qabls.keys().cloned()) + } else { + s.session_ctxs + .values() + .filter_map(|f| { + (f.face.whatami == WhatAmI::Peer && f.qabl.is_some()) + .then_some(f.face.zid) + }) + .collect() + }, + clients: s + .session_ctxs + .values() + .filter_map(|f| { + (f.face.whatami == WhatAmI::Client && f.qabl.is_some()) + .then_some(f.face.zid) + }) + .collect(), + }, + ) + }) + .collect() } fn compute_query_route( diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index d62379b862..0040c96666 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -715,11 +715,18 @@ fn subscribers_data(context: &AdminContext, query: Query) { "@/{}/{}/subscriber/{}", context.runtime.state.whatami, context.runtime.state.zid, - sub.expr() + sub.0.expr() )) .unwrap(); if query.key_expr().intersects(&key) { - if let Err(e) = query.reply(Ok(Sample::new(key, Value::empty()))).res() { + if let Err(e) = query + .reply(Ok(Sample::new( + key, + Value::from(serde_json::to_string(&sub.1).unwrap_or_else(|_| "{}".to_string())) + .encoding(KnownEncoding::AppJson.into()), + ))) + .res() + { tracing::error!("Error sending AdminSpace reply: {:?}", e); } } @@ -733,11 +740,20 @@ fn queryables_data(context: &AdminContext, query: Query) { "@/{}/{}/queryable/{}", context.runtime.state.whatami, context.runtime.state.zid, - qabl.expr() + qabl.0.expr() )) .unwrap(); if query.key_expr().intersects(&key) { - if let Err(e) = query.reply(Ok(Sample::new(key, Value::empty()))).res() { + if let Err(e) = query + .reply(Ok(Sample::new( + key, + Value::from( + serde_json::to_string(&qabl.1).unwrap_or_else(|_| "{}".to_string()), + ) + .encoding(KnownEncoding::AppJson.into()), + ))) + .res() + { tracing::error!("Error sending AdminSpace reply: {:?}", e); } }