diff --git a/Cargo.toml b/Cargo.toml index 16c4585fc74..af0508975d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ default = [ "yamux", ] autonat = ["libp2p-autonat"] +dcutr = ["libp2p-dcutr", "libp2p-metrics/dcutr"] deflate = ["libp2p-deflate"] dns-async-std = ["libp2p-dns", "libp2p-dns/async-std"] dns-tokio = ["libp2p-dns", "libp2p-dns/tokio"] @@ -78,6 +79,7 @@ lazy_static = "1.2" libp2p-autonat = { version = "0.2.0", path = "protocols/autonat", optional = true } libp2p-core = { version = "0.32.0", path = "core", default-features = false } +libp2p-dcutr = { version = "0.1.0", path = "protocols/dcutr", optional = true } libp2p-floodsub = { version = "0.34.0", path = "protocols/floodsub", optional = true } libp2p-gossipsub = { version = "0.36.0", path = "./protocols/gossipsub", optional = true } libp2p-identify = { version = "0.34.0", path = "protocols/identify", optional = true } @@ -124,6 +126,7 @@ members = [ "misc/peer-id-generator", "muxers/mplex", "muxers/yamux", + "protocols/dcutr", "protocols/autonat", "protocols/floodsub", "protocols/gossipsub", diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 5310bc2eceb..487ab9dd531 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -16,9 +16,11 @@ identify = ["libp2p-identify"] kad = ["libp2p-kad"] ping = ["libp2p-ping"] relay = ["libp2p-relay"] +dcutr = ["libp2p-dcutr"] [dependencies] libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-dcutr = { version = "0.1.0", path = "../../protocols/dcutr", optional = true } libp2p-gossipsub = { version = "0.36.0", path = "../../protocols/gossipsub", optional = true } libp2p-identify = { version = "0.34.0", path = "../../protocols/identify", optional = true } libp2p-kad = { version = "0.35.0", path = "../../protocols/kad", optional = true } diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs new file mode 100644 index 00000000000..82b624a2f37 --- /dev/null +++ b/misc/metrics/src/dcutr.rs @@ -0,0 +1,89 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use prometheus_client::encoding::text::Encode; +use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::family::Family; +use prometheus_client::registry::Registry; + +pub struct Metrics { + events: Family, +} + +impl Metrics { + pub fn new(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("relay"); + + let events = Family::default(); + sub_registry.register( + "events", + "Events emitted by the relay NetworkBehaviour", + Box::new(events.clone()), + ); + + Self { events } + } +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Encode)] +struct EventLabels { + event: EventType, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Encode)] +enum EventType { + InitiateDirectConnectionUpgrade, + RemoteInitiatedDirectConnectionUpgrade, + DirectConnectionUpgradeSucceeded, + DirectConnectionUpgradeFailed, +} + +impl From<&libp2p_dcutr::behaviour::Event> for EventType { + fn from(event: &libp2p_dcutr::behaviour::Event) -> Self { + match event { + libp2p_dcutr::behaviour::Event::InitiatedDirectConnectionUpgrade { + remote_peer_id: _, + local_relayed_addr: _, + } => EventType::InitiateDirectConnectionUpgrade, + libp2p_dcutr::behaviour::Event::RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id: _, + remote_relayed_addr: _, + } => EventType::RemoteInitiatedDirectConnectionUpgrade, + libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeSucceeded { + remote_peer_id: _, + } => EventType::DirectConnectionUpgradeSucceeded, + libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeFailed { + remote_peer_id: _, + error: _, + } => EventType::DirectConnectionUpgradeFailed, + } + } +} + +impl super::Recorder for super::Metrics { + fn record(&self, event: &libp2p_dcutr::behaviour::Event) { + self.dcutr + .events + .get_or_create(&EventLabels { + event: event.into(), + }) + .inc(); + } +} diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index dfd13515dc2..6acb0c55f5a 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -25,6 +25,8 @@ //! //! See `examples` directory for more. +#[cfg(feature = "dcutr")] +mod dcutr; #[cfg(feature = "gossipsub")] mod gossipsub; #[cfg(feature = "identify")] @@ -41,6 +43,8 @@ use prometheus_client::registry::Registry; /// Set of Swarm and protocol metrics derived from emitted events. pub struct Metrics { + #[cfg(feature = "dcutr")] + dcutr: dcutr::Metrics, #[cfg(feature = "gossipsub")] gossipsub: gossipsub::Metrics, #[cfg(feature = "identify")] @@ -66,6 +70,8 @@ impl Metrics { pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("libp2p"); Self { + #[cfg(feature = "dcutr")] + dcutr: dcutr::Metrics::new(sub_registry), #[cfg(feature = "gossipsub")] gossipsub: gossipsub::Metrics::new(sub_registry), #[cfg(feature = "identify")] diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml new file mode 100644 index 00000000000..e32c2114614 --- /dev/null +++ b/protocols/dcutr/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "libp2p-dcutr" +edition = "2021" +rust-version = "1.56.1" +description = "Direct connection upgrade through relay" +version = "0.1.0" +authors = ["Max Inden "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +asynchronous-codec = "0.6" +bytes = "1" +either = "1.6.0" +futures = "0.3.1" +futures-timer = "3.0" +instant = "0.1.11" +libp2p-core = { version = "0.32", path = "../../core" } +libp2p-swarm = { version = "0.34", path = "../../swarm" } +log = "0.4" +prost = "0.7" +thiserror = "1.0" +unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } +void = "1" + +[build-dependencies] +prost-build = "0.7" + +[dev-dependencies] +env_logger = "0.8.3" +libp2p = { path = "../..", features = ["dcutr"] } +libp2p-identify = { path = "../identify" } +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-relay = { path = "../relay" } +libp2p-yamux = { path = "../../muxers/yamux" } +rand = "0.7" +structopt = "0.3.21" \ No newline at end of file diff --git a/protocols/dcutr/build.rs b/protocols/dcutr/build.rs new file mode 100644 index 00000000000..b159bb4c817 --- /dev/null +++ b/protocols/dcutr/build.rs @@ -0,0 +1,23 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +fn main() { + prost_build::compile_protos(&["src/message.proto"], &["src"]).unwrap(); +} diff --git a/protocols/dcutr/examples/client.rs b/protocols/dcutr/examples/client.rs new file mode 100644 index 00000000000..64453f62c44 --- /dev/null +++ b/protocols/dcutr/examples/client.rs @@ -0,0 +1,270 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::executor::block_on; +use futures::future::FutureExt; +use futures::stream::StreamExt; +use libp2p::core::multiaddr::{Multiaddr, Protocol}; +use libp2p::core::transport::OrTransport; +use libp2p::core::upgrade; +use libp2p::dcutr; +use libp2p::dns::DnsConfig; +use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo}; +use libp2p::noise; +use libp2p::ping::{Ping, PingConfig, PingEvent}; +use libp2p::relay::v2::client::{self, Client}; +use libp2p::swarm::{SwarmBuilder, SwarmEvent}; +use libp2p::tcp::TcpConfig; +use libp2p::Transport; +use libp2p::{identity, NetworkBehaviour, PeerId}; +use log::info; +use std::convert::TryInto; +use std::error::Error; +use std::net::Ipv4Addr; +use std::str::FromStr; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +#[structopt(name = "libp2p DCUtR client")] +struct Opts { + /// The mode (client-listen, client-dial). + #[structopt(long)] + mode: Mode, + + /// Fixed value to generate deterministic peer id. + #[structopt(long)] + secret_key_seed: u8, + + /// The listening address + #[structopt(long)] + relay_address: Multiaddr, + + /// Peer ID of the remote peer to hole punch to. + #[structopt(long)] + remote_peer_id: Option, +} + +#[derive(Debug, StructOpt, PartialEq)] +enum Mode { + Dial, + Listen, +} + +impl FromStr for Mode { + type Err = String; + fn from_str(mode: &str) -> Result { + match mode { + "dial" => Ok(Mode::Dial), + "listen" => Ok(Mode::Listen), + _ => Err("Expected either 'dial' or 'listen'".to_string()), + } + } +} + +fn main() -> Result<(), Box> { + env_logger::init(); + + let opts = Opts::from_args(); + + let local_key = generate_ed25519(opts.secret_key_seed); + let local_peer_id = PeerId::from(local_key.public()); + info!("Local peer id: {:?}", local_peer_id); + + let (relay_transport, client) = Client::new_transport_and_behaviour(local_peer_id); + + let noise_keys = noise::Keypair::::new() + .into_authentic(&local_key) + .expect("Signing libp2p-noise static DH keypair failed."); + + let transport = OrTransport::new( + relay_transport, + block_on(DnsConfig::system(TcpConfig::new().port_reuse(true))).unwrap(), + ) + .upgrade(upgrade::Version::V1) + .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(); + + #[derive(NetworkBehaviour)] + #[behaviour(out_event = "Event", event_process = false)] + struct Behaviour { + relay_client: Client, + ping: Ping, + identify: Identify, + dcutr: dcutr::behaviour::Behaviour, + } + + #[derive(Debug)] + enum Event { + Ping(PingEvent), + Identify(IdentifyEvent), + Relay(client::Event), + Dcutr(dcutr::behaviour::Event), + } + + impl From for Event { + fn from(e: PingEvent) -> Self { + Event::Ping(e) + } + } + + impl From for Event { + fn from(e: IdentifyEvent) -> Self { + Event::Identify(e) + } + } + + impl From for Event { + fn from(e: client::Event) -> Self { + Event::Relay(e) + } + } + + impl From for Event { + fn from(e: dcutr::behaviour::Event) -> Self { + Event::Dcutr(e) + } + } + + let behaviour = Behaviour { + relay_client: client, + ping: Ping::new(PingConfig::new()), + identify: Identify::new(IdentifyConfig::new( + "/TODO/0.0.1".to_string(), + local_key.public(), + )), + dcutr: dcutr::behaviour::Behaviour::new(), + }; + + let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id) + .dial_concurrency_factor(10_u8.try_into().unwrap()) + .build(); + + swarm + .listen_on( + Multiaddr::empty() + .with("0.0.0.0".parse::().unwrap().into()) + .with(Protocol::Tcp(0)), + ) + .unwrap(); + + // Wait to listen on all interfaces. + block_on(async { + let mut delay = futures_timer::Delay::new(std::time::Duration::from_secs(1)).fuse(); + loop { + futures::select! { + event = swarm.next() => { + match event.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { + info!("Listening on {:?}", address); + } + event => panic!("{:?}", event), + } + } + _ = delay => { + // Likely listening on all interfaces now, thus continuing by breaking the loop. + break; + } + } + } + }); + + match opts.mode { + Mode::Dial => { + swarm.dial(opts.relay_address.clone()).unwrap(); + } + Mode::Listen => { + swarm + .listen_on(opts.relay_address.clone().with(Protocol::P2pCircuit)) + .unwrap(); + } + } + + // Wait till connected to relay to learn external address. + block_on(async { + loop { + match swarm.next().await.unwrap() { + SwarmEvent::NewListenAddr { .. } => {} + SwarmEvent::Dialing { .. } => {} + SwarmEvent::ConnectionEstablished { .. } => {} + SwarmEvent::Behaviour(Event::Ping(_)) => {} + SwarmEvent::Behaviour(Event::Relay(_)) => {} + SwarmEvent::Behaviour(Event::Identify(IdentifyEvent::Sent { .. })) => {} + SwarmEvent::Behaviour(Event::Identify(IdentifyEvent::Received { + info: IdentifyInfo { observed_addr, .. }, + .. + })) => { + info!("Observed address: {:?}", observed_addr); + break; + } + event => panic!("{:?}", event), + } + } + }); + + if opts.mode == Mode::Dial { + swarm + .dial( + opts.relay_address + .clone() + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(opts.remote_peer_id.unwrap().into())), + ) + .unwrap(); + } + + block_on(async { + loop { + match swarm.next().await.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { + info!("Listening on {:?}", address); + } + SwarmEvent::Behaviour(Event::Relay(event)) => { + info!("{:?}", event) + } + SwarmEvent::Behaviour(Event::Dcutr(event)) => { + info!("{:?}", event) + } + SwarmEvent::Behaviour(Event::Identify(event)) => { + info!("{:?}", event) + } + SwarmEvent::Behaviour(Event::Ping(_)) => {} + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + info!("Established connection to {:?} via {:?}", peer_id, endpoint); + } + SwarmEvent::OutgoingConnectionError { peer_id, error } => { + info!("Outgoing connection error to {:?}: {:?}", peer_id, error); + } + _ => {} + } + } + }) +} + +fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair { + let mut bytes = [0u8; 32]; + bytes[0] = secret_key_seed; + + let secret_key = identity::ed25519::SecretKey::from_bytes(&mut bytes) + .expect("this returns `Err` only if the length is wrong; the length is correct; qed"); + identity::Keypair::Ed25519(secret_key.into()) +} diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs new file mode 100644 index 00000000000..fe8bbe42b2f --- /dev/null +++ b/protocols/dcutr/src/behaviour.rs @@ -0,0 +1,394 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`NetworkBehaviour`] to act as a direct connection upgrade through relay node. + +use crate::handler; +use crate::protocol; +use either::Either; +use libp2p_core::connection::{ConnectedPoint, ConnectionId}; +use libp2p_core::multiaddr::Protocol; +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::dial_opts::{self, DialOpts}; +use libp2p_swarm::{ + DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, ProtocolsHandler, ProtocolsHandlerUpgrErr, +}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::task::{Context, Poll}; +use thiserror::Error; + +const MAX_NUMBER_OF_UPGRADE_ATTEMPTS: u8 = 3; + +/// The events produced by the [`Behaviour`]. +#[derive(Debug)] +pub enum Event { + InitiatedDirectConnectionUpgrade { + remote_peer_id: PeerId, + local_relayed_addr: Multiaddr, + }, + RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id: PeerId, + remote_relayed_addr: Multiaddr, + }, + DirectConnectionUpgradeSucceeded { + remote_peer_id: PeerId, + }, + DirectConnectionUpgradeFailed { + remote_peer_id: PeerId, + error: UpgradeError, + }, +} + +#[derive(Debug, Error)] +pub enum UpgradeError { + #[error("Failed to dial peer.")] + Dial, + #[error("Failed to establish substream: {0}.")] + Handler(ProtocolsHandlerUpgrErr), +} + +pub struct Behaviour { + /// Queue of actions to return when polled. + queued_actions: VecDeque, + + /// All direct (non-relayed) connections. + direct_connections: HashMap>, +} + +impl Behaviour { + pub fn new() -> Self { + Behaviour { + queued_actions: Default::default(), + direct_connections: Default::default(), + } + } +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = handler::Prototype; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + handler::Prototype::UnknownConnection + } + + fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { + vec![] + } + + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + connection_id: &ConnectionId, + connected_point: &ConnectedPoint, + _failed_addresses: Option<&Vec>, + ) { + if connected_point.is_relayed() { + if connected_point.is_listener() && !self.direct_connections.contains_key(peer_id) { + // TODO: Try dialing the remote peer directly. Specification: + // + // > The protocol starts with the completion of a relay connection from A to B. Upon + // observing the new connection, the inbound peer (here B) checks the addresses + // advertised by A via identify. If that set includes public addresses, then A may + // be reachable by a direct connection, in which case B attempts a unilateral + // connection upgrade by initiating a direct connection to A. + // + // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol + self.queued_actions.extend([ + ActionBuilder::Connect { + peer_id: *peer_id, + attempt: 1, + handler: NotifyHandler::One(*connection_id), + }, + NetworkBehaviourAction::GenerateEvent( + Event::InitiatedDirectConnectionUpgrade { + remote_peer_id: *peer_id, + local_relayed_addr: match connected_point { + ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(), + ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), + }, + }, + ) + .into(), + ]); + } + } else { + self.direct_connections + .entry(*peer_id) + .or_default() + .insert(*connection_id); + } + } + + fn inject_dial_failure( + &mut self, + peer_id: Option, + handler: Self::ProtocolsHandler, + _error: &DialError, + ) { + match handler { + handler::Prototype::DirectConnection { + relayed_connection_id, + role: handler::Role::Initiator { attempt }, + } => { + let peer_id = + peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); + if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { + self.queued_actions.push_back(ActionBuilder::Connect { + peer_id, + handler: NotifyHandler::One(relayed_connection_id), + attempt: attempt + 1, + }); + } else { + self.queued_actions.extend([ + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(relayed_connection_id), + event: Either::Left( + handler::relayed::Command::UpgradeFinishedDontKeepAlive, + ), + } + .into(), + NetworkBehaviourAction::GenerateEvent( + Event::DirectConnectionUpgradeFailed { + remote_peer_id: peer_id, + error: UpgradeError::Dial, + }, + ) + .into(), + ]); + } + } + _ => {} + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + assert!(!self.direct_connections.contains_key(peer_id)); + } + + fn inject_connection_closed( + &mut self, + peer_id: &PeerId, + connection_id: &ConnectionId, + connected_point: &ConnectedPoint, + _handler: <::ProtocolsHandler as IntoProtocolsHandler>::Handler, + ) { + if !connected_point.is_relayed() { + let connections = self + .direct_connections + .get_mut(peer_id) + .expect("Peer of direct connection to be tracked."); + connections + .remove(connection_id) + .then(|| ()) + .expect("Direct connection to be tracked."); + if connections.is_empty() { + self.direct_connections.remove(peer_id); + } + } + } + + fn inject_event( + &mut self, + event_source: PeerId, + connection: ConnectionId, + handler_event: <::Handler as ProtocolsHandler>::OutEvent, + ) { + match handler_event { + Either::Left(handler::relayed::Event::InboundConnectRequest { + inbound_connect, + remote_addr, + }) => { + self.queued_actions.extend([ + ActionBuilder::AcceptInboundConnect { + peer_id: event_source, + handler: NotifyHandler::One(connection), + inbound_connect, + }, + NetworkBehaviourAction::GenerateEvent( + Event::RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id: event_source, + remote_relayed_addr: remote_addr, + }, + ) + .into(), + ]); + } + Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: event_source, + error: UpgradeError::Handler(error), + }) + .into(), + ); + } + Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { + self.queued_actions.push_back( + NetworkBehaviourAction::Dial { + opts: DialOpts::peer_id(event_source) + .addresses(remote_addrs) + .condition(dial_opts::PeerCondition::Always) + .build(), + handler: handler::Prototype::DirectConnection { + relayed_connection_id: connection, + role: handler::Role::Listener, + }, + } + .into(), + ); + } + Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: event_source, + error: UpgradeError::Handler(error), + }) + .into(), + ); + } + Either::Left(handler::relayed::Event::OutboundConnectNegotiated { + remote_addrs, + attempt, + }) => { + self.queued_actions.push_back( + NetworkBehaviourAction::Dial { + opts: DialOpts::peer_id(event_source) + .condition(dial_opts::PeerCondition::Always) + .addresses(remote_addrs) + .override_role() + .build(), + handler: handler::Prototype::DirectConnection { + relayed_connection_id: connection, + role: handler::Role::Initiator { attempt }, + }, + } + .into(), + ); + } + Either::Right(Either::Left( + handler::direct::Event::DirectConnectionUpgradeSucceeded { + relayed_connection_id, + }, + )) => { + self.queued_actions.extend([ + NetworkBehaviourAction::NotifyHandler { + peer_id: event_source, + handler: NotifyHandler::One(relayed_connection_id), + event: Either::Left( + handler::relayed::Command::UpgradeFinishedDontKeepAlive, + ), + } + .into(), + NetworkBehaviourAction::GenerateEvent( + Event::DirectConnectionUpgradeSucceeded { + remote_peer_id: event_source, + }, + ) + .into(), + ]); + } + Either::Right(Either::Right(event)) => void::unreachable(event), + }; + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + poll_parameters: &mut impl PollParameters, + ) -> Poll> { + if let Some(action) = self.queued_actions.pop_front() { + return Poll::Ready(action.build(poll_parameters)); + } + + Poll::Pending + } +} + +/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] +/// before being returned in [`Behaviour::poll`]. +#[allow(clippy::large_enum_variant)] +enum ActionBuilder { + Done(NetworkBehaviourAction), + Connect { + attempt: u8, + handler: NotifyHandler, + peer_id: PeerId, + }, + AcceptInboundConnect { + inbound_connect: protocol::inbound::PendingConnect, + handler: NotifyHandler, + peer_id: PeerId, + }, +} + +impl From> for ActionBuilder { + fn from(action: NetworkBehaviourAction) -> Self { + Self::Done(action) + } +} + +impl ActionBuilder { + fn build( + self, + poll_parameters: &mut impl PollParameters, + ) -> NetworkBehaviourAction { + let obs_addrs = || { + poll_parameters + .external_addresses() + .filter(|a| !a.addr.iter().any(|p| p == Protocol::P2pCircuit)) + .map(|a| { + a.addr + .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) + }) + .collect() + }; + + match self { + ActionBuilder::Done(action) => action, + ActionBuilder::AcceptInboundConnect { + inbound_connect, + handler, + peer_id, + } => NetworkBehaviourAction::NotifyHandler { + handler, + peer_id, + event: Either::Left(handler::relayed::Command::AcceptInboundConnect { + inbound_connect, + obs_addrs: obs_addrs(), + }), + }, + ActionBuilder::Connect { + attempt, + handler, + peer_id, + } => NetworkBehaviourAction::NotifyHandler { + handler, + peer_id, + event: Either::Left(handler::relayed::Command::Connect { + attempt, + obs_addrs: obs_addrs(), + }), + }, + } + } +} diff --git a/protocols/dcutr/src/handler.rs b/protocols/dcutr/src/handler.rs new file mode 100644 index 00000000000..4dcc0f4d66d --- /dev/null +++ b/protocols/dcutr/src/handler.rs @@ -0,0 +1,81 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::protocol; +use either::Either; +use libp2p_core::connection::ConnectionId; +use libp2p_core::upgrade::{self, DeniedUpgrade}; +use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_swarm::protocols_handler::DummyProtocolsHandler; +use libp2p_swarm::protocols_handler::SendWrapper; +use libp2p_swarm::{IntoProtocolsHandler, ProtocolsHandler}; + +pub mod direct; +pub mod relayed; + +pub enum Prototype { + DirectConnection { + role: Role, + relayed_connection_id: ConnectionId, + }, + UnknownConnection, +} + +pub enum Role { + Initiator { attempt: u8 }, + Listener, +} + +impl IntoProtocolsHandler for Prototype { + type Handler = Either>; + + fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { + match self { + Self::UnknownConnection => { + if endpoint.is_relayed() { + Either::Left(relayed::Handler::new(endpoint.clone())) + } else { + Either::Right(Either::Right(DummyProtocolsHandler::default())) + } + } + Self::DirectConnection { + relayed_connection_id, + .. + } => { + assert!( + !endpoint.is_relayed(), + "`Prototype::DirectConnection` is never created for relayed connection." + ); + Either::Right(Either::Left(direct::Handler::new(relayed_connection_id))) + } + } + } + + fn inbound_protocol(&self) -> ::InboundProtocol { + match self { + Prototype::UnknownConnection => upgrade::EitherUpgrade::A(SendWrapper( + upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), + )), + Prototype::DirectConnection { .. } => { + upgrade::EitherUpgrade::A(SendWrapper(upgrade::EitherUpgrade::B(DeniedUpgrade))) + } + } + } +} diff --git a/protocols/dcutr/src/handler/direct.rs b/protocols/dcutr/src/handler/direct.rs new file mode 100644 index 00000000000..980e6e7f462 --- /dev/null +++ b/protocols/dcutr/src/handler/direct.rs @@ -0,0 +1,114 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`ProtocolsHandler`] handling direct connection upgraded through a relayed connection. + +use libp2p_core::connection::ConnectionId; +use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p_swarm::{ + KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use std::task::{Context, Poll}; +use void::Void; + +#[derive(Debug)] +pub enum Event { + DirectConnectionUpgradeSucceeded { relayed_connection_id: ConnectionId }, +} + +pub struct Handler { + relayed_connection_id: ConnectionId, + reported: bool, +} + +impl Handler { + pub(crate) fn new(relayed_connection_id: ConnectionId) -> Self { + Self { + reported: false, + relayed_connection_id, + } + } +} + +impl ProtocolsHandler for Handler { + type InEvent = void::Void; + type OutEvent = Event; + type Error = ProtocolsHandlerUpgrErr; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type OutboundOpenInfo = Void; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn inject_fully_negotiated_inbound( + &mut self, + _: >::Output, + _: Self::InboundOpenInfo, + ) { + } + + fn inject_fully_negotiated_outbound( + &mut self, + _: >::Output, + _: Self::OutboundOpenInfo, + ) { + } + + fn inject_event(&mut self, _: Self::InEvent) {} + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + _: ProtocolsHandlerUpgrErr< + >::Error, + >, + ) { + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::No + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + if !self.reported { + self.reported = true; + return Poll::Ready(ProtocolsHandlerEvent::Custom( + Event::DirectConnectionUpgradeSucceeded { + relayed_connection_id: self.relayed_connection_id, + }, + )); + } + Poll::Pending + } +} diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs new file mode 100644 index 00000000000..b44af7cd59c --- /dev/null +++ b/protocols/dcutr/src/handler/relayed.rs @@ -0,0 +1,384 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`ProtocolsHandler`] handling relayed connection potentially upgraded to a direct connection. + +use crate::protocol; +use futures::future::{BoxFuture, FutureExt}; +use futures::stream::{FuturesUnordered, StreamExt}; +use instant::Instant; +use libp2p_core::either::{EitherError, EitherOutput}; +use libp2p_core::multiaddr::Multiaddr; +use libp2p_core::upgrade::{self, DeniedUpgrade, NegotiationError, UpgradeError}; +use libp2p_core::ConnectedPoint; +use libp2p_swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend}; +use libp2p_swarm::{ + KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use std::collections::VecDeque; +use std::fmt; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub enum Command { + Connect { + obs_addrs: Vec, + attempt: u8, + }, + AcceptInboundConnect { + obs_addrs: Vec, + inbound_connect: protocol::inbound::PendingConnect, + }, + /// Upgrading the relayed connection to a direct connection either failed for good or succeeded. + /// There is no need to keep the relayed connection alive for the sake of upgrading to a direct + /// connection. + UpgradeFinishedDontKeepAlive, +} + +impl fmt::Debug for Command { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Command::Connect { obs_addrs, attempt } => f + .debug_struct("Command::Connect") + .field("obs_addrs", obs_addrs) + .field("attempt", attempt) + .finish(), + Command::AcceptInboundConnect { + obs_addrs, + inbound_connect: _, + } => f + .debug_struct("Command::AcceptInboundConnect") + .field("obs_addrs", obs_addrs) + .finish(), + Command::UpgradeFinishedDontKeepAlive => f + .debug_struct("Command::UpgradeFinishedDontKeepAlive") + .finish(), + } + } +} + +pub enum Event { + InboundConnectRequest { + inbound_connect: protocol::inbound::PendingConnect, + remote_addr: Multiaddr, + }, + InboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr, + }, + InboundConnectNegotiated(Vec), + OutboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr, + }, + OutboundConnectNegotiated { + remote_addrs: Vec, + attempt: u8, + }, +} + +impl fmt::Debug for Event { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Event::InboundConnectRequest { + inbound_connect: _, + remote_addr, + } => f + .debug_struct("Event::InboundConnectRequest") + .field("remote_addrs", remote_addr) + .finish(), + Event::InboundNegotiationFailed { error } => f + .debug_struct("Event::InboundNegotiationFailed") + .field("error", error) + .finish(), + Event::InboundConnectNegotiated(addrs) => f + .debug_tuple("Event::InboundConnectNegotiated") + .field(addrs) + .finish(), + Event::OutboundNegotiationFailed { error } => f + .debug_struct("Event::OutboundNegotiationFailed") + .field("error", error) + .finish(), + Event::OutboundConnectNegotiated { + remote_addrs, + attempt, + } => f + .debug_struct("Event::OutboundConnectNegotiated") + .field("remote_addrs", remote_addrs) + .field("attempt", attempt) + .finish(), + } + } +} + +pub struct Handler { + endpoint: ConnectedPoint, + /// A pending fatal error that results in the connection being closed. + pending_error: Option< + ProtocolsHandlerUpgrErr< + EitherError, + >, + >, + /// Queue of events to return when polled. + queued_events: VecDeque< + ProtocolsHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::OutEvent, + ::Error, + >, + >, + /// Inbound connects, accepted by the behaviour, pending completion. + inbound_connects: FuturesUnordered< + BoxFuture<'static, Result, protocol::inbound::UpgradeError>>, + >, + keep_alive: KeepAlive, +} + +impl Handler { + pub fn new(endpoint: ConnectedPoint) -> Self { + Self { + endpoint, + pending_error: Default::default(), + queued_events: Default::default(), + inbound_connects: Default::default(), + keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(30)), + } + } +} + +impl ProtocolsHandler for Handler { + type InEvent = Command; + type OutEvent = Event; + type Error = ProtocolsHandlerUpgrErr< + EitherError, + >; + type InboundProtocol = upgrade::EitherUpgrade; + type OutboundProtocol = protocol::outbound::Upgrade; + type OutboundOpenInfo = u8; // Number of upgrade attempts. + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + match self.endpoint { + ConnectedPoint::Dialer { .. } => { + SubstreamProtocol::new(upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}), ()) + } + ConnectedPoint::Listener { .. } => { + // By the protocol specification the listening side of a relayed connection + // initiates the _direct connection upgrade_. In other words the listening side of + // the relayed connection opens a substream to the dialing side. (Connection roles + // and substream roles are reversed.) The listening side on a relayed connection + // never expects incoming substreams, hence the denied upgrade below. + SubstreamProtocol::new(upgrade::EitherUpgrade::B(DeniedUpgrade), ()) + } + } + } + + fn inject_fully_negotiated_inbound( + &mut self, + output: >::Output, + _: Self::InboundOpenInfo, + ) { + match output { + EitherOutput::First(inbound_connect) => { + let remote_addr = match &self.endpoint { + ConnectedPoint::Dialer { address, role_override: _ } => address.clone(), + ConnectedPoint::Listener { ..} => unreachable!("`::listen_protocol` denies all incoming substreams as a listener."), + }; + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundConnectRequest { + inbound_connect, + remote_addr, + }, + )); + } + // A connection listener denies all incoming substreams, thus none can ever be fully negotiated. + EitherOutput::Second(output) => void::unreachable(output), + } + } + + fn inject_fully_negotiated_outbound( + &mut self, + protocol::outbound::Connect { obs_addrs }: >::Output, + attempt: Self::OutboundOpenInfo, + ) { + assert!( + self.endpoint.is_listener(), + "A connection dialer never initiates a connection upgrade." + ); + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::OutboundConnectNegotiated { + remote_addrs: obs_addrs, + attempt, + }, + )); + } + + fn inject_event(&mut self, event: Self::InEvent) { + match event { + Command::Connect { obs_addrs, attempt } => { + self.queued_events + .push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + protocol::outbound::Upgrade::new(obs_addrs), + attempt, + ), + }); + } + Command::AcceptInboundConnect { + inbound_connect, + obs_addrs, + } => { + self.inbound_connects + .push(inbound_connect.accept(obs_addrs).boxed()); + } + Command::UpgradeFinishedDontKeepAlive => { + self.keep_alive = KeepAlive::No; + } + } + } + + fn inject_listen_upgrade_error( + &mut self, + _: Self::InboundOpenInfo, + error: ProtocolsHandlerUpgrErr<::Error>, + ) { + match error { + ProtocolsHandlerUpgrErr::Timeout => { + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Timeout, + }, + )); + } + ProtocolsHandlerUpgrErr::Timer => { + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Timer, + }, + )); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The remote merely doesn't support the DCUtR protocol. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + self.keep_alive = KeepAlive::No; + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::Failed, + )), + }, + )); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error.map_upgrade_err(|e| { + e.map_err(|e| match e { + EitherError::A(e) => EitherError::A(e), + EitherError::B(v) => void::unreachable(v), + }) + })); + } + } + } + + fn inject_dial_upgrade_error( + &mut self, + _open_info: Self::OutboundOpenInfo, + error: ProtocolsHandlerUpgrErr<::Error>, + ) { + self.keep_alive = KeepAlive::No; + + match error { + ProtocolsHandlerUpgrErr::Timeout => { + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::OutboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Timeout, + }, + )); + } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The remote merely doesn't support the DCUtR protocol. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::OutboundNegotiationFailed { + error: ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::Failed, + )), + }, + )); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = + Some(error.map_upgrade_err(|e| e.map_err(|e| EitherError::B(e)))); + } + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + // Check for a pending (fatal) error. + if let Some(err) = self.pending_error.take() { + // The handler will not be polled again by the `Swarm`. + return Poll::Ready(ProtocolsHandlerEvent::Close(err)); + } + + // Return queued events. + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + while let Poll::Ready(Some(result)) = self.inbound_connects.poll_next_unpin(cx) { + match result { + Ok(addresses) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom( + Event::InboundConnectNegotiated(addresses), + )); + } + Err(e) => { + return Poll::Ready(ProtocolsHandlerEvent::Close( + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))), + )) + } + } + } + + Poll::Pending + } +} diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs new file mode 100644 index 00000000000..5451f272101 --- /dev/null +++ b/protocols/dcutr/src/lib.rs @@ -0,0 +1,30 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the [libp2p Direct Connection Upgrade through Relay +//! specification](https://github.com/libp2p/specs/blob/master/relay/DCUtR.md). + +pub mod behaviour; +mod handler; +mod protocol; + +mod message_proto { + include!(concat!(env!("OUT_DIR"), "/holepunch.pb.rs")); +} diff --git a/protocols/dcutr/src/message.proto b/protocols/dcutr/src/message.proto new file mode 100644 index 00000000000..ab5b220f2ea --- /dev/null +++ b/protocols/dcutr/src/message.proto @@ -0,0 +1,19 @@ +syntax = "proto2"; + +package holepunch.pb; + +message HolePunch { + enum Type { + CONNECT = 100; + SYNC = 300; + } + + required Type type=1; + + // For hole punching, we'll send some additional observed addresses to the remote peer + // that could have been filtered by the Host address factory (for example: AutoRelay removes all public addresses if peer has private reachability). + // This is a hack! + // We plan to have a better address discovery and advertisement mechanism in the future. + // See https://github.com/libp2p/go-libp2p-autonat/pull/98 + repeated bytes ObsAddrs = 2; +} diff --git a/protocols/dcutr/src/protocol.rs b/protocols/dcutr/src/protocol.rs new file mode 100644 index 00000000000..a69a27f79cf --- /dev/null +++ b/protocols/dcutr/src/protocol.rs @@ -0,0 +1,25 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod codec; +pub mod inbound; +pub mod outbound; + +const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr"; diff --git a/protocols/dcutr/src/protocol/codec.rs b/protocols/dcutr/src/protocol/codec.rs new file mode 100644 index 00000000000..9706d756e9e --- /dev/null +++ b/protocols/dcutr/src/protocol/codec.rs @@ -0,0 +1,88 @@ +// Copyright 2022 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::message_proto; +use bytes::BytesMut; +use prost::Message; +use std::io::Cursor; +use thiserror::Error; +use unsigned_varint::codec::UviBytes; + +const MAX_MESSAGE_SIZE_BYTES: usize = 4096; + +pub struct Codec(UviBytes); + +impl Codec { + pub fn new() -> Self { + let mut codec = UviBytes::default(); + codec.set_max_len(MAX_MESSAGE_SIZE_BYTES); + Self(codec) + } +} + +impl asynchronous_codec::Encoder for Codec { + type Item = message_proto::HolePunch; + type Error = Error; + + fn encode( + &mut self, + item: Self::Item, + dst: &mut asynchronous_codec::BytesMut, + ) -> Result<(), Self::Error> { + let mut encoded_msg = BytesMut::new(); + item.encode(&mut encoded_msg) + .expect("BytesMut to have sufficient capacity."); + self.0 + .encode(encoded_msg.freeze(), dst) + .map_err(|e| e.into()) + } +} + +impl asynchronous_codec::Decoder for Codec { + type Item = message_proto::HolePunch; + type Error = Error; + + fn decode( + &mut self, + src: &mut asynchronous_codec::BytesMut, + ) -> Result, Self::Error> { + Ok(self + .0 + .decode(src)? + .map(|msg| message_proto::HolePunch::decode(Cursor::new(msg))) + .transpose()?) + } +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("Failed to decode response: {0}.")] + Decode( + #[from] + #[source] + prost::DecodeError, + ), + #[error("Io error {0}")] + Io( + #[from] + #[source] + std::io::Error, + ), +} diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs new file mode 100644 index 00000000000..c05e1072b8f --- /dev/null +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -0,0 +1,143 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::message_proto::{hole_punch, HolePunch}; +use asynchronous_codec::Framed; +use futures::{future::BoxFuture, prelude::*}; +use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; +use libp2p_swarm::NegotiatedSubstream; +use std::convert::TryFrom; +use std::iter; +use thiserror::Error; + +pub struct Upgrade {} + +impl upgrade::UpgradeInfo for Upgrade { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(super::PROTOCOL_NAME) + } +} + +impl upgrade::InboundUpgrade for Upgrade { + type Output = PendingConnect; + type Error = UpgradeError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + let mut substream = Framed::new(substream, super::codec::Codec::new()); + + async move { + let HolePunch { r#type, obs_addrs } = + substream + .next() + .await + .ok_or(super::codec::Error::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "", + )))??; + + let obs_addrs = if obs_addrs.is_empty() { + return Err(UpgradeError::NoAddresses); + } else { + obs_addrs + .into_iter() + .map(Multiaddr::try_from) + // Filter out relayed addresses. + .filter(|a| match a { + Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit), + Err(_) => true, + }) + .collect::, _>>() + .map_err(|_| UpgradeError::InvalidAddrs)? + }; + + let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + + match r#type { + hole_punch::Type::Connect => {} + hole_punch::Type::Sync => return Err(UpgradeError::UnexpectedTypeSync), + } + + Ok(PendingConnect { + substream, + remote_obs_addrs: obs_addrs, + }) + } + .boxed() + } +} + +pub struct PendingConnect { + substream: Framed, + remote_obs_addrs: Vec, +} + +impl PendingConnect { + pub async fn accept( + mut self, + local_obs_addrs: Vec, + ) -> Result, UpgradeError> { + let msg = HolePunch { + r#type: hole_punch::Type::Connect.into(), + obs_addrs: local_obs_addrs.into_iter().map(|a| a.to_vec()).collect(), + }; + + self.substream.send(msg).await?; + let HolePunch { r#type, .. } = + self.substream + .next() + .await + .ok_or(super::codec::Error::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "", + )))??; + + let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + match r#type { + hole_punch::Type::Connect => return Err(UpgradeError::UnexpectedTypeConnect), + hole_punch::Type::Sync => {} + } + + Ok(self.remote_obs_addrs) + } +} + +#[derive(Debug, Error)] +pub enum UpgradeError { + #[error("Failed to encode or decode: {0}")] + Codec( + #[from] + #[source] + super::codec::Error, + ), + #[error("Expected at least one address in reservation.")] + NoAddresses, + #[error("Invalid addresses.")] + InvalidAddrs, + #[error("Failed to parse response type field.")] + ParseTypeField, + #[error("Unexpected message type 'connect'")] + UnexpectedTypeConnect, + #[error("Unexpected message type 'sync'")] + UnexpectedTypeSync, +} diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs new file mode 100644 index 00000000000..332554c276a --- /dev/null +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -0,0 +1,146 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::message_proto::{hole_punch, HolePunch}; +use asynchronous_codec::Framed; +use futures::{future::BoxFuture, prelude::*}; +use futures_timer::Delay; +use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; +use libp2p_swarm::NegotiatedSubstream; +use std::convert::TryFrom; +use std::iter; +use std::time::Instant; +use thiserror::Error; + +pub struct Upgrade { + obs_addrs: Vec, +} + +impl upgrade::UpgradeInfo for Upgrade { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(super::PROTOCOL_NAME) + } +} + +impl Upgrade { + pub fn new(obs_addrs: Vec) -> Self { + Self { obs_addrs } + } +} + +impl upgrade::OutboundUpgrade for Upgrade { + type Output = Connect; + type Error = UpgradeError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + let mut substream = Framed::new(substream, super::codec::Codec::new()); + + let msg = HolePunch { + r#type: hole_punch::Type::Connect.into(), + obs_addrs: self.obs_addrs.into_iter().map(|a| a.to_vec()).collect(), + }; + + async move { + substream.send(msg).await?; + + let sent_time = Instant::now(); + + let HolePunch { r#type, obs_addrs } = + substream + .next() + .await + .ok_or(super::codec::Error::Io(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "", + )))??; + + let rtt = sent_time.elapsed(); + + let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + match r#type { + hole_punch::Type::Connect => {} + hole_punch::Type::Sync => return Err(UpgradeError::UnexpectedTypeSync), + } + + let obs_addrs = if obs_addrs.is_empty() { + return Err(UpgradeError::NoAddresses); + } else { + obs_addrs + .into_iter() + .map(Multiaddr::try_from) + // Filter out relayed addresses. + .filter(|a| match a { + Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit), + Err(_) => true, + }) + .collect::, _>>() + .map_err(|_| UpgradeError::InvalidAddrs)? + }; + + let msg = HolePunch { + r#type: hole_punch::Type::Sync.into(), + obs_addrs: vec![], + }; + + substream.send(msg).await?; + + Delay::new(rtt / 2).await; + + Ok(Connect { obs_addrs }) + } + .boxed() + } +} + +pub struct Connect { + pub obs_addrs: Vec, +} + +#[derive(Debug, Error)] +pub enum UpgradeError { + #[error("Failed to encode or decode: {0}")] + Codec( + #[from] + #[source] + super::codec::Error, + ), + #[error("Expected 'status' field to be set.")] + MissingStatusField, + #[error("Expected 'reservation' field to be set.")] + MissingReservationField, + #[error("Expected at least one address in reservation.")] + NoAddresses, + #[error("Invalid expiration timestamp in reservation.")] + InvalidReservationExpiration, + #[error("Invalid addresses in reservation.")] + InvalidAddrs, + #[error("Failed to parse response type field.")] + ParseTypeField, + #[error("Unexpected message type 'connect'")] + UnexpectedTypeConnect, + #[error("Unexpected message type 'sync'")] + UnexpectedTypeSync, + #[error("Failed to parse response type field.")] + ParseStatusField, +} diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs new file mode 100644 index 00000000000..a53a5319f34 --- /dev/null +++ b/protocols/dcutr/tests/lib.rs @@ -0,0 +1,251 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::executor::LocalPool; +use futures::future::FutureExt; +use futures::io::{AsyncRead, AsyncWrite}; +use futures::stream::StreamExt; +use futures::task::Spawn; +use libp2p::core::multiaddr::{Multiaddr, Protocol}; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::upgrade::Version; +use libp2p::core::transport::{Boxed, MemoryTransport, OrTransport, Transport}; +use libp2p::core::PublicKey; +use libp2p::core::{identity, PeerId}; +use libp2p::dcutr; +use libp2p::plaintext::PlainText2Config; +use libp2p::relay::v2::client; +use libp2p::relay::v2::relay; +use libp2p::NetworkBehaviour; +use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; +use std::time::Duration; + +#[test] +fn connect() { + let _ = env_logger::try_init(); + let mut pool = LocalPool::new(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay(); + let relay_peer_id = *relay.local_peer_id(); + + relay.listen_on(relay_addr.clone()).unwrap(); + relay.add_external_address(relay_addr.clone(), AddressScore::Infinite); + spawn_swarm_on_pool(&pool, relay); + + let mut dst = build_client(); + let dst_peer_id = *dst.local_peer_id(); + let dst_relayed_addr = relay_addr + .clone() + .with(Protocol::P2p(relay_peer_id.into())) + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(dst_peer_id.into())); + let dst_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + + dst.listen_on(dst_relayed_addr.clone()).unwrap(); + dst.listen_on(dst_addr.clone()).unwrap(); + dst.add_external_address(dst_addr.clone(), AddressScore::Infinite); + + pool.run_until(wait_for_reservation( + &mut dst, + dst_relayed_addr.clone(), + relay_peer_id, + false, // No renewal. + )); + spawn_swarm_on_pool(&pool, dst); + + let mut src = build_client(); + let src_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + src.listen_on(src_addr.clone()).unwrap(); + pool.run_until(wait_for_new_listen_addr(&mut src, &src_addr)); + src.add_external_address(src_addr.clone(), AddressScore::Infinite); + + src.dial(dst_relayed_addr.clone()).unwrap(); + + pool.run_until(wait_for_connection_established(&mut src, &dst_relayed_addr)); + match pool.run_until(wait_for_dcutr_event(&mut src)) { + dcutr::behaviour::Event::RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id, + remote_relayed_addr, + } if remote_peer_id == dst_peer_id && remote_relayed_addr == dst_relayed_addr => {} + e => panic!("Unexpected event: {:?}.", e), + } + pool.run_until(wait_for_connection_established( + &mut src, + &dst_addr.with(Protocol::P2p(dst_peer_id.into())), + )); +} + +fn build_relay() -> Swarm { + let local_key = identity::Keypair::generate_ed25519(); + let local_public_key = local_key.public(); + let local_peer_id = local_public_key.clone().to_peer_id(); + + let transport = build_transport(MemoryTransport::default().boxed(), local_public_key); + + Swarm::new( + transport, + relay::Relay::new( + local_peer_id, + relay::Config { + reservation_duration: Duration::from_secs(2), + ..Default::default() + }, + ), + local_peer_id, + ) +} + +fn build_client() -> Swarm { + let local_key = identity::Keypair::generate_ed25519(); + let local_public_key = local_key.public(); + let local_peer_id = local_public_key.clone().to_peer_id(); + + let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour(local_peer_id); + let transport = build_transport( + OrTransport::new(relay_transport, MemoryTransport::default()).boxed(), + local_public_key, + ); + + Swarm::new( + transport, + Client { + relay: behaviour, + dcutr: dcutr::behaviour::Behaviour::new(), + }, + local_peer_id, + ) +} + +fn build_transport( + transport: Boxed, + local_public_key: PublicKey, +) -> Boxed<(PeerId, StreamMuxerBox)> +where + StreamSink: AsyncRead + AsyncWrite + Send + Unpin + 'static, +{ + let transport = transport + .upgrade(Version::V1) + .authenticate(PlainText2Config { local_public_key }) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(); + + transport +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "ClientEvent", event_process = false)] +struct Client { + relay: client::Client, + dcutr: dcutr::behaviour::Behaviour, +} + +#[derive(Debug)] +enum ClientEvent { + Relay(client::Event), + Dcutr(dcutr::behaviour::Event), +} + +impl From for ClientEvent { + fn from(event: client::Event) -> Self { + ClientEvent::Relay(event) + } +} + +impl From for ClientEvent { + fn from(event: dcutr::behaviour::Event) -> Self { + ClientEvent::Dcutr(event) + } +} + +fn spawn_swarm_on_pool(pool: &LocalPool, swarm: Swarm) { + pool.spawner() + .spawn_obj(swarm.collect::>().map(|_| ()).boxed().into()) + .unwrap(); +} + +async fn wait_for_reservation( + client: &mut Swarm, + client_addr: Multiaddr, + relay_peer_id: PeerId, + is_renewal: bool, +) { + let mut new_listen_addr_for_relayed_addr = false; + let mut reservation_req_accepted = false; + loop { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } if address != client_addr => {} + SwarmEvent::NewListenAddr { address, .. } if address == client_addr => { + new_listen_addr_for_relayed_addr = true; + if reservation_req_accepted { + break; + } + } + SwarmEvent::Behaviour(ClientEvent::Relay(client::Event::ReservationReqAccepted { + relay_peer_id: peer_id, + renewal, + .. + })) if relay_peer_id == peer_id && renewal == is_renewal => { + reservation_req_accepted = true; + if new_listen_addr_for_relayed_addr { + break; + } + } + SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} + SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} + e => panic!("{:?}", e), + } + } +} + +async fn wait_for_connection_established(client: &mut Swarm, addr: &Multiaddr) { + loop { + match client.select_next_some().await { + SwarmEvent::IncomingConnection { .. } => {} + SwarmEvent::ConnectionEstablished { endpoint, .. } + if endpoint.get_remote_address() == addr => + { + break + } + SwarmEvent::Dialing(_) => {} + SwarmEvent::Behaviour(ClientEvent::Relay( + client::Event::OutboundCircuitEstablished { .. }, + )) => {} + SwarmEvent::ConnectionEstablished { .. } => {} + e => panic!("{:?}", e), + } + } +} + +async fn wait_for_new_listen_addr(client: &mut Swarm, new_addr: &Multiaddr) { + match client.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } if address == *new_addr => {} + e => panic!("{:?}", e), + } +} + +async fn wait_for_dcutr_event(client: &mut Swarm) -> dcutr::behaviour::Event { + loop { + match client.select_next_some().await { + SwarmEvent::Behaviour(ClientEvent::Dcutr(e)) => return e, + e => panic!("{:?}", e), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index b913a94121e..dbfe64cb43a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,10 @@ pub use multiaddr; pub use libp2p_autonat as autonat; #[doc(inline)] pub use libp2p_core as core; +#[cfg(feature = "dcutr")] +#[cfg_attr(docsrs, doc(cfg(feature = "dcutr")))] +#[doc(inline)] +pub use libp2p_dcutr as dcutr; #[cfg(feature = "deflate")] #[cfg_attr(docsrs, doc(cfg(feature = "deflate")))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))]