From 31ae4221b709fdaaa3dcc40997cd1b93e086445e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Sep 2022 16:03:29 +0400 Subject: [PATCH] simplify IfWatcher integration port of https://github.com/libp2p/rust-libp2p/pull/2813 --- transports/webrtc/Cargo.toml | 2 +- transports/webrtc/src/in_addr.rs | 120 ----------------------------- transports/webrtc/src/lib.rs | 1 - transports/webrtc/src/transport.rs | 111 ++++++++++++-------------- transports/webrtc/tests/smoke.rs | 20 ++++- 5 files changed, 71 insertions(+), 183 deletions(-) delete mode 100644 transports/webrtc/src/in_addr.rs diff --git a/transports/webrtc/Cargo.toml b/transports/webrtc/Cargo.toml index d635189670a..f5ef7cd3891 100644 --- a/transports/webrtc/Cargo.toml +++ b/transports/webrtc/Cargo.toml @@ -17,7 +17,7 @@ futures = "0.3" futures-lite = "1" futures-timer = "3" hex = "0.4" -if-watch = "0.2" +if-watch = "2.0" libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-noise = { version = "0.38.0", path = "../../transports/noise" } log = "0.4" diff --git a/transports/webrtc/src/in_addr.rs b/transports/webrtc/src/in_addr.rs deleted file mode 100644 index fd642d16dc5..00000000000 --- a/transports/webrtc/src/in_addr.rs +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2022 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use if_watch::{IfEvent, IfWatcher}; - -use futures::{ - future::{BoxFuture, FutureExt}, - stream::Stream, -}; - -use std::{ - io::Result, - net::IpAddr, - ops::DerefMut, - pin::Pin, - task::{Context, Poll}, -}; - -/// Watches for interface changes. -#[derive(Debug)] -pub enum InAddr { - /// The socket accepts connections on a single interface. - One { ip: Option }, - /// The socket accepts connections on all interfaces. - Any { if_watch: Box }, -} - -impl InAddr { - /// If ip is specified then only one `IfEvent::Up` with IpNet(ip)/32 will be generated. - /// If ip is unspecified then `IfEvent::Up/Down` events will be generated for all interfaces. - pub fn new(ip: IpAddr) -> Self { - if ip.is_unspecified() { - let watcher = IfWatch::Pending(IfWatcher::new().boxed()); - InAddr::Any { - if_watch: Box::new(watcher), - } - } else { - InAddr::One { ip: Some(ip) } - } - } -} - -pub enum IfWatch { - Pending(BoxFuture<'static, std::io::Result>), - Ready(Box), -} - -impl std::fmt::Debug for IfWatch { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match *self { - IfWatch::Pending(_) => write!(f, "Pending"), - IfWatch::Ready(_) => write!(f, "Ready"), - } - } -} -impl Stream for InAddr { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let me = Pin::into_inner(self); - loop { - match me { - // If the listener is bound to a single interface, make sure the - // address is reported once. - InAddr::One { ip } => { - if let Some(ip) = ip.take() { - return Poll::Ready(Some(Ok(IfEvent::Up(ip.into())))); - } - } - InAddr::Any { if_watch } => { - match if_watch.deref_mut() { - // If we listen on all interfaces, wait for `if-watch` to be ready. - IfWatch::Pending(f) => match futures::ready!(f.poll_unpin(cx)) { - Ok(watcher) => { - *if_watch = Box::new(IfWatch::Ready(Box::new(watcher))); - continue; - } - Err(err) => { - *if_watch = Box::new(IfWatch::Pending(IfWatcher::new().boxed())); - return Poll::Ready(Some(Err(err))); - } - }, - // Consume all events for up/down interface changes. - IfWatch::Ready(watcher) => { - if let Poll::Ready(ev) = watcher.poll_unpin(cx) { - match ev { - Ok(event) => { - return Poll::Ready(Some(Ok(event))); - } - Err(err) => { - return Poll::Ready(Some(Err(err))); - } - } - } - } - } - } - } - break; - } - Poll::Pending - } -} diff --git a/transports/webrtc/src/lib.rs b/transports/webrtc/src/lib.rs index 1ae95f8a29e..8a57dbac607 100644 --- a/transports/webrtc/src/lib.rs +++ b/transports/webrtc/src/lib.rs @@ -85,7 +85,6 @@ pub mod error; pub mod transport; mod fingerprint; -mod in_addr; mod req_res_chan; mod sdp; mod udp_mux; diff --git a/transports/webrtc/src/transport.rs b/transports/webrtc/src/transport.rs index 00358b3bea4..1624d69457d 100644 --- a/transports/webrtc/src/transport.rs +++ b/transports/webrtc/src/transport.rs @@ -28,7 +28,7 @@ use futures::{ stream::Stream, TryFutureExt, }; -use if_watch::IfEvent; +use if_watch::{IfEvent, IfWatcher}; use libp2p_core::{ identity, multiaddr::{Multiaddr, Protocol}, @@ -54,7 +54,6 @@ use crate::{ connection::PollDataChannel, error::Error, fingerprint::Fingerprint, - in_addr::InAddr, udp_mux::{UDPMuxEvent, UDPMuxNewAddr}, webrtc_connection::WebRTCConnection, }; @@ -107,13 +106,16 @@ impl WebRTCTransport { let udp_mux = UDPMuxNewAddr::new(socket); - Ok(WebRTCListenStream::new( + return Ok(WebRTCListenStream::new( listener_id, listen_addr, self.config.clone(), udp_mux, self.id_keys.clone(), - )) + IfWatcher::new() + .map_err(Error::IoError) + .map_err(TransportError::Other)?, + )); } } @@ -247,13 +249,6 @@ pub struct WebRTCListenStream { /// when listening on all interfaces for IPv4 respectively IPv6 connections. listen_addr: SocketAddr, - /// The IP addresses of network interfaces on which the listening socket - /// is accepting connections. - /// - /// If the listen socket listens on all interfaces, these may change over - /// time as interfaces become available or unavailable. - in_addr: InAddr, - /// The config which holds this peer's certificate(s). config: WebRTCConfiguration, @@ -268,6 +263,13 @@ pub struct WebRTCListenStream { /// Optionally contains a [`TransportEvent::ListenerClosed`] that should be /// reported before the listener's stream is terminated. report_closed: Option::Item>>, + + /// Watcher for network interface changes. + /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces + /// become or stop being available. + /// + /// `None` if the socket is only listening on a single interface. + if_watcher: IfWatcher, } impl WebRTCListenStream { @@ -278,17 +280,16 @@ impl WebRTCListenStream { config: WebRTCConfiguration, udp_mux: UDPMuxNewAddr, id_keys: identity::Keypair, + if_watcher: IfWatcher, ) -> Self { - let in_addr = InAddr::new(listen_addr.ip()); - WebRTCListenStream { listener_id, listen_addr, - in_addr, config, udp_mux, id_keys, report_closed: None, + if_watcher, } } @@ -309,58 +310,48 @@ impl WebRTCListenStream { } } - /// Poll for a next If Event. - fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { - loop { - let mut item = ready!(self.in_addr.poll_next_unpin(cx)); - if let Some(item) = item.take() { - // Consume all events for up/down interface changes. - match item { - Ok(IfEvent::Up(inet)) => { - let ip = inet.addr(); - if self.listen_addr.is_ipv4() == ip.is_ipv4() - || self.listen_addr.is_ipv6() == ip.is_ipv6() - { - let socket_addr = SocketAddr::new(ip, self.listen_addr.port()); - let ma = socketaddr_to_multiaddr(&socket_addr); - debug!("New listen address: {}", ma); - return Poll::Ready(TransportEvent::NewAddress { - listener_id: self.listener_id, - listen_addr: ma, - }); - } else { - continue; - } - } - Ok(IfEvent::Down(inet)) => { - let ip = inet.addr(); - if self.listen_addr.is_ipv4() == ip.is_ipv4() - || self.listen_addr.is_ipv6() == ip.is_ipv6() - { - let socket_addr = SocketAddr::new(ip, self.listen_addr.port()); - let ma = socketaddr_to_multiaddr(&socket_addr); - debug!("Expired listen address: {}", ma); - return Poll::Ready(TransportEvent::AddressExpired { - listener_id: self.listener_id, - listen_addr: ma, - }); - } else { - continue; - } + fn poll_if_watcher(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { + while let Poll::Ready(event) = self.if_watcher.poll_if_event(cx) { + match event { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if self.listen_addr.is_ipv4() == ip.is_ipv4() + || self.listen_addr.is_ipv6() == ip.is_ipv6() + { + let socket_addr = SocketAddr::new(ip, self.listen_addr.port()); + let ma = socketaddr_to_multiaddr(&socket_addr); + log::debug!("New listen address: {}", ma); + return Poll::Ready(TransportEvent::NewAddress { + listener_id: self.listener_id, + listen_addr: ma, + }); } - Err(err) => { - debug! { - "Failure polling interfaces: {:?}.", - err - }; - return Poll::Ready(TransportEvent::ListenerError { + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if self.listen_addr.is_ipv4() == ip.is_ipv4() + || self.listen_addr.is_ipv6() == ip.is_ipv6() + { + let socket_addr = SocketAddr::new(ip, self.listen_addr.port()); + let ma = socketaddr_to_multiaddr(&socket_addr); + log::debug!("Expired listen address: {}", ma); + return Poll::Ready(TransportEvent::AddressExpired { listener_id: self.listener_id, - error: err.into(), + listen_addr: ma, }); } } + Err(err) => { + log::debug!("Error when polling network interfaces {}", err); + return Poll::Ready(TransportEvent::ListenerError { + listener_id: self.listener_id, + error: err.into(), + }); + } } } + + Poll::Pending } } @@ -376,7 +367,7 @@ impl Stream for WebRTCListenStream { return Poll::Ready(closed.take()); } - if let Poll::Ready(event) = self.poll_if_addr(cx) { + if let Poll::Ready(event) = self.poll_if_watcher(cx) { return Poll::Ready(Some(event)); } diff --git a/transports/webrtc/tests/smoke.rs b/transports/webrtc/tests/smoke.rs index 413d6dff1d4..eae884867cf 100644 --- a/transports/webrtc/tests/smoke.rs +++ b/transports/webrtc/tests/smoke.rs @@ -75,6 +75,9 @@ async fn smoke() -> Result<()> { e => panic!("{:?}", e), }; + // skip other interface addresses + while let Some(_) = a.next().now_or_never() {} + let addr = addr.with(Protocol::Certhash(fingerprint2multihash(&a_fingerprint))); let _ = match b.next().await { @@ -82,6 +85,9 @@ async fn smoke() -> Result<()> { e => panic!("{:?}", e), }; + // skip other interface addresses + while let Some(_) = b.next().now_or_never() {} + let mut data = vec![0; 4096]; rng.fill_bytes(&mut data); @@ -312,6 +318,9 @@ async fn dial_failure() -> Result<()> { e => panic!("{:?}", e), }; + // skip other interface addresses + while let Some(_) = a.next().now_or_never() {} + let addr = addr.with(Protocol::Certhash(fingerprint2multihash(&a_fingerprint))); let _ = match b.next().await { @@ -319,6 +328,9 @@ async fn dial_failure() -> Result<()> { e => panic!("{:?}", e), }; + // skip other interface addresses + while let Some(_) = b.next().now_or_never() {} + let a_peer_id = &Swarm::local_peer_id(&a).clone(); drop(a); // stop a swarm so b can never reach it @@ -361,7 +373,7 @@ async fn concurrent_connections_and_streams() { } let mut pool = futures::executor::LocalPool::default(); - let mut data = vec![0; 4096 * 10]; + let mut data = vec![0; 4096]; rand::thread_rng().fill_bytes(&mut data); let mut listeners = vec![]; @@ -416,6 +428,9 @@ async fn concurrent_connections_and_streams() { log::debug!("listener ResponseSent"); } Some(SwarmEvent::ConnectionClosed { .. }) => {} + Some(SwarmEvent::NewListenAddr { .. }) => { + log::debug!("listener NewListenAddr"); + } Some(e) => { panic!("unexpected event {:?}", e); } @@ -489,6 +504,9 @@ async fn concurrent_connections_and_streams() { Some(SwarmEvent::ConnectionClosed { .. }) => { log::debug!("dialer ConnectionClosed"); } + Some(SwarmEvent::NewListenAddr { .. }) => { + log::debug!("dialer NewListenAddr"); + } e => { panic!("unexpected event {:?}", e); }