diff --git a/transports/webrtc/Cargo.toml b/transports/webrtc/Cargo.toml index f721e36891a..d7adb8e7feb 100644 --- a/transports/webrtc/Cargo.toml +++ b/transports/webrtc/Cargo.toml @@ -18,8 +18,8 @@ futures-lite = "1" futures-timer = "3" hex = "0.4" if-watch = "0.2" -libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } -libp2p-noise = { version = "0.36.0", path = "../../transports/noise" } +libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-noise = { version = "0.37.0", path = "../../transports/noise" } log = "0.4" serde = { version = "1.0", features = ["derive"] } stun = "0.4" @@ -35,8 +35,8 @@ webrtc-util = { version = "0.5.3", default-features = false, features = ["conn", [dev-dependencies] anyhow = "1.0" env_logger = "0.9" -libp2p-request-response = { version = "0.18.0", path = "../../protocols/request-response" } -libp2p-swarm = { version = "0.36.0", path = "../../swarm" } +libp2p-request-response = { version = "0.20.0", path = "../../protocols/request-response" } +libp2p-swarm = { version = "0.38.0", path = "../../swarm" } rand = "0.8" rand_core = "0.5" rcgen = "0.9" diff --git a/transports/webrtc/src/connection.rs b/transports/webrtc/src/connection.rs index 0e7a4b24f2e..0de8b50c586 100644 --- a/transports/webrtc/src/connection.rs +++ b/transports/webrtc/src/connection.rs @@ -33,7 +33,6 @@ use webrtc::peer_connection::RTCPeerConnection; use webrtc_data::data_channel::DataChannel as DetachedDataChannel; use std::io; -use std::pin::Pin; use std::sync::{Arc, Mutex as StdMutex}; use std::task::{Context, Poll}; @@ -226,47 +225,47 @@ impl<'a> StreamMuxer for Connection { /// abruptly interrupt the execution. fn destroy_outbound(&self, _s: Self::OutboundSubstream) {} - fn read_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &mut [u8], - ) -> Poll> { - Pin::new(s).poll_read(cx, buf) - } - - fn write_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - buf: &[u8], - ) -> Poll> { - Pin::new(s).poll_write(cx, buf) - } - - fn flush_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - trace!("Flushing substream {}", s.stream_identifier()); - Pin::new(s).poll_flush(cx) - } - - fn shutdown_substream( - &self, - cx: &mut Context<'_>, - s: &mut Self::Substream, - ) -> Poll> { - trace!("Closing substream {}", s.stream_identifier()); - Pin::new(s).poll_close(cx) - } - - fn destroy_substream(&self, s: Self::Substream) { - trace!("Destroying substream {}", s.stream_identifier()); - let mut data_channels_inner = self.data_channels_inner.lock().unwrap(); - data_channels_inner.map.remove(&s.stream_identifier()); - } + // fn read_substream( + // &self, + // cx: &mut Context<'_>, + // s: &mut Self::Substream, + // buf: &mut [u8], + // ) -> Poll> { + // Pin::new(s).poll_read(cx, buf) + // } + + // fn write_substream( + // &self, + // cx: &mut Context<'_>, + // s: &mut Self::Substream, + // buf: &[u8], + // ) -> Poll> { + // Pin::new(s).poll_write(cx, buf) + // } + + // fn flush_substream( + // &self, + // cx: &mut Context<'_>, + // s: &mut Self::Substream, + // ) -> Poll> { + // trace!("Flushing substream {}", s.stream_identifier()); + // Pin::new(s).poll_flush(cx) + // } + + // fn shutdown_substream( + // &self, + // cx: &mut Context<'_>, + // s: &mut Self::Substream, + // ) -> Poll> { + // trace!("Closing substream {}", s.stream_identifier()); + // Pin::new(s).poll_close(cx) + // } + + // fn destroy_substream(&self, s: Self::Substream) { + // trace!("Destroying substream {}", s.stream_identifier()); + // let mut data_channels_inner = self.data_channels_inner.lock().unwrap(); + // data_channels_inner.map.remove(&s.stream_identifier()); + // } fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { debug!("Closing connection"); @@ -274,20 +273,20 @@ impl<'a> StreamMuxer for Connection { let mut data_channels_inner = self.data_channels_inner.lock().unwrap(); // First, flush all the buffered data. - for (_, ch) in &mut data_channels_inner.map { - match ready!(self.flush_substream(cx, ch)) { - Ok(_) => continue, - Err(e) => return Poll::Ready(Err(e)), - } - } + // for (_, ch) in &mut data_channels_inner.map { + // match ready!(self.flush_substream(cx, ch)) { + // Ok(_) => continue, + // Err(e) => return Poll::Ready(Err(e)), + // } + // } // Second, shutdown all the substreams. - for (_, ch) in &mut data_channels_inner.map { - match ready!(self.shutdown_substream(cx, ch)) { - Ok(_) => continue, - Err(e) => return Poll::Ready(Err(e)), - } - } + // for (_, ch) in &mut data_channels_inner.map { + // match ready!(self.shutdown_substream(cx, ch)) { + // Ok(_) => continue, + // Err(e) => return Poll::Ready(Err(e)), + // } + // } // Third, close `incoming_data_channels_rx` data_channels_inner.incoming_data_channels_rx.close(); diff --git a/transports/webrtc/src/error.rs b/transports/webrtc/src/error.rs index 7a73da660bb..8b62279a12c 100644 --- a/transports/webrtc/src/error.rs +++ b/transports/webrtc/src/error.rs @@ -42,6 +42,9 @@ pub enum Error { got: PeerId, }, + #[error("no active listeners")] + NoListeners, + #[error("internal error: {0} (see debug logs)")] InternalError(String), } diff --git a/transports/webrtc/src/transport.rs b/transports/webrtc/src/transport.rs index 3a2d7fa84b2..f521743ef8c 100644 --- a/transports/webrtc/src/transport.rs +++ b/transports/webrtc/src/transport.rs @@ -32,14 +32,14 @@ use libp2p_core::identity; use libp2p_core::{ multiaddr::{Multiaddr, Protocol}, muxing::StreamMuxerBox, - transport::{Boxed, ListenerEvent, TransportError}, + transport::{Boxed, ListenerId, TransportError, TransportEvent}, PeerId, Transport, }; use libp2p_core::{OutboundUpgrade, UpgradeInfo}; use libp2p_noise::{Keypair, NoiseConfig, NoiseError, RemoteIdentity, X25519Spec}; use log::{debug, trace}; use tinytemplate::TinyTemplate; -use tokio_crate::net::{ToSocketAddrs, UdpSocket}; +use tokio_crate::net::UdpSocket; use webrtc::api::setting_engine::SettingEngine; use webrtc::api::APIBuilder; use webrtc::data_channel::data_channel_init::RTCDataChannelInit; @@ -52,19 +52,19 @@ use webrtc_ice::udp_mux::UDPMux; use webrtc_ice::udp_network::UDPNetwork; use std::borrow::Cow; +use std::collections::VecDeque; use std::io; use std::net::IpAddr; use std::net::SocketAddr; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use crate::error::Error; -use crate::sdp; - use crate::connection::Connection; use crate::connection::PollDataChannel; +use crate::error::Error; +use crate::sdp; use crate::udp_mux::UDPMuxNewAddr; use crate::udp_mux::UDPMuxParams; use crate::upgrade; @@ -83,56 +83,31 @@ enum InAddr { } /// A WebRTC transport with direct p2p communication (without a STUN server). -#[derive(Clone)] pub struct WebRTCTransport { /// A `RTCConfiguration` which holds this peer's certificate(s). config: RTCConfiguration, - - /// The `UDPMux` that manages all ICE connections. - udp_mux: Arc, - - /// The local address of `udp_mux`. - udp_mux_addr: SocketAddr, - - /// The receiver for new `SocketAddr` connecting to this peer. - new_addr_rx: Arc>>, - /// `Keypair` identifying this peer id_keys: identity::Keypair, + /// All the active listeners. + /// The `WebRTCListenStream` struct contains a stream that we want to be pinned. Since the `VecDeque` + /// can be resized, the only way is to use a `Pin>`. + listeners: VecDeque>>, + /// Pending transport events to return from [`WebRTCTransport::poll`]. + pending_events: VecDeque::ListenerUpgrade, Error>>, } impl WebRTCTransport { /// Create a new WebRTC transport. - /// - /// Creates a UDP socket bound to `listen_addr`. - pub async fn new( - certificate: RTCCertificate, - id_keys: identity::Keypair, - listen_addr: A, - ) -> Result> { - // Bind to `listen_addr` and construct a UDP mux. - let socket = UdpSocket::bind(listen_addr) - .map_err(Error::IoError) - .map_err(TransportError::Other) - .await?; - // Sender and receiver for new addresses - let (new_addr_tx, new_addr_rx) = mpsc::channel(1); - let udp_mux_addr = socket - .local_addr() - .map_err(Error::IoError) - .map_err(TransportError::Other)?; - let udp_mux = UDPMuxNewAddr::new(UDPMuxParams::new(socket), new_addr_tx); - - Ok(Self { + pub fn new(certificate: RTCCertificate, id_keys: identity::Keypair) -> Self { + Self { config: RTCConfiguration { certificates: vec![certificate], ..RTCConfiguration::default() }, - udp_mux, - udp_mux_addr, - new_addr_rx: Arc::new(Mutex::new(new_addr_rx)), id_keys, - }) + listeners: VecDeque::new(), + pending_events: VecDeque::new(), + } } /// Returns the SHA-256 fingerprint of the certificate in lowercase hex string as expressed @@ -148,59 +123,330 @@ impl WebRTCTransport { }) .boxed() } + + fn do_listen( + &self, + listener_id: ListenerId, + addr: Multiaddr, + ) -> Result> { + let sock_addr = multiaddr_to_socketaddr(&addr) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr))?; + + let std_sock = std::net::UdpSocket::bind(sock_addr) + .map_err(Error::IoError) + .map_err(TransportError::Other)?; + std_sock + .set_nonblocking(true) + .map_err(Error::IoError) + .map_err(TransportError::Other)?; + let socket = UdpSocket::from_std(std_sock) + .map_err(Error::IoError) + .map_err(TransportError::Other)?; + + let listen_addr = socket + .local_addr() + .map_err(Error::IoError) + .map_err(TransportError::Other)?; + + debug!("listening on {}", listen_addr); + + // Sender and receiver for new addresses + let (new_addr_tx, new_addr_rx) = mpsc::channel(1); + let udp_mux = UDPMuxNewAddr::new(UDPMuxParams::new(socket), new_addr_tx); + + Ok(WebRTCListenStream::new( + listener_id, + listen_addr, + self.config.clone(), + udp_mux, + new_addr_rx, + self.id_keys.clone(), + )) + } } impl Transport for WebRTCTransport { type Output = (PeerId, Connection); type Error = Error; - type Listener = WebRTCListenStream; type ListenerUpgrade = BoxFuture<'static, Result>; type Dial = BoxFuture<'static, Result>; - fn listen_on( - &mut self, - addr: Multiaddr, - ) -> Result> { - debug!("listening on {} (ignoring {})", self.udp_mux_addr, addr); - Ok(WebRTCListenStream::new( - self.udp_mux_addr, - self.config.clone(), - self.udp_mux.clone(), - self.new_addr_rx.clone(), - self.id_keys.clone(), - )) + fn listen_on(&mut self, addr: Multiaddr) -> Result> { + let id = ListenerId::new(); + let listener = self.do_listen(id, addr)?; + self.listeners.push_back(Box::pin(listener)); + Ok(id) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + if let Some(index) = self.listeners.iter().position(|l| l.listener_id != id) { + self.listeners.remove(index); + self.pending_events + .push_back(TransportEvent::ListenerClosed { + listener_id: id, + reason: Ok(()), + }); + true + } else { + false + } + } + + /// Poll all listeners. + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // Return pending events from closed listeners. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); + } + // We remove each element from `listeners` one by one and add them back. + let mut remaining = self.listeners.len(); + while let Some(mut listener) = self.listeners.pop_back() { + match TryStream::try_poll_next(listener.as_mut(), cx) { + Poll::Pending => { + self.listeners.push_front(listener); + remaining -= 1; + if remaining == 0 { + break; + } + } + Poll::Ready(Some(Ok(WebRTCListenerEvent::Upgrade { + upgrade, + local_addr, + remote_addr, + }))) => { + let id = listener.listener_id; + self.listeners.push_front(listener); + return Poll::Ready(TransportEvent::Incoming { + listener_id: id, + upgrade, + local_addr, + send_back_addr: remote_addr, + }); + } + Poll::Ready(Some(Ok(WebRTCListenerEvent::NewAddress(a)))) => { + let id = listener.listener_id; + self.listeners.push_front(listener); + return Poll::Ready(TransportEvent::NewAddress { + listener_id: id, + listen_addr: a, + }); + } + Poll::Ready(Some(Ok(WebRTCListenerEvent::AddressExpired(a)))) => { + let id = listener.listener_id; + self.listeners.push_front(listener); + return Poll::Ready(TransportEvent::AddressExpired { + listener_id: id, + listen_addr: a, + }); + } + Poll::Ready(Some(Ok(WebRTCListenerEvent::Error(error)))) => { + let id = listener.listener_id; + self.listeners.push_front(listener); + return Poll::Ready(TransportEvent::ListenerError { + listener_id: id, + error: error.into(), + }); + } + Poll::Ready(None) => { + return Poll::Ready(TransportEvent::ListenerClosed { + listener_id: listener.listener_id, + reason: Ok(()), + }); + } + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(TransportEvent::ListenerClosed { + listener_id: listener.listener_id, + reason: Err(err), + }); + } + } + } + Poll::Pending } fn dial(&mut self, addr: Multiaddr) -> Result> { - let config = self.config.clone(); - let our_fingerprint = self.cert_fingerprint(); - let udp_mux = self.udp_mux.clone(); - let id_keys = self.id_keys.clone(); - Ok(Box::pin(do_dial( - config, - our_fingerprint, - udp_mux, - id_keys, - addr, - ))) + self.dial_as_listener(addr) } fn dial_as_listener( &mut self, addr: Multiaddr, ) -> Result> { - // XXX: anything to do here? - self.dial(addr) + let config = self.config.clone(); + let our_fingerprint = self.cert_fingerprint(); + let id_keys = self.id_keys.clone(); + let udp_mux = if let Some(l) = self.listeners.back() { + l.udp_mux.clone() + } else { + return Err(TransportError::Other(Error::NoListeners)); + }; + + let sock_addr = multiaddr_to_socketaddr(&addr) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; + if sock_addr.port() == 0 || sock_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + + let remote = addr.clone(); // used for logging + trace!("dialing address: {:?}", remote); + + let se = build_setting_engine(udp_mux.clone(), &sock_addr, &our_fingerprint); + let api = APIBuilder::new().with_setting_engine(se).build(); + + // [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus + // do the `set_remote_description` call within the [`Future`]. + Ok(async move { + let peer_connection = api + .new_peer_connection(config) + .map_err(Error::WebRTC) + .await?; + + let offer = peer_connection + .create_offer(None) + .map_err(Error::WebRTC) + .await?; + debug!("OFFER: {:?}", offer.sdp); + peer_connection + .set_local_description(offer) + .map_err(Error::WebRTC) + .await?; + + // Set the remote description to the predefined SDP. + let remote_fingerprint = match fingerprint_from_addr(&addr) { + Some(f) => fingerprint_to_string(&f), + None => return Err(Error::InvalidMultiaddr(addr.clone())), + }; + let server_session_description = render_description( + sdp::SERVER_SESSION_DESCRIPTION, + sock_addr, + &remote_fingerprint, + ); + debug!("ANSWER: {:?}", server_session_description); + let sdp = RTCSessionDescription::answer(server_session_description).unwrap(); + // Set the local description and start UDP listeners + // Note: this will start the gathering of ICE candidates + peer_connection + .set_remote_description(sdp) + .map_err(Error::WebRTC) + .await?; + + // Create a datachannel with label 'data' + let data_channel = peer_connection + .create_data_channel( + "data", + Some(RTCDataChannelInit { + id: Some(1), + ..RTCDataChannelInit::default() + }), + ) + .await?; + + let (tx, mut rx) = oneshot::channel::>(); + + // Wait until the data channel is opened and detach it. + crate::connection::register_data_channel_open_handler(data_channel, tx).await; + + // Wait until data channel is opened and ready to use + let detached = select! { + res = rx => match res { + Ok(detached) => detached, + Err(e) => return Err(Error::InternalError(e.to_string())), + }, + _ = Delay::new(Duration::from_secs(10)).fuse() => return Err(Error::InternalError( + "data channel opening took longer than 10 seconds (see logs)".into(), + )) + }; + + trace!("noise handshake with {}", remote); + let dh_keys = Keypair::::new() + .into_authentic(&id_keys) + .unwrap(); + let noise = NoiseConfig::xx(dh_keys); + let info = noise.protocol_info().next().unwrap(); + let (peer_id, mut noise_io) = noise + .upgrade_outbound(PollDataChannel::new(detached), info) + .and_then(|(remote, io)| match remote { + RemoteIdentity::IdentityKey(pk) => future::ok((pk.to_peer_id(), io)), + _ => future::err(NoiseError::AuthenticationFailed), + }) + .await + .map_err(Error::Noise)?; + + // Exchange TLS certificate fingerprints to prevent MiM attacks. + trace!("exchanging TLS certificate fingerprints with {}", remote); + let n = noise_io.write(&our_fingerprint.into_bytes()).await?; + noise_io.flush().await?; + let mut buf = vec![0; n]; // ASSERT: fingerprint's format is the same. + noise_io.read_exact(buf.as_mut_slice()).await?; + let fingerprint_from_noise = String::from_utf8(buf) + .map_err(|_| Error::Noise(NoiseError::AuthenticationFailed))?; + if fingerprint_from_noise != remote_fingerprint { + return Err(Error::InvalidFingerprint { + expected: remote_fingerprint, + got: fingerprint_from_noise, + }); + } + + trace!("verifying peer's identity {}", remote); + let peer_id_from_addr = PeerId::try_from_multiaddr(&addr); + if peer_id_from_addr.is_none() || peer_id_from_addr.unwrap() != peer_id { + return Err(Error::InvalidPeerID { + expected: peer_id_from_addr, + got: peer_id, + }); + } + + // Close the initial data channel after noise handshake is done. + // https://github.com/webrtc-rs/sctp/pull/14 + // detached + // .close() + // .await + // .map_err(|e| Error::WebRTC(e.into()))?; + + let mut c = Connection::new(peer_connection).await; + // XXX: default buffer size is too small to fit some messages. Possibly remove once + // https://github.com/webrtc-rs/sctp/issues/28 is fixed. + c.set_data_channels_read_buf_capacity(8192 * 10); + Ok((peer_id, c)) + } + .boxed()) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - // XXX: anything to do here? libp2p_core::address_translation(server, observed) } } +/// Event produced by a [`WebRTCListenStream`]. +#[derive(Debug)] +pub enum WebRTCListenerEvent { + /// The listener is listening on a new additional [`Multiaddr`]. + NewAddress(Multiaddr), + /// An upgrade, consisting of the upgrade future, the listener address and the remote address. + Upgrade { + /// The upgrade. + upgrade: S, + /// The local address which produced this upgrade. + local_addr: Multiaddr, + /// The remote address which produced this upgrade. + remote_addr: Multiaddr, + }, + /// A [`Multiaddr`] is no longer used for listening. + AddressExpired(Multiaddr), + /// A non-fatal error has happened on the listener. + /// + /// This event should be generated in order to notify the user that something wrong has + /// happened. The listener, however, continues to run. + Error(Error), +} + /// A stream of incoming connections on one or more interfaces. pub struct WebRTCListenStream { + /// The ID of this listener. + listener_id: ListenerId, /// The socket address that the listening socket is bound to, /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY` /// when listening on all interfaces for IPv4 respectively IPv6 connections. @@ -216,25 +462,24 @@ pub struct WebRTCListenStream { sleep_on_error: Duration, /// The current pause, if any. pause: Option, - /// A `RTCConfiguration` which holds this peer's certificate(s). config: RTCConfiguration, /// The `UDPMux` that manages all ICE connections. udp_mux: Arc, /// The receiver for new `SocketAddr` connecting to this peer. - new_addr_rx: Arc>>, + new_addr_rx: mpsc::Receiver, /// `Keypair` identifying this peer id_keys: identity::Keypair, } impl WebRTCListenStream { - /// Constructs a `WebRTCListenStream` for incoming connections around - /// the given `TcpListener`. + /// Constructs a `WebRTCListenStream` for incoming connections. fn new( + listener_id: ListenerId, listen_addr: SocketAddr, config: RTCConfiguration, udp_mux: Arc, - new_addr_rx: Arc>>, + new_addr_rx: mpsc::Receiver, id_keys: identity::Keypair, ) -> Self { // Check whether the listening IP is set or not. @@ -254,6 +499,7 @@ impl WebRTCListenStream { }; WebRTCListenStream { + listener_id, listen_addr, in_addr, pause: None, @@ -267,10 +513,8 @@ impl WebRTCListenStream { } impl Stream for WebRTCListenStream { - type Item = Result< - ListenerEvent>, Error>, - Error, - >; + type Item = + Result>>, Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); @@ -291,9 +535,9 @@ impl Stream for WebRTCListenStream { }; *if_watch = IfWatch::Pending(IfWatcher::new().boxed()); me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(ListenerEvent::Error(Error::IoError( - err, - ))))); + return Poll::Ready(Some(Ok(WebRTCListenerEvent::Error( + Error::IoError(err), + )))); } }, // Consume all events for up/down interface changes. @@ -307,9 +551,9 @@ impl Stream for WebRTCListenStream { { let ma = ip_to_multiaddr(ip, me.listen_addr.port()); debug!("New listen address: {}", ma); - return Poll::Ready(Some(Ok(ListenerEvent::NewAddress( - ma, - )))); + return Poll::Ready(Some(Ok( + WebRTCListenerEvent::NewAddress(ma), + ))); } } Ok(IfEvent::Down(inet)) => { @@ -320,7 +564,7 @@ impl Stream for WebRTCListenStream { let ma = ip_to_multiaddr(ip, me.listen_addr.port()); debug!("Expired listen address: {}", ma); return Poll::Ready(Some(Ok( - ListenerEvent::AddressExpired(ma), + WebRTCListenerEvent::AddressExpired(ma), ))); } } @@ -330,7 +574,7 @@ impl Stream for WebRTCListenStream { err }; me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(ListenerEvent::Error( + return Poll::Ready(Some(Ok(WebRTCListenerEvent::Error( Error::IoError(err), )))); } @@ -342,7 +586,7 @@ impl Stream for WebRTCListenStream { // once. InAddr::One { out } => { if let Some(multiaddr) = out.take() { - return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(multiaddr)))); + return Poll::Ready(Some(Ok(WebRTCListenerEvent::NewAddress(multiaddr)))); } } } @@ -358,8 +602,8 @@ impl Stream for WebRTCListenStream { } // Safe to unwrap here since this is the only place `new_addr_rx` is locked. - return match Pin::new(&mut *me.new_addr_rx.lock().unwrap()).poll_next(cx) { - Poll::Ready(Some(addr)) => Poll::Ready(Some(Ok(ListenerEvent::Upgrade { + return match Pin::new(&mut me.new_addr_rx).poll_next(cx) { + Poll::Ready(Some(addr)) => Poll::Ready(Some(Ok(WebRTCListenerEvent::Upgrade { local_addr: ip_to_multiaddr(me.listen_addr.ip(), me.listen_addr.port()), remote_addr: addr.clone(), upgrade: Box::pin(upgrade::webrtc( @@ -376,139 +620,6 @@ impl Stream for WebRTCListenStream { } } -async fn do_dial( - config: RTCConfiguration, - our_fingerprint: String, - udp_mux: Arc, - id_keys: identity::Keypair, - addr: Multiaddr, -) -> Result<(PeerId, Connection), Error> { - let socket_addr = - multiaddr_to_socketaddr(&addr).ok_or_else(|| Error::InvalidMultiaddr(addr.clone()))?; - if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { - return Err(Error::InvalidMultiaddr(addr.clone())); - } - - let remote = addr.clone(); // used for logging - trace!("dialing address: {:?}", remote); - - let se = build_setting_engine(udp_mux.clone(), &socket_addr, &our_fingerprint); - let api = APIBuilder::new().with_setting_engine(se).build(); - - let peer_connection = api - .new_peer_connection(config) - .map_err(Error::WebRTC) - .await?; - - let offer = peer_connection - .create_offer(None) - .map_err(Error::WebRTC) - .await?; - debug!("OFFER: {:?}", offer.sdp); - peer_connection - .set_local_description(offer) - .map_err(Error::WebRTC) - .await?; - - // Set the remote description to the predefined SDP. - let remote_fingerprint = match fingerprint_from_addr(&addr) { - Some(f) => fingerprint_to_string(&f), - None => return Err(Error::InvalidMultiaddr(addr.clone())), - }; - let server_session_description = render_description( - sdp::SERVER_SESSION_DESCRIPTION, - socket_addr, - &remote_fingerprint, - ); - debug!("ANSWER: {:?}", server_session_description); - let sdp = RTCSessionDescription::answer(server_session_description).unwrap(); - // Set the local description and start UDP listeners - // Note: this will start the gathering of ICE candidates - peer_connection - .set_remote_description(sdp) - .map_err(Error::WebRTC) - .await?; - - // Create a datachannel with label 'data' - let data_channel = peer_connection - .create_data_channel( - "data", - Some(RTCDataChannelInit { - id: Some(1), - ..RTCDataChannelInit::default() - }), - ) - .await?; - - let (tx, mut rx) = oneshot::channel::>(); - - // Wait until the data channel is opened and detach it. - crate::connection::register_data_channel_open_handler(data_channel, tx).await; - - // Wait until data channel is opened and ready to use - let detached = select! { - res = rx => match res { - Ok(detached) => detached, - Err(e) => return Err(Error::InternalError(e.to_string())), - }, - _ = Delay::new(Duration::from_secs(10)).fuse() => return Err(Error::InternalError( - "data channel opening took longer than 10 seconds (see logs)".into(), - )) - }; - - trace!("noise handshake with {}", remote); - let dh_keys = Keypair::::new() - .into_authentic(&id_keys) - .unwrap(); - let noise = NoiseConfig::xx(dh_keys); - let info = noise.protocol_info().next().unwrap(); - let (peer_id, mut noise_io) = noise - .upgrade_outbound(PollDataChannel::new(detached), info) - .and_then(|(remote, io)| match remote { - RemoteIdentity::IdentityKey(pk) => future::ok((pk.to_peer_id(), io)), - _ => future::err(NoiseError::AuthenticationFailed), - }) - .await - .map_err(Error::Noise)?; - - // Exchange TLS certificate fingerprints to prevent MiM attacks. - trace!("exchanging TLS certificate fingerprints with {}", remote); - let n = noise_io.write(&our_fingerprint.into_bytes()).await?; - noise_io.flush().await?; - let mut buf = vec![0; n]; // ASSERT: fingerprint's format is the same. - noise_io.read_exact(buf.as_mut_slice()).await?; - let fingerprint_from_noise = - String::from_utf8(buf).map_err(|_| Error::Noise(NoiseError::AuthenticationFailed))?; - if fingerprint_from_noise != remote_fingerprint { - return Err(Error::InvalidFingerprint { - expected: remote_fingerprint, - got: fingerprint_from_noise, - }); - } - - trace!("verifying peer's identity {}", remote); - let peer_id_from_addr = PeerId::try_from_multiaddr(&addr); - if peer_id_from_addr.is_none() || peer_id_from_addr.unwrap() != peer_id { - return Err(Error::InvalidPeerID { - expected: peer_id_from_addr, - got: peer_id, - }); - } - - // Close the initial data channel after noise handshake is done. - // https://github.com/webrtc-rs/sctp/pull/14 - // detached - // .close() - // .await - // .map_err(|e| Error::WebRTC(e.into()))?; - - let mut c = Connection::new(peer_connection).await; - // XXX: default buffer size is too small to fit some messages. Possibly remove once - // https://github.com/webrtc-rs/sctp/issues/28 is fixed. - c.set_data_channels_read_buf_capacity(8192 * 10); - Ok((peer_id, c)) -} - /// Creates a [`Multiaddr`] from the given IP address and port number. fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { Multiaddr::empty().with(ip.into()).with(Protocol::Udp(port)) @@ -630,7 +741,7 @@ fn fingerprint_from_addr<'a>(addr: &'a Multiaddr) -> Option> { #[cfg(test)] mod tests { use super::*; - use libp2p_core::{multiaddr::Protocol, Multiaddr, Transport}; + use libp2p_core::{multiaddr::Protocol, Multiaddr}; use rcgen::KeyPair; use std::net::IpAddr; use std::net::{Ipv4Addr, Ipv6Addr}; @@ -716,37 +827,31 @@ mod tests { #[tokio::test] async fn dialer_connects_to_listener_ipv4() { let _ = env_logger::builder().is_test(true).try_init(); - let a = "127.0.0.1:0".parse().unwrap(); - connect(a).await; + let a = "/ip4/127.0.0.1/udp/0/x-webrtc/ACD1E533EC271FCDE0275947F4D62A2B2331FF10C9DDE0298EB7B399B4BFF60B".parse().unwrap(); + futures::executor::block_on(connect(a)); } #[tokio::test] async fn dialer_connects_to_listener_ipv6() { let _ = env_logger::builder().is_test(true).try_init(); - let a = "[::1]:0".parse().unwrap(); - connect(a).await; + let a = "/ip6/::1/udp/0/x-webrtc/ACD1E533EC271FCDE0275947F4D62A2B2331FF10C9DDE0298EB7B399B4BFF60B".parse().unwrap(); + futures::executor::block_on(connect(a)); } - async fn connect(listen_addr: SocketAddr) { + async fn connect(listen_addr: Multiaddr) { let id_keys = identity::Keypair::generate_ed25519(); let t1_peer_id = PeerId::from_public_key(&id_keys.public()); - let transport = { + let mut transport = { let kp = KeyPair::generate(&rcgen::PKCS_ECDSA_P256_SHA256).expect("key pair"); let cert = RTCCertificate::from_key_pair(kp).expect("certificate"); - WebRTCTransport::new(cert, id_keys, listen_addr) - .await - .expect("transport") + WebRTCTransport::new(cert, id_keys).boxed() }; - let mut listener = transport - .clone() - .listen_on(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())) - .expect("listener"); + transport.listen_on(listen_addr.clone()).expect("listener"); - let addr = listener - .try_next() + let addr = transport + .next() .await - .expect("some event") .expect("no error") .into_new_address() .expect("listen address"); @@ -754,29 +859,29 @@ mod tests { assert_ne!(Some(Protocol::Udp(0)), addr.iter().nth(1)); let inbound = async move { - let (conn, _addr) = listener - .try_filter_map(|e| future::ready(Ok(e.into_upgrade()))) - .try_next() + let (conn, _addr) = transport + .select_next_some() + .map(|ev| ev.into_incoming()) .await - .unwrap() .unwrap(); conn.await }; - let mut transport2 = { + let (mut transport2, f) = { let kp = KeyPair::generate(&rcgen::PKCS_ECDSA_P256_SHA256).expect("key pair"); let id_keys = identity::Keypair::generate_ed25519(); let cert = RTCCertificate::from_key_pair(kp).expect("certificate"); - // okay to reuse `listen_addr` since the port is `0` (any). - WebRTCTransport::new(cert, id_keys, listen_addr) - .await - .expect("transport") + let t = WebRTCTransport::new(cert, id_keys); + // TODO: make code cleaner wrt ":" + let f = t.cert_fingerprint().replace(':', ""); + (t.boxed(), f) }; - // TODO: make code cleaner wrt ":" - let f = &transport.cert_fingerprint().replace(':', ""); + + transport2.listen_on(listen_addr).expect("listener"); + let outbound = transport2 .dial( - addr.with(Protocol::XWebRTC(hex_to_cow(f))) + addr.with(Protocol::XWebRTC(hex_to_cow(&f))) .with(Protocol::P2p(t1_peer_id.into())), ) .unwrap(); diff --git a/transports/webrtc/tests/smoke.rs b/transports/webrtc/tests/smoke.rs index ab1f1914abc..3cdcbabd140 100644 --- a/transports/webrtc/tests/smoke.rs +++ b/transports/webrtc/tests/smoke.rs @@ -34,7 +34,7 @@ async fn create_swarm() -> Result<(Swarm>, String)> { let cert = generate_certificate(); let keypair = generate_tls_keypair(); let peer_id = keypair.public().to_peer_id(); - let transport = WebRTCTransport::new(cert, keypair, "127.0.0.1:0").await?; + let transport = WebRTCTransport::new(cert, keypair); let fingerprint = transport.cert_fingerprint(); let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); let cfg = RequestResponseConfig::default(); @@ -59,7 +59,7 @@ async fn smoke() -> Result<()> { let (mut a, a_fingerprint) = create_swarm().await?; let (mut b, _b_fingerprint) = create_swarm().await?; - Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0".parse()?)?; + Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/x-webrtc/ACD1E533EC271FCDE0275947F4D62A2B2331FF10C9DDE0298EB7B399B4BFF60B".parse()?)?; let addr = match a.next().await { Some(SwarmEvent::NewListenAddr { address, .. }) => address,