From 4ffa438d06f536f11ee760bc2933b0eba6f6cc08 Mon Sep 17 00:00:00 2001 From: arsenron Date: Tue, 8 Aug 2023 13:47:51 +0300 Subject: [PATCH 1/9] Reuse local listener instead of creating a new one --- transports/quic/src/transport.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 7e0a4a812f7..193deb60758 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -155,9 +155,8 @@ impl GenTransport

{ if l.is_closed { return false; } - let listen_addr = l.socket_addr(); - SocketFamily::is_same(&listen_addr.ip(), &socket_addr.ip()) - && listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback() + SocketFamily::is_same(&l.socket_addr().ip(), &socket_addr.ip()) + && l.is_loopback == socket_addr.ip().is_loopback() }) .collect(); match listeners.len() { @@ -428,6 +427,9 @@ struct Listener { /// The stream must be awaken after it has been closed to deliver the last event. close_listener_waker: Option, + + /// `true` if a listener supports loopback interface + is_loopback: bool, } impl Listener

{ @@ -440,6 +442,7 @@ impl Listener

{ ) -> Result { let if_watcher; let pending_event; + let mut is_loopback = false; let local_addr = socket.local_addr()?; if local_addr.ip().is_unspecified() { if_watcher = Some(P::new_if_watcher()?); @@ -447,6 +450,9 @@ impl Listener

{ } else { if_watcher = None; let ma = socketaddr_to_multiaddr(&local_addr, version); + if local_addr.ip().is_loopback() { + is_loopback = true + } pending_event = Some(TransportEvent::NewAddress { listener_id, listen_addr: ma, @@ -467,6 +473,7 @@ impl Listener

{ is_closed: false, pending_event, close_listener_waker: None, + is_loopback, }) } @@ -514,6 +521,7 @@ impl Listener

{ ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { log::debug!("New listen address: {}", listen_addr); + self.is_loopback = true; return Poll::Ready(TransportEvent::NewAddress { listener_id: self.listener_id, listen_addr, From 1f51c7f87aa18696f778857e974f538e19052e3c Mon Sep 17 00:00:00 2001 From: arsenron Date: Tue, 8 Aug 2023 13:55:17 +0300 Subject: [PATCH 2/9] Fix logic inside ifwatcher --- transports/quic/src/transport.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 193deb60758..074b9b40128 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -520,8 +520,10 @@ impl Listener

{ if let Some(listen_addr) = ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { - log::debug!("New listen address: {}", listen_addr); - self.is_loopback = true; + log::debug!("New listen address: {listen_addr}"); + if inet.addr().is_loopback() { + self.is_loopback = true; + } return Poll::Ready(TransportEvent::NewAddress { listener_id: self.listener_id, listen_addr, @@ -532,7 +534,7 @@ impl Listener

{ if let Some(listen_addr) = ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { - log::debug!("Expired listen address: {}", listen_addr); + log::debug!("Expired listen address: {listen_addr}"); return Poll::Ready(TransportEvent::AddressExpired { listener_id: self.listener_id, listen_addr, From 9b25a624573cfbfb59eac24f935fc1845cca033f Mon Sep 17 00:00:00 2001 From: arsenron Date: Tue, 8 Aug 2023 14:16:39 +0300 Subject: [PATCH 3/9] Fix eligible_listener --- transports/quic/src/transport.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 074b9b40128..d33903b9478 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -156,7 +156,13 @@ impl GenTransport

{ return false; } SocketFamily::is_same(&l.socket_addr().ip(), &socket_addr.ip()) - && l.is_loopback == socket_addr.ip().is_loopback() + }) + .filter(|l| { + if socket_addr.ip().is_loopback() { + l.is_loopback + } else { + true + } }) .collect(); match listeners.len() { From 1de9a9ce2e7b1dfceb5ee6826e15df82488d4dce Mon Sep 17 00:00:00 2001 From: arsenron Date: Tue, 8 Aug 2023 14:27:25 +0300 Subject: [PATCH 4/9] Rename to `supports_loopback` --- transports/quic/src/transport.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index d33903b9478..2e144a45000 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -159,7 +159,7 @@ impl GenTransport

{ }) .filter(|l| { if socket_addr.ip().is_loopback() { - l.is_loopback + l.supports_loopback } else { true } @@ -435,7 +435,7 @@ struct Listener { close_listener_waker: Option, /// `true` if a listener supports loopback interface - is_loopback: bool, + supports_loopback: bool, } impl Listener

{ @@ -448,7 +448,7 @@ impl Listener

{ ) -> Result { let if_watcher; let pending_event; - let mut is_loopback = false; + let mut supports_loopback = false; let local_addr = socket.local_addr()?; if local_addr.ip().is_unspecified() { if_watcher = Some(P::new_if_watcher()?); @@ -457,7 +457,7 @@ impl Listener

{ if_watcher = None; let ma = socketaddr_to_multiaddr(&local_addr, version); if local_addr.ip().is_loopback() { - is_loopback = true + supports_loopback = true } pending_event = Some(TransportEvent::NewAddress { listener_id, @@ -479,7 +479,7 @@ impl Listener

{ is_closed: false, pending_event, close_listener_waker: None, - is_loopback, + supports_loopback, }) } @@ -528,7 +528,7 @@ impl Listener

{ { log::debug!("New listen address: {listen_addr}"); if inet.addr().is_loopback() { - self.is_loopback = true; + self.supports_loopback = true; } return Poll::Ready(TransportEvent::NewAddress { listener_id: self.listener_id, From aee6ddb5f235d624d880f106f9e29d413581d856 Mon Sep 17 00:00:00 2001 From: arsenron Date: Thu, 10 Aug 2023 18:35:18 +0300 Subject: [PATCH 5/9] Add test --- transports/quic/src/transport.rs | 84 +++++++++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 2e144a45000..dd810dfda20 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -746,7 +746,7 @@ fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) - #[cfg(test)] #[cfg(any(feature = "async-std", feature = "tokio"))] -mod test { +mod tests { use futures::future::poll_fn; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; @@ -942,4 +942,86 @@ mod test { ) .unwrap(); } + + /// - A listens on 0.0.0.0:0 + /// - B listens on 127.0.0.1:0 + /// - A dials B + /// - Source port of A at B is the A's listen port + #[allow(non_snake_case)] + #[cfg(feature = "tokio")] + #[tokio::test] + async fn test_local_listener_reuse() { + let keypair = libp2p_identity::Keypair::generate_ed25519(); + let mut node_A = crate::tokio::Transport::new(Config::new(&keypair)); + + let keypair = libp2p_identity::Keypair::generate_ed25519(); + let mut node_B = crate::tokio::Transport::new(Config::new(&keypair)); + + node_B + .listen_on( + ListenerId::next(), + "/ip4/127.0.0.1/udp/0/quic-v1".try_into().unwrap(), + ) + .unwrap(); + + node_A + .listen_on( + ListenerId::next(), + "/ip4/0.0.0.0/udp/0/quic-v1".try_into().unwrap(), + ) + .unwrap(); + + // node_A: wait until a listener reports a loopback address + poll_fn(|cx| { + let mut pinned = Pin::new(&mut node_A); + while let Poll::Ready(ev) = pinned.as_mut().poll(cx) { + if let TransportEvent::NewAddress { listen_addr, .. } = ev { + if multiaddr_to_socketaddr(&listen_addr, false) + .unwrap() + .0 + .ip() + .is_loopback() + { + return Poll::Ready(()); + } + } + } + + Poll::Pending + }) + .await; + + let listener_addr = |node: &GenTransport<_>| { + node.listeners + .iter() + .next() + .unwrap() + .socket + .local_addr() + .unwrap() + }; + + let node_B_multiaddr = + socketaddr_to_multiaddr(&listener_addr(&node_B), ProtocolVersion::V1); + + // node A dials node B + node_A.dial(node_B_multiaddr).unwrap().await.unwrap(); + + // Verify that node B received node A listening port + poll_fn(|cx| { + let mut pinned = Pin::new(&mut node_B); + while let Poll::Ready(e) = pinned.as_mut().poll(cx) { + // node B received connection incoming from node A + if let TransportEvent::Incoming { send_back_addr, .. } = e { + let (socket_addr, ..) = + multiaddr_to_socketaddr(&send_back_addr, false).unwrap(); + let node_A_listener_port = listener_addr(&node_A).port(); + assert_eq!(socket_addr.port(), node_A_listener_port); + return Poll::Ready(()); + } + } + Poll::Pending + }) + .await; + } } From fa8622da9a6af22af24b2f9ba794e3e2c7e40192 Mon Sep 17 00:00:00 2001 From: arsenron Date: Fri, 11 Aug 2023 12:09:58 +0300 Subject: [PATCH 6/9] Simplify and move test to smoke --- transports/quic/src/transport.rs | 82 -------------------------------- transports/quic/tests/smoke.rs | 43 +++++++++++++++++ 2 files changed, 43 insertions(+), 82 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index dd810dfda20..b6b91971659 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -942,86 +942,4 @@ mod tests { ) .unwrap(); } - - /// - A listens on 0.0.0.0:0 - /// - B listens on 127.0.0.1:0 - /// - A dials B - /// - Source port of A at B is the A's listen port - #[allow(non_snake_case)] - #[cfg(feature = "tokio")] - #[tokio::test] - async fn test_local_listener_reuse() { - let keypair = libp2p_identity::Keypair::generate_ed25519(); - let mut node_A = crate::tokio::Transport::new(Config::new(&keypair)); - - let keypair = libp2p_identity::Keypair::generate_ed25519(); - let mut node_B = crate::tokio::Transport::new(Config::new(&keypair)); - - node_B - .listen_on( - ListenerId::next(), - "/ip4/127.0.0.1/udp/0/quic-v1".try_into().unwrap(), - ) - .unwrap(); - - node_A - .listen_on( - ListenerId::next(), - "/ip4/0.0.0.0/udp/0/quic-v1".try_into().unwrap(), - ) - .unwrap(); - - // node_A: wait until a listener reports a loopback address - poll_fn(|cx| { - let mut pinned = Pin::new(&mut node_A); - while let Poll::Ready(ev) = pinned.as_mut().poll(cx) { - if let TransportEvent::NewAddress { listen_addr, .. } = ev { - if multiaddr_to_socketaddr(&listen_addr, false) - .unwrap() - .0 - .ip() - .is_loopback() - { - return Poll::Ready(()); - } - } - } - - Poll::Pending - }) - .await; - - let listener_addr = |node: &GenTransport<_>| { - node.listeners - .iter() - .next() - .unwrap() - .socket - .local_addr() - .unwrap() - }; - - let node_B_multiaddr = - socketaddr_to_multiaddr(&listener_addr(&node_B), ProtocolVersion::V1); - - // node A dials node B - node_A.dial(node_B_multiaddr).unwrap().await.unwrap(); - - // Verify that node B received node A listening port - poll_fn(|cx| { - let mut pinned = Pin::new(&mut node_B); - while let Poll::Ready(e) = pinned.as_mut().poll(cx) { - // node B received connection incoming from node A - if let TransportEvent::Incoming { send_back_addr, .. } = e { - let (socket_addr, ..) = - multiaddr_to_socketaddr(&send_back_addr, false).unwrap(); - let node_A_listener_port = listener_addr(&node_A).port(); - assert_eq!(socket_addr.port(), node_A_listener_port); - return Poll::Ready(()); - } - } - Poll::Pending - }) - .await; - } } diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index 93adfa68013..4a9e0bb6dc9 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -414,6 +414,49 @@ async fn write_after_peer_dropped_stream() { stream_b.close().await.expect("Close failed."); } +/// - A listens on 0.0.0.0:0 +/// - B listens on 127.0.0.1:0 +/// - A dials B +/// - Source port of A at B is the A's listen port +#[cfg(feature = "tokio")] +#[tokio::test] +async fn test_local_listener_reuse() { + let (_, mut a_transport) = create_default_transport::(); + let (_, mut b_transport) = create_default_transport::(); + + a_transport + .listen_on( + ListenerId::next(), + "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(), + ) + .unwrap(); + + // wait until a listener reports a loopback address + let a_addr = 'outer: loop { + let ev = a_transport.next().await.unwrap(); + let listen_addr = ev.into_new_address().unwrap(); + for proto in listen_addr.iter() { + if let Protocol::Ip4(ip4) = proto { + if ip4.is_loopback() { + break 'outer listen_addr; + } + } + } + }; + // If we do not poll until the end, `NewAddress` events may be `Ready` and `connect` function + // below will panic due to an unexpected event. + poll_fn(|cx| { + let mut pinned = Pin::new(&mut a_transport); + while let Poll::Ready(e) = pinned.as_mut().poll(cx) {} + Poll::Ready(()) + }) + .await; + + let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; + let (_, send_back_addr, _) = connect(&mut b_transport, &mut a_transport, b_addr).await.0; + assert_eq!(send_back_addr, a_addr); +} + async fn smoke() { let _ = env_logger::try_init(); From 3f604046e2e40faf3bf1e973213df1c2b9488749 Mon Sep 17 00:00:00 2001 From: arsenron Date: Fri, 11 Aug 2023 12:28:23 +0300 Subject: [PATCH 7/9] Clean the test --- transports/quic/tests/smoke.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index 4a9e0bb6dc9..3043373da83 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -432,7 +432,7 @@ async fn test_local_listener_reuse() { .unwrap(); // wait until a listener reports a loopback address - let a_addr = 'outer: loop { + let a_listen_addr = 'outer: loop { let ev = a_transport.next().await.unwrap(); let listen_addr = ev.into_new_address().unwrap(); for proto in listen_addr.iter() { @@ -447,14 +447,14 @@ async fn test_local_listener_reuse() { // below will panic due to an unexpected event. poll_fn(|cx| { let mut pinned = Pin::new(&mut a_transport); - while let Poll::Ready(e) = pinned.as_mut().poll(cx) {} + while let Poll::Ready(_) = pinned.as_mut().poll(cx) {} Poll::Ready(()) }) .await; let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; let (_, send_back_addr, _) = connect(&mut b_transport, &mut a_transport, b_addr).await.0; - assert_eq!(send_back_addr, a_addr); + assert_eq!(send_back_addr, a_listen_addr); } async fn smoke() { From 159208a9ece08a635bf4a60636fd6ac44df0a63d Mon Sep 17 00:00:00 2001 From: arsenron Date: Fri, 11 Aug 2023 13:11:48 +0300 Subject: [PATCH 8/9] Make logic more reliable --- transports/quic/src/transport.rs | 22 ++++++++++------------ transports/quic/tests/smoke.rs | 2 +- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index b6b91971659..9f025cd63fc 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -39,7 +39,7 @@ use libp2p_core::{ use libp2p_identity::PeerId; use socket2::{Domain, Socket, Type}; use std::collections::hash_map::{DefaultHasher, Entry}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, UdpSocket}; use std::time::Duration; @@ -159,7 +159,9 @@ impl GenTransport

{ }) .filter(|l| { if socket_addr.ip().is_loopback() { - l.supports_loopback + l.listening_addresses + .iter() + .any(|ip_addr| ip_addr.is_loopback()) } else { true } @@ -434,8 +436,7 @@ struct Listener { /// The stream must be awaken after it has been closed to deliver the last event. close_listener_waker: Option, - /// `true` if a listener supports loopback interface - supports_loopback: bool, + listening_addresses: HashSet, } impl Listener

{ @@ -448,17 +449,15 @@ impl Listener

{ ) -> Result { let if_watcher; let pending_event; - let mut supports_loopback = false; + let mut listening_addresses = HashSet::new(); let local_addr = socket.local_addr()?; if local_addr.ip().is_unspecified() { if_watcher = Some(P::new_if_watcher()?); pending_event = None; } else { if_watcher = None; + listening_addresses.insert(local_addr.ip()); let ma = socketaddr_to_multiaddr(&local_addr, version); - if local_addr.ip().is_loopback() { - supports_loopback = true - } pending_event = Some(TransportEvent::NewAddress { listener_id, listen_addr: ma, @@ -479,7 +478,7 @@ impl Listener

{ is_closed: false, pending_event, close_listener_waker: None, - supports_loopback, + listening_addresses, }) } @@ -527,9 +526,7 @@ impl Listener

{ ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { log::debug!("New listen address: {listen_addr}"); - if inet.addr().is_loopback() { - self.supports_loopback = true; - } + self.listening_addresses.insert(inet.addr()); return Poll::Ready(TransportEvent::NewAddress { listener_id: self.listener_id, listen_addr, @@ -541,6 +538,7 @@ impl Listener

{ ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { log::debug!("Expired listen address: {listen_addr}"); + self.listening_addresses.remove(&inet.addr()); return Poll::Ready(TransportEvent::AddressExpired { listener_id: self.listener_id, listen_addr, diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index 3043373da83..a0c43b9ebdd 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -447,7 +447,7 @@ async fn test_local_listener_reuse() { // below will panic due to an unexpected event. poll_fn(|cx| { let mut pinned = Pin::new(&mut a_transport); - while let Poll::Ready(_) = pinned.as_mut().poll(cx) {} + while pinned.as_mut().poll(cx).is_ready() {} Poll::Ready(()) }) .await; From 7d341381981af8bda6d496fa41cd77448c6c9a5e Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 11 Aug 2023 13:47:42 +0200 Subject: [PATCH 9/9] chore: bump version and add changelog entry --- Cargo.lock | 2 +- Cargo.toml | 2 +- transports/quic/CHANGELOG.md | 7 +++++++ transports/quic/Cargo.toml | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f5026f1afc..979bc43d8d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3058,7 +3058,7 @@ dependencies = [ [[package]] name = "libp2p-quic" -version = "0.9.1-alpha" +version = "0.9.2-alpha" dependencies = [ "async-std", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 6b4bd5ebe09..fb503a12e61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ libp2p-perf = { version = "0.2.0", path = "protocols/perf" } libp2p-ping = { version = "0.43.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.40.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.23.0", path = "transports/pnet" } -libp2p-quic = { version = "0.9.1-alpha", path = "transports/quic" } +libp2p-quic = { version = "0.9.2-alpha", path = "transports/quic" } libp2p-relay = { version = "0.16.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.13.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" } diff --git a/transports/quic/CHANGELOG.md b/transports/quic/CHANGELOG.md index 5fc4ccc1cdf..66a6dec3a2e 100644 --- a/transports/quic/CHANGELOG.md +++ b/transports/quic/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.9.2-alpha + +- Add support for reusing an existing socket when dialing localhost address. + See [PR 4304]. + +[PR 4304]: https://github.com/libp2p/rust-libp2p/pull/4304 + ## 0.9.1-alpha - Allow listening on ipv4 and ipv6 separately. diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 67f131aeb31..9dba8c692c5 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-quic" -version = "0.9.1-alpha" +version = "0.9.2-alpha" authors = ["Parity Technologies "] edition = "2021" rust-version = { workspace = true }