diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index dafd1076218..e54cd058daf 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -732,6 +732,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result { return std::task::Poll::Ready(#network_behaviour_action::Dial { opts }); } + std::task::Poll::Ready(#network_behaviour_action::ListenOn { opts }) => { + return std::task::Poll::Ready(#network_behaviour_action::ListenOn { opts }); + } + std::task::Poll::Ready(#network_behaviour_action::RemoveListener { id }) => { + return std::task::Poll::Ready(#network_behaviour_action::RemoveListener { id }); + } std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => { return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index d18629a0eb0..b91e866305d 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.43.0 - unreleased +- Allow `NetworkBehaviours` to create and remove listeners. + See [PR 3292]. + - Raise MSRV to 1.65. See [PR 3715]. @@ -61,6 +64,7 @@ - Remove deprecated items. See [PR 3956]. +[PR 3292]: https://github.com/libp2p/rust-libp2p/pull/3292 [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 1ddec51dfe4..0615457291a 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -28,6 +28,7 @@ pub use listen_addresses::ListenAddresses; use crate::connection::ConnectionId; use crate::dial_opts::DialOpts; +use crate::listen_opts::ListenOpts; use crate::{ ConnectionDenied, ConnectionHandler, DialError, ListenError, THandler, THandlerInEvent, THandlerOutEvent, @@ -250,6 +251,12 @@ pub enum ToSwarm { /// This allows a [`NetworkBehaviour`] to identify a connection that resulted out of its own dial request. Dial { opts: DialOpts }, + /// Instructs the [`Swarm`](crate::Swarm) to listen on the provided address. + ListenOn { opts: ListenOpts }, + + /// Instructs the [`Swarm`](crate::Swarm) to remove the listener. + RemoveListener { id: ListenerId }, + /// Instructs the `Swarm` to send an event to the handler dedicated to a /// connection with a peer. /// @@ -324,6 +331,8 @@ impl ToSwarm { match self { ToSwarm::GenerateEvent(e) => ToSwarm::GenerateEvent(e), ToSwarm::Dial { opts } => ToSwarm::Dial { opts }, + ToSwarm::ListenOn { opts } => ToSwarm::ListenOn { opts }, + ToSwarm::RemoveListener { id } => ToSwarm::RemoveListener { id }, ToSwarm::NotifyHandler { peer_id, handler, @@ -353,6 +362,8 @@ impl ToSwarm { match self { ToSwarm::GenerateEvent(e) => ToSwarm::GenerateEvent(f(e)), ToSwarm::Dial { opts } => ToSwarm::Dial { opts }, + ToSwarm::ListenOn { opts } => ToSwarm::ListenOn { opts }, + ToSwarm::RemoveListener { id } => ToSwarm::RemoveListener { id }, ToSwarm::NotifyHandler { peer_id, handler, diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 03e6a1efe57..9ea010ec61e 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -68,6 +68,7 @@ pub mod dial_opts; pub mod dummy; pub mod handler; pub mod keep_alive; +mod listen_opts; /// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro. #[doc(hidden)] @@ -121,6 +122,7 @@ pub use handler::{ }; #[cfg(feature = "macros")] pub use libp2p_swarm_derive::NetworkBehaviour; +pub use listen_opts::ListenOpts; pub use stream::Stream; pub use stream_protocol::{InvalidProtocol, StreamProtocol}; @@ -370,12 +372,9 @@ where /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`]. /// Depending on the underlying transport, one listener may have multiple listening addresses. pub fn listen_on(&mut self, addr: Multiaddr) -> Result> { - let id = ListenerId::next(); - self.transport.listen_on(id, addr)?; - self.behaviour - .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener { - listener_id: id, - })); + let opts = ListenOpts::new(addr); + let id = opts.listener_id(); + self.add_listener(opts)?; Ok(id) } @@ -542,6 +541,28 @@ where self.confirmed_external_addr.iter() } + fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError> { + let addr = opts.address(); + let listener_id = opts.listener_id(); + + if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) { + self.behaviour + .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError { + listener_id, + err: &e, + })); + + return Err(e); + } + + self.behaviour + .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener { + listener_id, + })); + + Ok(()) + } + /// Add a **confirmed** external address for the local node. /// /// This function should only be called with addresses that are guaranteed to be reachable. @@ -1014,6 +1035,13 @@ where }); } } + ToSwarm::ListenOn { opts } => { + // Error is dispatched internally, safe to ignore. + let _ = self.add_listener(opts); + } + ToSwarm::RemoveListener { id } => { + self.remove_listener(id); + } ToSwarm::NotifyHandler { peer_id, handler, diff --git a/swarm/src/listen_opts.rs b/swarm/src/listen_opts.rs new file mode 100644 index 00000000000..9c4d69a6fa0 --- /dev/null +++ b/swarm/src/listen_opts.rs @@ -0,0 +1,33 @@ +use crate::ListenerId; +use libp2p_core::Multiaddr; + +#[derive(Debug)] +pub struct ListenOpts { + id: ListenerId, + address: Multiaddr, +} + +impl ListenOpts { + pub fn new(address: Multiaddr) -> ListenOpts { + ListenOpts { + id: ListenerId::next(), + address, + } + } + + /// Get the [`ListenerId`] of this listen attempt + pub fn listener_id(&self) -> ListenerId { + self.id + } + + /// Get the [`Multiaddr`] that is being listened on + pub fn address(&self) -> &Multiaddr { + &self.address + } +} + +impl From for ListenOpts { + fn from(addr: Multiaddr) -> Self { + ListenOpts::new(addr) + } +} diff --git a/swarm/tests/listener.rs b/swarm/tests/listener.rs new file mode 100644 index 00000000000..71d92cb0e1f --- /dev/null +++ b/swarm/tests/listener.rs @@ -0,0 +1,143 @@ +use std::{ + collections::{HashSet, VecDeque}, + task::{Context, Poll}, +}; + +use libp2p_core::{multiaddr::Protocol, transport::ListenerId, Endpoint, Multiaddr}; +use libp2p_identity::PeerId; +use libp2p_swarm::{ + derive_prelude::NewListener, dummy, ConnectionDenied, ConnectionId, FromSwarm, ListenOpts, + ListenerClosed, ListenerError, NetworkBehaviour, NewListenAddr, PollParameters, Swarm, + SwarmEvent, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; + +use libp2p_swarm_test::SwarmExt; + +#[async_std::test] +async fn behaviour_listener() { + let mut swarm = Swarm::new_ephemeral(|_| Behaviour::default()); + let addr: Multiaddr = Protocol::Memory(0).into(); + let id = swarm.behaviour_mut().listen(addr.clone()); + + let address = swarm + .wait(|e| match e { + SwarmEvent::NewListenAddr { + listener_id, + address, + } => { + assert_eq!(listener_id, id); + Some(address) + } + _ => None, + }) + .await; + + swarm.behaviour_mut().stop_listening(id); + + swarm + .wait(|e| match e { + SwarmEvent::ListenerClosed { + listener_id, + addresses, + reason, + } => { + assert_eq!(listener_id, id); + assert!(addresses.contains(&address)); + assert!(reason.is_ok()); + Some(()) + } + _ => None, + }) + .await; +} + +#[derive(Default)] +struct Behaviour { + events: VecDeque::ToSwarm, THandlerInEvent>>, + listeners: HashSet, +} + +impl Behaviour { + pub(crate) fn listen(&mut self, addr: Multiaddr) -> ListenerId { + let opts = ListenOpts::new(addr); + let listener_id = opts.listener_id(); + assert!(!self.listeners.contains(&listener_id)); + self.events.push_back(ToSwarm::ListenOn { opts }); + self.listeners.insert(listener_id); + + listener_id + } + + pub(crate) fn stop_listening(&mut self, id: ListenerId) { + self.events.push_back(ToSwarm::RemoveListener { id }); + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = dummy::ConnectionHandler; + type ToSwarm = void::Void; + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(dummy::ConnectionHandler) + } + + fn on_connection_handler_event( + &mut self, + _: PeerId, + _: ConnectionId, + _: THandlerOutEvent, + ) { + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::NewListener(NewListener { listener_id }) => { + assert!(self.listeners.contains(&listener_id)); + } + FromSwarm::NewListenAddr(NewListenAddr { listener_id, .. }) => { + assert!(self.listeners.contains(&listener_id)); + } + FromSwarm::ListenerError(ListenerError { listener_id, err }) => { + panic!("Error for listener {listener_id:?}: {err}"); + } + FromSwarm::ListenerClosed(ListenerClosed { + listener_id, + reason, + }) => { + assert!(self.listeners.contains(&listener_id)); + assert!(reason.is_ok()); + self.listeners.remove(&listener_id); + assert!(!self.listeners.contains(&listener_id)); + } + _ => {} + } + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + + Poll::Pending + } +}