From c103689111547eb56a2b3dc834a4f42e0bc1ce6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20H=C3=A9cart?= Date: Thu, 18 Mar 2021 12:17:55 +0100 Subject: [PATCH] Multi-path inter region routing (#74) --- zenoh/src/net/routing/network.rs | 8 +++ zenoh/src/net/routing/pubsub.rs | 90 +++++++++++++++++-------------- zenoh/src/net/routing/queries.rs | 90 ++++++++++++++++++------------- zenoh/src/net/routing/resource.rs | 24 +++++++++ 4 files changed, 134 insertions(+), 78 deletions(-) diff --git a/zenoh/src/net/routing/network.rs b/zenoh/src/net/routing/network.rs index b811d99244..1195a2985c 100644 --- a/zenoh/src/net/routing/network.rs +++ b/zenoh/src/net/routing/network.rs @@ -129,6 +129,14 @@ impl Network { ) } + #[inline] + pub(crate) fn get_pids(&self, whatami: whatami::Type) -> Vec<&PeerId> { + self.graph + .node_indices() + .filter_map(|idx| (self.graph[idx].whatami == whatami).then_some(&self.graph[idx].pid)) + .collect() + } + #[inline] pub(crate) fn get_idx(&self, pid: &PeerId) -> Option { self.graph diff --git a/zenoh/src/net/routing/pubsub.rs b/zenoh/src/net/routing/pubsub.rs index 6a5b55fb18..033cf4b885 100644 --- a/zenoh/src/net/routing/pubsub.rs +++ b/zenoh/src/net/routing/pubsub.rs @@ -25,7 +25,7 @@ use super::protocol::proto::{DataInfo, RoutingContext}; use super::face::FaceState; use super::network::Network; -use super::resource::{Context, PullCaches, Resource, Route}; +use super::resource::{elect_router, Context, PullCaches, Resource, Route}; use super::router::Tables; #[inline] @@ -776,48 +776,56 @@ unsafe fn compute_data_route( source_type: whatami::Type, ) -> Arc { let mut route = HashMap::new(); + let res_name = [&prefix.name(), suffix].concat(); let res = Resource::get_resource(prefix, suffix); let matches = match res.as_ref() { Some(res) => Cow::from(&res.matches), - None => Cow::from(Resource::get_matches( - tables, - &[&prefix.name(), suffix].concat(), - )), + None => Cow::from(Resource::get_matches(tables, &res_name)), }; + let master = tables.whatami != whatami::ROUTER + || *elect_router( + &res_name, + &tables.peers_net.as_ref().unwrap().get_pids(whatami::ROUTER)[..], + ) == tables.pid; + for mres in matches.iter() { let mut mres = mres.upgrade().unwrap(); let mres = Arc::get_mut_unchecked(&mut mres); if tables.whatami == whatami::ROUTER { - let net = tables.routers_net.as_ref().unwrap(); - let router_source = match source_type { - whatami::ROUTER => source.unwrap(), - _ => net.idx.index(), - }; - insert_faces_for_subs( - &mut route, - prefix, - suffix, - tables, - net, - router_source, - &mres.router_subs, - ); + if master || source_type == whatami::ROUTER { + let net = tables.routers_net.as_ref().unwrap(); + let router_source = match source_type { + whatami::ROUTER => source.unwrap(), + _ => net.idx.index(), + }; + insert_faces_for_subs( + &mut route, + prefix, + suffix, + tables, + net, + router_source, + &mres.router_subs, + ); + } - let net = tables.peers_net.as_ref().unwrap(); - let peer_source = match source_type { - whatami::PEER => source.unwrap(), - _ => net.idx.index(), - }; - insert_faces_for_subs( - &mut route, - prefix, - suffix, - tables, - net, - peer_source, - &mres.peer_subs, - ); + if master || source_type != whatami::ROUTER { + let net = tables.peers_net.as_ref().unwrap(); + let peer_source = match source_type { + whatami::PEER => source.unwrap(), + _ => net.idx.index(), + }; + insert_faces_for_subs( + &mut route, + prefix, + suffix, + tables, + net, + peer_source, + &mres.peer_subs, + ); + } } if tables.whatami == whatami::PEER { @@ -837,13 +845,15 @@ unsafe fn compute_data_route( ); } - for (sid, context) in &mut mres.contexts { - if let Some(subinfo) = &context.subs { - if subinfo.mode == SubMode::Push { - route.entry(*sid).or_insert_with(|| { - let reskey = Resource::get_best_key(prefix, suffix, *sid); - (context.face.clone(), reskey, None) - }); + if tables.whatami != whatami::ROUTER || master || source_type == whatami::ROUTER { + for (sid, context) in &mut mres.contexts { + if let Some(subinfo) = &context.subs { + if subinfo.mode == SubMode::Push { + route.entry(*sid).or_insert_with(|| { + let reskey = Resource::get_best_key(prefix, suffix, *sid); + (context.face.clone(), reskey, None) + }); + } } } } diff --git a/zenoh/src/net/routing/queries.rs b/zenoh/src/net/routing/queries.rs index f0de744e03..f1270a7aaa 100644 --- a/zenoh/src/net/routing/queries.rs +++ b/zenoh/src/net/routing/queries.rs @@ -13,6 +13,7 @@ // use async_std::sync::Arc; use petgraph::graph::NodeIndex; +use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use super::protocol::core::{whatami, PeerId, QueryConsolidation, QueryTarget, ResKey, ZInt}; @@ -21,7 +22,7 @@ use super::protocol::proto::{DataInfo, RoutingContext}; use super::face::FaceState; use super::network::Network; -use super::resource::{Context, Resource, Route}; +use super::resource::{elect_router, Context, Resource, Route}; use super::router::Tables; pub(crate) struct Query { @@ -707,45 +708,56 @@ unsafe fn compute_query_route( source_type: whatami::Type, ) -> Arc { let mut route = HashMap::new(); - let resname = [&prefix.name(), suffix].concat(); + let res_name = [&prefix.name(), suffix].concat(); let res = Resource::get_resource(prefix, suffix); let matches = match res.as_ref() { - Some(res) => std::borrow::Cow::from(&res.matches), - None => std::borrow::Cow::from(Resource::get_matches(tables, &resname)), + Some(res) => Cow::from(&res.matches), + None => Cow::from(Resource::get_matches(tables, &res_name)), }; + let master = tables.whatami != whatami::ROUTER + || *elect_router( + &res_name, + &tables.peers_net.as_ref().unwrap().get_pids(whatami::ROUTER)[..], + ) == tables.pid; + for mres in matches.iter() { let mut mres = mres.upgrade().unwrap(); let mres = Arc::get_mut_unchecked(&mut mres); if tables.whatami == whatami::ROUTER { - let net = tables.routers_net.as_ref().unwrap(); - let router_source = match source_type { - whatami::ROUTER => source.unwrap(), - _ => net.idx.index(), - }; - insert_faces_for_qabls( - &mut route, - prefix, - suffix, - tables, - net, - router_source, - &mres.router_qabls, - ); - let net = tables.peers_net.as_ref().unwrap(); - let peer_source = match source_type { - whatami::PEER => source.unwrap(), - _ => net.idx.index(), - }; - insert_faces_for_qabls( - &mut route, - prefix, - suffix, - tables, - net, - peer_source, - &mres.peer_qabls, - ); + if master || source_type == whatami::ROUTER { + let net = tables.routers_net.as_ref().unwrap(); + let router_source = match source_type { + whatami::ROUTER => source.unwrap(), + _ => net.idx.index(), + }; + insert_faces_for_qabls( + &mut route, + prefix, + suffix, + tables, + net, + router_source, + &mres.router_qabls, + ); + } + + if master || source_type != whatami::ROUTER { + let net = tables.peers_net.as_ref().unwrap(); + let peer_source = match source_type { + whatami::PEER => source.unwrap(), + _ => net.idx.index(), + }; + insert_faces_for_qabls( + &mut route, + prefix, + suffix, + tables, + net, + peer_source, + &mres.peer_qabls, + ); + } } if tables.whatami == whatami::PEER { @@ -765,12 +777,14 @@ unsafe fn compute_query_route( ); } - for (sid, context) in &mut mres.contexts { - if context.qabl { - route.entry(*sid).or_insert_with(|| { - let reskey = Resource::get_best_key(prefix, suffix, *sid); - (context.face.clone(), reskey, None) - }); + if tables.whatami != whatami::ROUTER || master || source_type == whatami::ROUTER { + for (sid, context) in &mut mres.contexts { + if context.qabl { + route.entry(*sid).or_insert_with(|| { + let reskey = Resource::get_best_key(prefix, suffix, *sid); + (context.face.clone(), reskey, None) + }); + } } } } diff --git a/zenoh/src/net/routing/resource.rs b/zenoh/src/net/routing/resource.rs index c4537712b4..ed831036be 100644 --- a/zenoh/src/net/routing/resource.rs +++ b/zenoh/src/net/routing/resource.rs @@ -18,6 +18,7 @@ use super::protocol::io::RBuf; use super::protocol::proto::{DataInfo, RoutingContext}; use super::router::Tables; use async_std::sync::{Arc, Weak}; +use std::collections::hash_map::DefaultHasher; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; @@ -502,3 +503,26 @@ pub async fn undeclare_resource(_tables: &mut Tables, face: &mut Arc, } } } + +#[inline] +pub(super) fn elect_router<'a>(res_name: &str, routers: &'a [&'a PeerId]) -> &'a PeerId { + if routers.len() == 1 { + &routers[0] + } else { + routers + .iter() + .map(|router| { + let mut hasher = DefaultHasher::new(); + for b in res_name.as_bytes() { + hasher.write_u8(*b); + } + for b in router.as_slice() { + hasher.write_u8(*b); + } + (router, hasher.finish()) + }) + .max_by(|(_, s1), (_, s2)| s1.partial_cmp(s2).unwrap()) + .unwrap() + .0 + } +}