Skip to content

Commit

Permalink
feat(swarm): allow NetworkBehaviours to create and remove listeners
Browse files Browse the repository at this point in the history
This extends `ToSwarm` to add `ToSwarm::ListenOn` and `ToSwarm::RemoveListener`, which allows creating and removing listeners from a `NetworkBehaviour`.

Resolves #3291.

Pull-Request: #3292.
  • Loading branch information
dariusc93 authored Jun 9, 2023
1 parent 4532302 commit c2230f9
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 6 deletions.
6 changes: 6 additions & 0 deletions swarm-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> syn::Result<Toke
std::task::Poll::Ready(#network_behaviour_action::Dial { opts }) => {
return std::task::Poll::Ready(#network_behaviour_action::Dial { opts });
}
std::task::Poll::Ready(#network_behaviour_action::ListenOn { opts }) => {
return std::task::Poll::Ready(#network_behaviour_action::ListenOn { opts });
}
std::task::Poll::Ready(#network_behaviour_action::RemoveListener { id }) => {
return std::task::Poll::Ready(#network_behaviour_action::RemoveListener { id });
}
std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => {
return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler {
peer_id,
Expand Down
4 changes: 4 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.43.0 - unreleased

- Allow `NetworkBehaviours` to create and remove listeners.
See [PR 3292].

- Raise MSRV to 1.65.
See [PR 3715].

Expand Down Expand Up @@ -61,6 +64,7 @@

- Remove deprecated items. See [PR 3956].

[PR 3292]: https://github.com/libp2p/rust-libp2p/pull/3292
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
Expand Down
11 changes: 11 additions & 0 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use listen_addresses::ListenAddresses;

use crate::connection::ConnectionId;
use crate::dial_opts::DialOpts;
use crate::listen_opts::ListenOpts;
use crate::{
ConnectionDenied, ConnectionHandler, DialError, ListenError, THandler, THandlerInEvent,
THandlerOutEvent,
Expand Down Expand Up @@ -250,6 +251,12 @@ pub enum ToSwarm<TOutEvent, TInEvent> {
/// This allows a [`NetworkBehaviour`] to identify a connection that resulted out of its own dial request.
Dial { opts: DialOpts },

/// Instructs the [`Swarm`](crate::Swarm) to listen on the provided address.
ListenOn { opts: ListenOpts },

/// Instructs the [`Swarm`](crate::Swarm) to remove the listener.
RemoveListener { id: ListenerId },

/// Instructs the `Swarm` to send an event to the handler dedicated to a
/// connection with a peer.
///
Expand Down Expand Up @@ -324,6 +331,8 @@ impl<TOutEvent, TInEventOld> ToSwarm<TOutEvent, TInEventOld> {
match self {
ToSwarm::GenerateEvent(e) => ToSwarm::GenerateEvent(e),
ToSwarm::Dial { opts } => ToSwarm::Dial { opts },
ToSwarm::ListenOn { opts } => ToSwarm::ListenOn { opts },
ToSwarm::RemoveListener { id } => ToSwarm::RemoveListener { id },
ToSwarm::NotifyHandler {
peer_id,
handler,
Expand Down Expand Up @@ -353,6 +362,8 @@ impl<TOutEvent, THandlerIn> ToSwarm<TOutEvent, THandlerIn> {
match self {
ToSwarm::GenerateEvent(e) => ToSwarm::GenerateEvent(f(e)),
ToSwarm::Dial { opts } => ToSwarm::Dial { opts },
ToSwarm::ListenOn { opts } => ToSwarm::ListenOn { opts },
ToSwarm::RemoveListener { id } => ToSwarm::RemoveListener { id },
ToSwarm::NotifyHandler {
peer_id,
handler,
Expand Down
40 changes: 34 additions & 6 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub mod dial_opts;
pub mod dummy;
pub mod handler;
pub mod keep_alive;
mod listen_opts;

/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
#[doc(hidden)]
Expand Down Expand Up @@ -121,6 +122,7 @@ pub use handler::{
};
#[cfg(feature = "macros")]
pub use libp2p_swarm_derive::NetworkBehaviour;
pub use listen_opts::ListenOpts;
pub use stream::Stream;
pub use stream_protocol::{InvalidProtocol, StreamProtocol};

Expand Down Expand Up @@ -370,12 +372,9 @@ where
/// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
/// Depending on the underlying transport, one listener may have multiple listening addresses.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
let id = ListenerId::next();
self.transport.listen_on(id, addr)?;
self.behaviour
.on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
listener_id: id,
}));
let opts = ListenOpts::new(addr);
let id = opts.listener_id();
self.add_listener(opts)?;
Ok(id)
}

Expand Down Expand Up @@ -542,6 +541,28 @@ where
self.confirmed_external_addr.iter()
}

fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
let addr = opts.address();
let listener_id = opts.listener_id();

if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
self.behaviour
.on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
listener_id,
err: &e,
}));

return Err(e);
}

self.behaviour
.on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
listener_id,
}));

Ok(())
}

