Skip to content

Commit

Permalink
[tcp] Port-reuse, async-io, if-watch (#1887)
Browse files Browse the repository at this point in the history
* Update tomls.

* Let transports decide when to translate.

* Improve tcp transport.

* Update stuff.

* Remove background task. Enhance documentation.

To avoid spawning a background task and thread within
`TcpConfig::new()`, with communication via unbounded channels,
a `TcpConfig` now keeps track of the listening addresses
for port reuse in an `Arc<RwLock>`. Furthermore, an `IfWatcher`
is only used by a `TcpListenStream` if it listens on any interface
and directly polls the `IfWatcher` both for initialisation and
new events.

Includes some documentation and test enhancements.

* Reintroduce feature flags for tokio vs async-io.

To avoid having an extra reactor thread running for tokio
users and to make sure all TCP I/O uses the mio-based
tokio reactor.

Thereby run tests with both backends.

* Add missing files.

* Fix docsrs attributes.

* Update transports/tcp/src/lib.rs

Co-authored-by: Max Inden <[email protected]>

* Restore chat-tokio example.

* Forward poll_write_vectored for tokio's AsyncWrite.

* Update changelogs.

Co-authored-by: David Craven <[email protected]>
Co-authored-by: Max Inden <[email protected]>
  • Loading branch information
3 people authored Jan 12, 2021
1 parent c98b9ef commit ec0f8a3
Show file tree
Hide file tree
Showing 44 changed files with 1,380 additions and 602 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@

# Version 0.34.0 [unreleased]

- Update `libp2p-gossipsub`, `libp2p-kad` and `libp2p-request-response`.
- Update `libp2p-core` and all dependent crates.

- Update dependencies.
- The `tcp-async-std` feature is now `tcp-async-io`, still
enabled by default.

# Version 0.33.0 [2020-12-17]

Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ default = [
"pnet",
"request-response",
"secp256k1",
"tcp-async-std",
"tcp-async-io",
"uds",
"wasm-ext",
"websocket",
Expand All @@ -44,7 +44,7 @@ ping = ["libp2p-ping"]
plaintext = ["libp2p-plaintext"]
pnet = ["libp2p-pnet"]
request-response = ["libp2p-request-response"]
tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"]
tcp-async-io = ["libp2p-tcp", "libp2p-tcp/async-io"]
tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"]
uds = ["libp2p-uds"]
wasm-ext = ["libp2p-wasm-ext"]
Expand Down Expand Up @@ -91,7 +91,7 @@ libp2p-tcp = { version = "0.27.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.8.1"
tokio = { version = "0.3", features = ["io-util", "io-std", "stream", "macros", "rt", "rt-multi-thread"] }

Expand Down
4 changes: 4 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# 0.27.0 [unreleased]

- (Re)add `Transport::address_translation` to permit transport-specific
translations of observed addresses onto listening addresses.
[PR 1887](https://github.com/libp2p/rust-libp2p/pull/1887)

- Update dependencies.

# 0.26.0 [2020-12-17]
Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ zeroize = "1"
ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false }

[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
criterion = "0.3"
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../protocols/noise" }
libp2p-tcp = { path = "../transports/tcp", features = ["async-std"] }
libp2p-tcp = { path = "../transports/tcp" }
multihash = { version = "0.13", default-features = false, features = ["arb"] }
quickcheck = "0.9.0"
wasm-timer = "0.2"
Expand Down
4 changes: 4 additions & 0 deletions core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ mod tests {
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}

fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
}

async_std::task::block_on(async move {
Expand Down Expand Up @@ -466,6 +468,8 @@ mod tests {
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}

fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
}

async_std::task::block_on(async move {
Expand Down
7 changes: 7 additions & 0 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,4 +477,11 @@ where
},
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
match self {
EitherTransport::Left(a) => a.address_translation(server, observed),
EitherTransport::Right(b) => b.address_translation(server, observed),
}
}
}
28 changes: 12 additions & 16 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::{
Executor,
Multiaddr,
PeerId,
address_translation,
connection::{
ConnectionId,
ConnectionLimit,
Expand Down Expand Up @@ -176,30 +175,27 @@ where
self.listeners.listen_addrs()
}

/// Call this function in order to know which address remotes should dial to
/// access your local node.
/// Maps the given `observed_addr`, representing an address of the local
/// node observed by a remote peer, onto the locally known listen addresses
/// to yield one or more addresses of the local node that may be publicly
/// reachable.
///
/// When receiving an observed address on a tcp connection that we initiated, the observed
/// address contains our tcp dial port, not our tcp listen port. We know which port we are
/// listening on, thereby we can replace the port within the observed address.
///
/// When receiving an observed address on a tcp connection that we did **not** initiated, the
/// observed address should contain our listening port. In case it differs from our listening
/// port there might be a proxy along the path.
///
/// # Arguments
///
/// * `observed_addr` - should be an address a remote observes you as, which can be obtained for
/// example with the identify protocol.
/// I.e. this method incorporates the view of other peers into the listen
/// addresses seen by the local node to account for possible IP and port
/// mappings performed by intermediate network devices in an effort to
/// obtain addresses for the local peer that are also reachable for peers
/// other than the peer who reported the `observed_addr`.
///
/// The translation is transport-specific. See [`Transport::address_translation`].
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
-> impl Iterator<Item = Multiaddr> + 'a
where
TMuxer: 'a,
THandler: 'a,
{
let transport = self.listeners.transport();
let mut addrs: Vec<_> = self.listen_addrs()
.filter_map(move |server| address_translation(server, observed_addr))
.filter_map(move |server| transport.address_translation(server, observed_addr))
.collect();

// remove duplicates
Expand Down
5 changes: 5 additions & 0 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ pub trait Transport {
where
Self: Sized;

/// Performs a transport-specific mapping of an address `observed` by
/// a remote onto a local `listen` address to yield an address for
/// the local node that may be reachable for other peers.
fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;

/// Boxes the transport, including custom transport errors.
fn boxed(self) -> boxed::Boxed<Self::Output>
where
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ where
};
Ok(future)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Custom `Stream` to avoid boxing.
Expand Down
9 changes: 9 additions & 0 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
trait Abstract<O> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
}

impl<T, O> Abstract<O> for T
Expand Down Expand Up @@ -78,6 +79,10 @@ where
.map_err(|e| e.map(box_err))?;
Ok(Box::pin(fut) as Dial<_>)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
Transport::address_translation(self, server, observed)
}
}

impl<O> fmt::Debug for Boxed<O> {
Expand Down Expand Up @@ -108,6 +113,10 @@ impl<O> Transport for Boxed<O> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial(addr)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

fn box_err<E: Error + Send + Sync + 'static>(e: E) -> io::Error {
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,12 @@ where

Err(TransportError::MultiaddrNotSupported(addr))
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(addr) = self.0.address_translation(server, observed) {
Some(addr)
} else {
self.1.address_translation(server, observed)
}
}
}
4 changes: 4 additions & 0 deletions core/src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl<TOut> Transport for DummyTransport<TOut> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}

fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
}

/// Implementation of `AsyncRead` and `AsyncWrite`. Not meant to be instanciated.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ where
let p = ConnectedPoint::Dialer { address: addr };
Ok(MapFuture { inner: future, args: Some((self.fun, p)) })
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Custom `Stream` implementation to avoid boxing.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/map_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ where
Err(err) => Err(err.map(map)),
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Listening stream for `MapErr`.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl Transport for MemoryTransport {

DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable))
}

fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
}

/// Error that can be produced from the `MemoryTransport`.
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,12 @@ where
Err(TransportError::MultiaddrNotSupported(addr))
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(inner) = &self.0 {
inner.address_translation(server, observed)
} else {
None
}
}
}
4 changes: 4 additions & 0 deletions core/src/transport/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ where
timer: Delay::new(self.outgoing_timeout),
})
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

// TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ where
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.0.listen_on(addr)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.0.address_translation(server, observed)
}
}

/// An inbound or outbound upgrade.
Expand Down Expand Up @@ -383,6 +387,10 @@ where
upgrade: self.upgrade
})
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

/// Errors produced by a transport upgrade.
Expand Down
26 changes: 14 additions & 12 deletions core/tests/network_dial_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ fn deny_incoming_connec() {
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let address = async_std::task::block_on(future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
Poll::Ready(listen_addr)
} else {
panic!("Was expecting the listen address to be reported")
match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
Poll::Ready(listen_addr)
}
Poll::Pending => Poll::Pending,
_ => panic!("Was expecting the listen address to be reported"),
}
}));

Expand Down Expand Up @@ -95,15 +97,15 @@ fn dial_self() {
let mut swarm = test_network(NetworkConfig::default());
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let (local_address, mut swarm) = async_std::task::block_on(
future::lazy(move |cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) {
Ok::<_, void::Void>((listen_addr, swarm))
} else {
panic!("Was expecting the listen address to be reported")
let local_address = async_std::task::block_on(future::poll_fn(|cx| {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
Poll::Ready(listen_addr)
}
}))
.unwrap();
Poll::Pending => Poll::Pending,
_ => panic!("Was expecting the listen address to be reported"),
}
}));

swarm.dial(&local_address, TestHandler()).unwrap();

Expand Down
14 changes: 9 additions & 5 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
//! --features="floodsub mplex noise tcp-tokio mdns-tokio"
//! ```
use futures::prelude::*;
use libp2p::{
Multiaddr,
NetworkBehaviour,
Expand Down Expand Up @@ -154,10 +153,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
let to_publish = {
tokio::select! {
line = stdin.try_next() => Some((floodsub_topic.clone(), line?.expect("Stdin closed"))),
line = stdin.next_line() => {
let line = line?.expect("stdin closed");
Some((floodsub_topic.clone(), line))
}
event = swarm.next() => {
println!("New Event: {:?}", event);
None
// All events are handled by the `NetworkBehaviourEventProcess`es.
// I.e. the `swarm.next()` future drives the `Swarm` without ever
// terminating.
panic!("Unexpected event: {:?}", event);
}
}
};
Expand All @@ -171,4 +175,4 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}
}
}
}
2 changes: 1 addition & 1 deletion muxers/mplex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async-std = "1.7.0"
criterion = "0.3"
env_logger = "0.8"
futures = "0.3"
libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] }
libp2p-tcp = { path = "../../transports/tcp" }
libp2p-plaintext = { path = "../../protocols/plaintext" }
quickcheck = "0.9"
rand = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion protocols/deflate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ flate2 = "1.0"

[dev-dependencies]
async-std = "1.6.2"
libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] }
libp2p-tcp = { path = "../../transports/tcp" }
quickcheck = "0.9"
rand = "0.7"
Loading

0 comments on commit ec0f8a3

Please sign in to comment.