Skip to content

Commit

Permalink
Multi-path inter region routing (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Mar 18, 2021
1 parent 8a6194f commit c103689
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 78 deletions.
8 changes: 8 additions & 0 deletions zenoh/src/net/routing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ impl Network {
)
}

#[inline]
pub(crate) fn get_pids(&self, whatami: whatami::Type) -> Vec<&PeerId> {
self.graph
.node_indices()
.filter_map(|idx| (self.graph[idx].whatami == whatami).then_some(&self.graph[idx].pid))
.collect()
}

#[inline]
pub(crate) fn get_idx(&self, pid: &PeerId) -> Option<NodeIndex> {
self.graph
Expand Down
90 changes: 50 additions & 40 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::protocol::proto::{DataInfo, RoutingContext};

use super::face::FaceState;
use super::network::Network;
use super::resource::{Context, PullCaches, Resource, Route};
use super::resource::{elect_router, Context, PullCaches, Resource, Route};
use super::router::Tables;

#[inline]
Expand Down Expand Up @@ -776,48 +776,56 @@ unsafe fn compute_data_route(
source_type: whatami::Type,
) -> Arc<Route> {
let mut route = HashMap::new();
let res_name = [&prefix.name(), suffix].concat();
let res = Resource::get_resource(prefix, suffix);
let matches = match res.as_ref() {
Some(res) => Cow::from(&res.matches),
None => Cow::from(Resource::get_matches(
tables,
&[&prefix.name(), suffix].concat(),
)),
None => Cow::from(Resource::get_matches(tables, &res_name)),
};

let master = tables.whatami != whatami::ROUTER
|| *elect_router(
&res_name,
&tables.peers_net.as_ref().unwrap().get_pids(whatami::ROUTER)[..],
) == tables.pid;

for mres in matches.iter() {
let mut mres = mres.upgrade().unwrap();
let mres = Arc::get_mut_unchecked(&mut mres);
if tables.whatami == whatami::ROUTER {
let net = tables.routers_net.as_ref().unwrap();
let router_source = match source_type {
whatami::ROUTER => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
prefix,
suffix,
tables,
net,
router_source,
&mres.router_subs,
);
if master || source_type == whatami::ROUTER {
let net = tables.routers_net.as_ref().unwrap();
let router_source = match source_type {
whatami::ROUTER => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
prefix,
suffix,
tables,
net,
router_source,
&mres.router_subs,
);
}

let net = tables.peers_net.as_ref().unwrap();
let peer_source = match source_type {
whatami::PEER => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
prefix,
suffix,
tables,
net,
peer_source,
&mres.peer_subs,
);
if master || source_type != whatami::ROUTER {
let net = tables.peers_net.as_ref().unwrap();
let peer_source = match source_type {
whatami::PEER => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
prefix,
suffix,
tables,
net,
peer_source,
&mres.peer_subs,
);
}
}

if tables.whatami == whatami::PEER {
Expand All @@ -837,13 +845,15 @@ unsafe fn compute_data_route(
);
}

for (sid, context) in &mut mres.contexts {
if let Some(subinfo) = &context.subs {
if subinfo.mode == SubMode::Push {
route.entry(*sid).or_insert_with(|| {
let reskey = Resource::get_best_key(prefix, suffix, *sid);
(context.face.clone(), reskey, None)
});
if tables.whatami != whatami::ROUTER || master || source_type == whatami::ROUTER {
for (sid, context) in &mut mres.contexts {
if let Some(subinfo) = &context.subs {
if subinfo.mode == SubMode::Push {
route.entry(*sid).or_insert_with(|| {
let reskey = Resource::get_best_key(prefix, suffix, *sid);
(context.face.clone(), reskey, None)
});
}
}
}
}
Expand Down
90 changes: 52 additions & 38 deletions zenoh/src/net/routing/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//
use async_std::sync::Arc;
use petgraph::graph::NodeIndex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};

use super::protocol::core::{whatami, PeerId, QueryConsolidation, QueryTarget, ResKey, ZInt};
Expand All @@ -21,7 +22,7 @@ use super::protocol::proto::{DataInfo, RoutingContext};

use super::face::FaceState;
use super::network::Network;
use super::resource::{Context, Resource, Route};
use super::resource::{elect_router, Context, Resource, Route};
use super::router::Tables;

pub(crate) struct Query {
Expand Down Expand Up @@ -707,45 +708,56 @@ unsafe fn compute_query_route(
source_type: whatami::Type,
) -> Arc<Route> {
let mut route = HashMap::new();
let resname = [&prefix.name(), suffix].concat();
let res_name = [&prefix.name(), suffix].concat();
let res = Resource::get_resource(prefix, suffix);
let matches = match res.as_ref() {
Some(res) => std::borrow::Cow::from(&res.matches),
None => std::borrow::Cow::from(Resource::get_matches(tables, &resname)),
Some(res) => Cow::from(&res.matches),
None => Cow::from(Resource::get_matches(tables, &res_name)),
};

let master = tables.whatami != whatami::ROUTER
|| *elect_router(
&res_name,
&tables.peers_net.as_ref().unwrap().get_pids(whatami::ROUTER)[..],
) == tables.pid;

for mres in matches.iter() {
let mut mres = mres.upgrade().unwrap();
let mres = Arc::get_mut_unchecked(&mut mres);
if tables.whatami == whatami::ROUTER {
let net = tables.routers_net.as_ref().unwrap();
let router_source = match source_type {
whatami::ROUTER => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_qabls(
&mut route,
prefix,
suffix,
tables,
net,
router_source,
&mres.router_qabls,
);
let net = tables.peers_net.as_ref().unwrap();
let peer_source = match source_type {
whatami::PEER => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_qabls(
&mut route,
prefix,
suffix,
tables,
net,
peer_source,
&mres.peer_qabls,
);
if master || source_type == whatami::ROUTER {
let net = tables.routers_net.as_ref().unwrap();
let router_source = match source_type {
whatami::ROUTER => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_qabls(
&mut route,
prefix,
suffix,
tables,
net,
router_source,
&mres.router_qabls,
);
}

if master || source_type != whatami::ROUTER {
let net = tables.peers_net.as_ref().unwrap();
let peer_source = match source_type {
whatami::PEER => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_qabls(
&mut route,
prefix,
suffix,
tables,
net,
peer_source,
&mres.peer_qabls,
);
}
}

if tables.whatami == whatami::PEER {
Expand All @@ -765,12 +777,14 @@ unsafe fn compute_query_route(
);
}

for (sid, context) in &mut mres.contexts {
if context.qabl {
route.entry(*sid).or_insert_with(|| {
let reskey = Resource::get_best_key(prefix, suffix, *sid);
(context.face.clone(), reskey, None)
});
if tables.whatami != whatami::ROUTER || master || source_type == whatami::ROUTER {
for (sid, context) in &mut mres.contexts {
if context.qabl {
route.entry(*sid).or_insert_with(|| {
let reskey = Resource::get_best_key(prefix, suffix, *sid);
(context.face.clone(), reskey, None)
});
}
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions zenoh/src/net/routing/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use super::protocol::io::RBuf;
use super::protocol::proto::{DataInfo, RoutingContext};
use super::router::Tables;
use async_std::sync::{Arc, Weak};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};

Expand Down Expand Up @@ -502,3 +503,26 @@ pub async fn undeclare_resource(_tables: &mut Tables, face: &mut Arc<FaceState>,
}
}
}

#[inline]
pub(super) fn elect_router<'a>(res_name: &str, routers: &'a [&'a PeerId]) -> &'a PeerId {
if routers.len() == 1 {
&routers[0]
} else {
routers
.iter()
.map(|router| {
let mut hasher = DefaultHasher::new();
for b in res_name.as_bytes() {
hasher.write_u8(*b);
}
for b in router.as_slice() {
hasher.write_u8(*b);
}
(router, hasher.finish())
})
.max_by(|(_, s1), (_, s2)| s1.partial_cmp(s2).unwrap())
.unwrap()
.0
}
}

0 comments on commit c103689

Please sign in to comment.