Skip to content

Commit

Permalink
Merge pull request #55 from gabrik/feature-unix-domain-sockets
Browse files Browse the repository at this point in the history
Unix Domain Socket transport
  • Loading branch information
Mallets authored Nov 23, 2020
2 parents 1f8ea92 + f8acca3 commit beda767
Show file tree
Hide file tree
Showing 8 changed files with 1,211 additions and 12 deletions.
3 changes: 2 additions & 1 deletion zenoh-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ maintenance = { status = "actively-developed" }
tcp = []
udp = []
zero-copy = []
default = ["tcp", "udp", "zero-copy"]
transport_unixsock-stream = []
default = ["tcp", "udp", "zero-copy", "transport_unixsock-stream"]

[dependencies]
async-trait = "0.1.38"
Expand Down
66 changes: 58 additions & 8 deletions zenoh-protocol/src/link/locator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use async_std::net::SocketAddr;
#[cfg(any(feature = "tcp", feature = "udp"))]
use async_std::net::ToSocketAddrs;
#[cfg(any(feature = "tcp", feature = "udp"))]
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
use async_std::path::PathBuf;
#[cfg(any(feature = "tcp", feature = "udp", feat))]
use async_std::task;

use std::cmp::PartialEq;
Expand All @@ -36,13 +38,17 @@ pub const PORT_SEPARATOR: char = ':';
pub const STR_TCP: &str = "tcp";
#[cfg(feature = "udp")]
pub const STR_UDP: &str = "udp";
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
pub const STR_UNIXSOCK_STREAM: &str = "unixsock-stream";

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LocatorProtocol {
#[cfg(feature = "tcp")]
Tcp,
#[cfg(feature = "udp")]
Udp,
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
UnixSockStream,
}

impl fmt::Display for LocatorProtocol {
Expand All @@ -52,6 +58,8 @@ impl fmt::Display for LocatorProtocol {
LocatorProtocol::Tcp => write!(f, "{}", STR_TCP)?,
#[cfg(feature = "udp")]
LocatorProtocol::Udp => write!(f, "{}", STR_UDP)?,
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
LocatorProtocol::UnixSockStream => write!(f, "{}", STR_UNIXSOCK_STREAM)?,
}
Ok(())
}
Expand All @@ -63,19 +71,35 @@ pub enum Locator {
Tcp(SocketAddr),
#[cfg(feature = "udp")]
Udp(SocketAddr),
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
UnixSockStream(PathBuf),
}

impl FromStr for Locator {
type Err = ZError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let split: Vec<&str> = s.split(PROTO_SEPARATOR).collect();
if split.len() != 2 {
return zerror!(ZErrorKind::InvalidLocator {
descr: format!("Missing protocol: {}", s)
});
}
let (proto, addr) = (split[0], split[1]);
let sep_index = s.find(PROTO_SEPARATOR);

let index = match sep_index {
Some(index) => index,
None => {
return zerror!(ZErrorKind::InvalidLocator {
descr: format!("Invalid locator: {}", s)
});
}
};

let (proto, addr) = s.split_at(index);
let addr = match addr.strip_prefix(PROTO_SEPARATOR) {
Some(addr) => addr,
None => {
return zerror!(ZErrorKind::InvalidLocator {
descr: format!("Invalid locator: {}", s)
});
}
};

match proto {
#[cfg(feature = "tcp")]
STR_TCP => {
Expand Down Expand Up @@ -121,6 +145,20 @@ impl FromStr for Locator {
});
addr.map(Locator::Udp)
}
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
STR_UNIXSOCK_STREAM => {
let addr = task::block_on(async {
match PathBuf::from(addr).to_str() {
Some(path) => Ok(PathBuf::from(path)),
None => {
let e = format!("Invalid Unix locator: {:?}", addr);
log::warn!("{}", e);
zerror!(ZErrorKind::InvalidLocator { descr: e })
}
}
});
addr.map(Locator::UnixSockStream)
}
_ => {
let e = format!("Invalid protocol locator: {}", proto);
log::warn!("{}", e);
Expand All @@ -137,6 +175,8 @@ impl Locator {
Locator::Tcp(..) => LocatorProtocol::Tcp,
#[cfg(feature = "udp")]
Locator::Udp(..) => LocatorProtocol::Udp,
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
Locator::UnixSockStream(..) => LocatorProtocol::UnixSockStream,
}
}
}
Expand All @@ -148,6 +188,11 @@ impl fmt::Display for Locator {
Locator::Tcp(addr) => write!(f, "{}/{}", STR_TCP, addr)?,
#[cfg(feature = "udp")]
Locator::Udp(addr) => write!(f, "{}/{}", STR_UDP, addr)?,
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
Locator::UnixSockStream(addr) => {
let path = addr.to_str().unwrap_or("None");
write!(f, "{}/{}", STR_UNIXSOCK_STREAM, path)?
}
}
Ok(())
}
Expand All @@ -160,6 +205,11 @@ impl fmt::Debug for Locator {
Locator::Tcp(addr) => (STR_TCP, addr.to_string()),
#[cfg(feature = "udp")]
Locator::Udp(addr) => (STR_UDP, addr.to_string()),
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
Locator::UnixSockStream(addr) => {
let path = addr.to_str().unwrap_or("None");
(STR_UNIXSOCK_STREAM, path.to_string())
}
};

f.debug_struct("Locator")
Expand Down
4 changes: 4 additions & 0 deletions zenoh-protocol/src/link/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use async_std::sync::Arc;
use crate::link::tcp::ManagerTcp;
#[cfg(feature = "udp")]
use crate::link::udp::ManagerUdp;
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
use crate::link::unixsock_stream::ManagerUnixSockStream;
use crate::link::{LinkManager, LocatorProtocol};
use crate::session::SessionManagerInner;

Expand All @@ -32,6 +34,8 @@ impl LinkManagerBuilder {
LocatorProtocol::Tcp => Arc::new(ManagerTcp::new(manager)),
#[cfg(feature = "udp")]
LocatorProtocol::Udp => Arc::new(ManagerUdp::new(manager)),
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
LocatorProtocol::UnixSockStream => Arc::new(ManagerUnixSockStream::new(manager)),
}
}
}
2 changes: 2 additions & 0 deletions zenoh-protocol/src/link/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub use manager::*;
mod tcp;
#[cfg(feature = "udp")]
mod udp;
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
mod unixsock_stream;

/* General imports */
use async_std::sync::{Arc, Weak};
Expand Down
Loading

0 comments on commit beda767

Please sign in to comment.