/// Add a **confirmed** external address for the local node.
///
/// This function should only be called with addresses that are guaranteed to be reachable.
Expand Down Expand Up @@ -1014,6 +1035,13 @@ where
});
}
}
ToSwarm::ListenOn { opts } => {
// Error is dispatched internally, safe to ignore.
let _ = self.add_listener(opts);
}
ToSwarm::RemoveListener { id } => {
self.remove_listener(id);
}
ToSwarm::NotifyHandler {
peer_id,
handler,
Expand Down
33 changes: 33 additions & 0 deletions swarm/src/listen_opts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use crate::ListenerId;
use libp2p_core::Multiaddr;

#[derive(Debug)]
pub struct ListenOpts {
id: ListenerId,
address: Multiaddr,
}

impl ListenOpts {
pub fn new(address: Multiaddr) -> ListenOpts {
ListenOpts {
id: ListenerId::next(),
address,
}
}

/// Get the [`ListenerId`] of this listen attempt
pub fn listener_id(&self) -> ListenerId {
self.id
}

/// Get the [`Multiaddr`] that is being listened on
pub fn address(&self) -> &Multiaddr {
&self.address
}
}

impl From<Multiaddr> for ListenOpts {
fn from(addr: Multiaddr) -> Self {
ListenOpts::new(addr)
}
}
143 changes: 143 additions & 0 deletions swarm/tests/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use std::{
collections::{HashSet, VecDeque},
task::{Context, Poll},
};

use libp2p_core::{multiaddr::Protocol, transport::ListenerId, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
derive_prelude::NewListener, dummy, ConnectionDenied, ConnectionId, FromSwarm, ListenOpts,
ListenerClosed, ListenerError, NetworkBehaviour, NewListenAddr, PollParameters, Swarm,
SwarmEvent, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};

use libp2p_swarm_test::SwarmExt;

#[async_std::test]
async fn behaviour_listener() {
let mut swarm = Swarm::new_ephemeral(|_| Behaviour::default());
let addr: Multiaddr = Protocol::Memory(0).into();
let id = swarm.behaviour_mut().listen(addr.clone());

let address = swarm
.wait(|e| match e {
SwarmEvent::NewListenAddr {
listener_id,
address,
} => {
assert_eq!(listener_id, id);
Some(address)
}
_ => None,
})
.await;

swarm.behaviour_mut().stop_listening(id);

swarm
.wait(|e| match e {
SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
} => {
assert_eq!(listener_id, id);
assert!(addresses.contains(&address));
assert!(reason.is_ok());
Some(())
}
_ => None,
})
.await;
}

#[derive(Default)]
struct Behaviour {
events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
listeners: HashSet<ListenerId>,
}

impl Behaviour {
pub(crate) fn listen(&mut self, addr: Multiaddr) -> ListenerId {
let opts = ListenOpts::new(addr);
let listener_id = opts.listener_id();
assert!(!self.listeners.contains(&listener_id));
self.events.push_back(ToSwarm::ListenOn { opts });
self.listeners.insert(listener_id);

listener_id
}

pub(crate) fn stop_listening(&mut self, id: ListenerId) {
self.events.push_back(ToSwarm::RemoveListener { id });
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = dummy::ConnectionHandler;
type ToSwarm = void::Void;

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<libp2p_swarm::THandler<Self>, ConnectionDenied> {
Ok(dummy::ConnectionHandler)
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(dummy::ConnectionHandler)
}

fn on_connection_handler_event(
&mut self,
_: PeerId,
_: ConnectionId,
_: THandlerOutEvent<Self>,
) {
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::NewListener(NewListener { listener_id }) => {
assert!(self.listeners.contains(&listener_id));
}
FromSwarm::NewListenAddr(NewListenAddr { listener_id, .. }) => {
assert!(self.listeners.contains(&listener_id));
}
FromSwarm::ListenerError(ListenerError { listener_id, err }) => {
panic!("Error for listener {listener_id:?}: {err}");
}
FromSwarm::ListenerClosed(ListenerClosed {
listener_id,
reason,
}) => {
assert!(self.listeners.contains(&listener_id));
assert!(reason.is_ok());
self.listeners.remove(&listener_id);
assert!(!self.listeners.contains(&listener_id));
}
_ => {}
}
}

fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}

Poll::Pending
}
}

0 comments on commit c2230f9

Please sign in to comment.