Skip to content

Commit

Permalink
Update libp2p (#3039)
Browse files Browse the repository at this point in the history
Update libp2p. 

This corrects some gossipsub metrics.
  • Loading branch information
AgeManning committed Mar 2, 2022
1 parent f3c1dde commit e88b18b
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 224 deletions.
401 changes: 255 additions & 146 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ prometheus-client = "0.15.0"
unused_port = { path = "../../common/unused_port" }

[dependencies.libp2p]
git = "https://github.com/sigp/rust-libp2p"
# branch libp2p-gossipsub-interval-hotfix
rev = "e213703e616eaba3c482d7714775e0d37c4ae8e5"
version = "0.43.0"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio", "plaintext", "secp256k1"]

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<
NBAction<BehaviourEvent<TSpec>, <Behaviour<TSpec> as NetworkBehaviour>::ProtocolsHandler>,
NBAction<BehaviourEvent<TSpec>, <Behaviour<TSpec> as NetworkBehaviour>::ConnectionHandler>,
> {
if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) {
Expand Down
14 changes: 7 additions & 7 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::stream::FuturesUnordered;
pub use libp2p::{
core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId},
swarm::{
protocols_handler::ProtocolsHandler, DialError, NetworkBehaviour,
handler::ConnectionHandler, DialError, NetworkBehaviour,
NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters, SubstreamProtocol,
},
};
Expand Down Expand Up @@ -908,11 +908,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {

impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// Discovery is not a real NetworkBehaviour...
type ProtocolsHandler = libp2p::swarm::protocols_handler::DummyProtocolsHandler;
type ConnectionHandler = libp2p::swarm::handler::DummyConnectionHandler;
type OutEvent = DiscoveryEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
libp2p::swarm::protocols_handler::DummyProtocolsHandler::default()
fn new_handler(&mut self) -> Self::ConnectionHandler {
libp2p::swarm::handler::DummyConnectionHandler::default()
}

// Handles the libp2p request to obtain multiaddrs for peer_id's in order to dial them.
Expand All @@ -932,14 +932,14 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
&mut self,
_: PeerId,
_: ConnectionId,
_: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
_: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
}

fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_handler: Self::ProtocolsHandler,
_handler: Self::ConnectionHandler,
error: &DialError,
) {
if let Some(peer_id) = peer_id {
Expand Down Expand Up @@ -967,7 +967,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NBAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NBAction<Self::OutEvent, Self::ConnectionHandler>> {
if !self.started {
return Poll::Pending;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::task::{Context, Poll};
use futures::StreamExt;
use libp2p::core::connection::ConnectionId;
use libp2p::core::ConnectedPoint;
use libp2p::swarm::protocols_handler::DummyProtocolsHandler;
use libp2p::swarm::handler::DummyConnectionHandler;
use libp2p::swarm::{
DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
ConnectionHandler, DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::{Multiaddr, PeerId};
use slog::{debug, error};
Expand All @@ -19,21 +19,21 @@ use super::peerdb::BanResult;
use super::{PeerManager, PeerManagerEvent, ReportSource};

impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
type ProtocolsHandler = DummyProtocolsHandler;
type ConnectionHandler = DummyConnectionHandler;

type OutEvent = PeerManagerEvent;

/* Required trait members */

fn new_handler(&mut self) -> Self::ProtocolsHandler {
DummyProtocolsHandler::default()
fn new_handler(&mut self) -> Self::ConnectionHandler {
DummyConnectionHandler::default()
}

fn inject_event(
&mut self,
_: PeerId,
_: ConnectionId,
_: <DummyProtocolsHandler as ProtocolsHandler>::OutEvent,
_: <DummyConnectionHandler as ConnectionHandler>::OutEvent,
) {
unreachable!("Dummy handler does not emit events")
}
Expand All @@ -42,7 +42,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
&mut self,
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// perform the heartbeat when necessary
while self.heartbeat.poll_tick(cx).is_ready() {
self.heartbeat();
Expand Down Expand Up @@ -178,7 +178,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
peer_id: &PeerId,
_: &ConnectionId,
_: &ConnectedPoint,
_: DummyProtocolsHandler,
_: DummyConnectionHandler,
remaining_established: usize,
) {
if remaining_established > 0 {
Expand Down Expand Up @@ -243,7 +243,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_handler: DummyProtocolsHandler,
_handler: DummyConnectionHandler,
_error: &DialError,
) {
if let Some(peer_id) = peer_id {
Expand Down
50 changes: 26 additions & 24 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use futures::{Sink, SinkExt};
use libp2p::core::upgrade::{
InboundUpgrade, NegotiationError, OutboundUpgrade, ProtocolError, UpgradeError,
};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
use libp2p::swarm::handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
};
use libp2p::swarm::NegotiatedSubstream;
use slog::{crit, debug, trace, warn};
Expand Down Expand Up @@ -76,7 +77,7 @@ pub enum HandlerErr {
},
}

/// Implementation of `ProtocolsHandler` for the RPC protocol.
/// Implementation of `ConnectionHandler` for the RPC protocol.
pub struct RPCHandler<TSpec>
where
TSpec: EthSpec,
Expand Down Expand Up @@ -309,7 +310,7 @@ where
}
}

impl<TSpec> ProtocolsHandler for RPCHandler<TSpec>
impl<TSpec> ConnectionHandler for RPCHandler<TSpec>
where
TSpec: EthSpec,
{
Expand Down Expand Up @@ -442,12 +443,13 @@ where
fn inject_dial_upgrade_error(
&mut self,
request_info: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
error: ConnectionHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
let (id, req) = request_info;
if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error {
if let ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error
{
self.outbound_io_error_retries += 1;
if self.outbound_io_error_retries < IO_ERROR_RETRIES {
self.send_request(id, req);
Expand All @@ -461,13 +463,13 @@ where
self.outbound_io_error_retries = 0;
// map the error
let error = match error {
ProtocolsHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"),
ProtocolsHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout,
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
ConnectionHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"),
ConnectionHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
RPCError::UnsupportedProtocol
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => match e {
ProtocolError::IoError(io_err) => RPCError::IoError(io_err.to_string()),
Expand Down Expand Up @@ -517,7 +519,7 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Expand All @@ -533,7 +535,7 @@ where
}
// return any events that need to be reported
if !self.events_out.is_empty() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));
return Poll::Ready(ConnectionHandlerEvent::Custom(self.events_out.remove(0)));
} else {
self.events_out.shrink_to_fit();
}
Expand All @@ -543,7 +545,7 @@ where
if delay.is_elapsed() {
self.state = HandlerState::Deactivated;
debug!(self.log, "Handler deactivated");
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError(
"Shutdown timeout",
)));
}
Expand Down Expand Up @@ -575,7 +577,7 @@ where
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Inbound substream poll failed"; "error" => ?e);
// drops the peer if we cannot read the delay queue
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError(
"Could not poll inbound stream timer",
)));
}
Expand All @@ -596,14 +598,14 @@ where
error: RPCError::StreamTimeout,
};
// notify the user
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Outbound substream poll failed"; "error" => ?e);
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError(
"Could not poll outbound stream timer",
)));
}
Expand Down Expand Up @@ -856,7 +858,7 @@ where
}),
};

