Skip to content

Commit

Permalink
Allow libp2p to determine listening addresses (#4700)
Browse files Browse the repository at this point in the history
## Issue Addressed

#4675 

## Proposed Changes

 - Update local ENR (**only port numbers**) with local addresses received from libp2p (via `SwarmEvent::NewListenAddr`)
 - Only use the zero port for CLI tests

## Additional Info

### See Also ###

 - #4705 
 - #4402 
 - #4745
  • Loading branch information
jmcph4 committed Sep 29, 2023
1 parent 441fc16 commit 3004217
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 57 deletions.
13 changes: 9 additions & 4 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ use std::sync::Arc;
use std::time::Duration;
use types::{ForkContext, ForkName};

pub const DEFAULT_IPV4_ADDRESS: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const DEFAULT_TCP_PORT: u16 = 9000u16;
pub const DEFAULT_DISC_PORT: u16 = 9000u16;
pub const DEFAULT_QUIC_PORT: u16 = 9001u16;

/// The cache time is set to accommodate the circulation time of an attestation.
///
/// The p2p spec declares that we accept attestations within the following range:
Expand Down Expand Up @@ -304,10 +309,10 @@ impl Default for Config {
.expect("The total rate limit has been specified"),
);
let listen_addresses = ListenAddress::V4(ListenAddr {
addr: Ipv4Addr::UNSPECIFIED,
disc_port: 9000,
quic_port: 9001,
tcp_port: 9000,
addr: DEFAULT_IPV4_ADDRESS,
disc_port: DEFAULT_DISC_PORT,
quic_port: DEFAULT_QUIC_PORT,
tcp_port: DEFAULT_TCP_PORT,
});

let discv5_listen_config =
Expand Down
109 changes: 94 additions & 15 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ pub use libp2p::identity::{Keypair, PublicKey};
use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY};
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::behaviour::{DialFailure, FromSwarm};
use libp2p::swarm::THandlerInEvent;
pub use libp2p::{
core::{ConnectedPoint, Multiaddr},
core::{transport::ListenerId, ConnectedPoint, Multiaddr},
identity::PeerId,
swarm::{
dummy::ConnectionHandler, ConnectionId, DialError, NetworkBehaviour, NotifyHandler,
Expand Down Expand Up @@ -77,6 +78,19 @@ pub struct DiscoveredPeers {
pub peers: HashMap<Enr, Option<Instant>>,
}

/// Specifies which port numbers should be modified after start of the discovery service
#[derive(Debug)]
pub struct UpdatePorts {
/// TCP port associated wih IPv4 address (if present)
pub tcp4: bool,
/// TCP port associated wih IPv6 address (if present)
pub tcp6: bool,
/// QUIC port associated wih IPv4 address (if present)
pub quic4: bool,
/// QUIC port associated wih IPv6 address (if present)
pub quic6: bool,
}

#[derive(Clone, PartialEq)]
struct SubnetQuery {
subnet: Subnet,
Expand Down Expand Up @@ -177,12 +191,8 @@ pub struct Discovery<TSpec: EthSpec> {
/// always false.
pub started: bool,

/// This keeps track of whether an external UDP port change should also indicate an internal
/// TCP port change. As we cannot detect our external TCP port, we assume that the external UDP
/// port is also our external TCP port. This assumption only holds if the user has not
/// explicitly set their ENR TCP port via the CLI config. The first indicates tcp4 and the
/// second indicates tcp6.
update_tcp_port: (bool, bool),
/// Specifies whether various port numbers should be updated after the discovery service has been started
update_ports: UpdatePorts,

/// Logger for the discovery behaviour.
log: slog::Logger,
Expand Down Expand Up @@ -300,10 +310,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
}

let update_tcp_port = (
config.enr_tcp4_port.is_none(),
config.enr_tcp6_port.is_none(),
);
let update_ports = UpdatePorts {
tcp4: config.enr_tcp4_port.is_none(),
tcp6: config.enr_tcp6_port.is_none(),
quic4: config.enr_quic4_port.is_none(),
quic6: config.enr_quic6_port.is_none(),
};

Ok(Self {
cached_enrs: LruCache::new(50),
Expand All @@ -314,7 +326,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
discv5,
event_stream,
started: !config.disable_discovery,
update_tcp_port,
update_ports,
log,
enr_dir,
})
Expand Down Expand Up @@ -1006,8 +1018,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// Discv5 will have updated our local ENR. We save the updated version
// to disk.

if (self.update_tcp_port.0 && socket_addr.is_ipv4())
|| (self.update_tcp_port.1 && socket_addr.is_ipv6())
if (self.update_ports.tcp4 && socket_addr.is_ipv4())
|| (self.update_ports.tcp6 && socket_addr.is_ipv6())
{
// Update the TCP port in the ENR
self.discv5.update_local_enr_socket(socket_addr, true);
Expand Down Expand Up @@ -1036,12 +1048,79 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
self.on_dial_failure(peer_id, error)
}
FromSwarm::NewListenAddr(ev) => {
let addr = ev.addr;
let listener_id = ev.listener_id;

trace!(self.log, "Received NewListenAddr event from swarm"; "listener_id" => ?listener_id, "addr" => ?addr);

let mut addr_iter = addr.iter();

let attempt_enr_update = match addr_iter.next() {
Some(Protocol::Ip4(_)) => match (addr_iter.next(), addr_iter.next()) {
(Some(Protocol::Tcp(port)), None) => {
if !self.update_ports.tcp4 {
debug!(self.log, "Skipping ENR update"; "multiaddr" => ?addr);
return;
}

self.update_enr_tcp_port(port)
}
(Some(Protocol::Udp(port)), Some(Protocol::QuicV1)) => {
if !self.update_ports.quic4 {
debug!(self.log, "Skipping ENR update"; "multiaddr" => ?addr);
return;
}

self.update_enr_quic_port(port)
}
_ => {
debug!(self.log, "Encountered unacceptable multiaddr for listening (unsupported transport)"; "addr" => ?addr);
return;
}
},
Some(Protocol::Ip6(_)) => match (addr_iter.next(), addr_iter.next()) {
(Some(Protocol::Tcp(port)), None) => {
if !self.update_ports.tcp6 {
debug!(self.log, "Skipping ENR update"; "multiaddr" => ?addr);
return;
}

self.update_enr_tcp_port(port)
}
(Some(Protocol::Udp(port)), Some(Protocol::QuicV1)) => {
if !self.update_ports.quic6 {
debug!(self.log, "Skipping ENR update"; "multiaddr" => ?addr);
return;
}

self.update_enr_quic_port(port)
}
_ => {
debug!(self.log, "Encountered unacceptable multiaddr for listening (unsupported transport)"; "addr" => ?addr);
return;
}
},
_ => {
debug!(self.log, "Encountered unacceptable multiaddr for listening (no IP)"; "addr" => ?addr);
return;
}
};

let local_enr: Enr = self.discv5.local_enr();

match attempt_enr_update {
Ok(_) => {
info!(self.log, "Updated local ENR"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(), "ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6())
}
Err(e) => warn!(self.log, "Failed to update ENR"; "error" => ?e),
}
}
FromSwarm::ConnectionEstablished(_)
| FromSwarm::ConnectionClosed(_)
| FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
Expand Down
Loading

0 comments on commit 3004217

Please sign in to comment.