return Poll::Ready(ProtocolsHandlerEvent::Custom(received));
return Poll::Ready(ConnectionHandlerEvent::Custom(received));
}
Poll::Ready(None) => {
// stream closed
Expand All @@ -871,7 +873,7 @@ where
// notify the application error
if request.expected_responses() > 1 {
// return an end of stream result
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(
RPCReceived::EndOfStream(request_id, request.stream_termination()),
)));
}
Expand All @@ -882,7 +884,7 @@ where
proto: request.protocol(),
error: RPCError::IncompleteStream,
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
}
Poll::Pending => {
entry.get_mut().state =
Expand All @@ -898,7 +900,7 @@ where
error: e,
};
entry.remove_entry();
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
}
},
OutboundSubstreamState::Closing(mut substream) => {
Expand All @@ -924,7 +926,7 @@ where
};

if let Some(termination) = termination {
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(
RPCReceived::EndOfStream(request_id, termination),
)));
}
Expand All @@ -946,7 +948,7 @@ where
self.dial_negotiated += 1;
let (id, req) = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
OutboundRequestContainer {
req: req.clone(),
Expand All @@ -967,7 +969,7 @@ where
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
{
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::Disconnected));
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::Disconnected));
}
}

Expand Down
12 changes: 6 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::future::FutureExt;
use handler::RPCHandler;
use libp2p::core::{connection::ConnectionId, ConnectedPoint};
use libp2p::swarm::{
protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
handler::ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters, SubstreamProtocol,
};
use libp2p::{Multiaddr, PeerId};
Expand Down Expand Up @@ -92,7 +92,7 @@ pub struct RPCMessage<TSpec: EthSpec> {
/// Handler managing this message.
pub conn_id: ConnectionId,
/// The message that was sent.
pub event: <RPCHandler<TSpec> as ProtocolsHandler>::OutEvent,
pub event: <RPCHandler<TSpec> as ConnectionHandler>::OutEvent,
}

/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
Expand Down Expand Up @@ -178,10 +178,10 @@ impl<TSpec> NetworkBehaviour for RPC<TSpec>
where
TSpec: EthSpec,
{
type ProtocolsHandler = RPCHandler<TSpec>;
type ConnectionHandler = RPCHandler<TSpec>;
type OutEvent = RPCMessage<TSpec>;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
RPCHandler::new(
SubstreamProtocol::new(
RPCProtocol {
Expand Down Expand Up @@ -227,7 +227,7 @@ where
&mut self,
peer_id: PeerId,
conn_id: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
if let Ok(RPCReceived::Request(ref id, ref req)) = event {
// check if the request is conformant to the quota
Expand Down Expand Up @@ -289,7 +289,7 @@ where
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// let the rate limiter prune
let _ = self.limiter.poll_unpin(cx);
if !self.events.is_empty() {
Expand Down
5 changes: 2 additions & 3 deletions beacon_node/lighthouse_network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ use crate::EnrExt;
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
use futures::prelude::*;
use libp2p::core::{
connection::ConnectionLimits, identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox,
transport::Boxed,
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed,
};
use libp2p::{
bandwidth::{BandwidthLogging, BandwidthSinks},
core, noise,
swarm::{SwarmBuilder, SwarmEvent},
swarm::{ConnectionLimits, SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport,
};
use prometheus_client::registry::Registry;
Expand Down
Loading

0 comments on commit e88b18b

Please sign in to comment.