Skip to content

Commit

Permalink
collect bandwidth metrics per transport
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Oct 4, 2023
1 parent 7d53721 commit bce7071
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 22 deletions.
61 changes: 61 additions & 0 deletions beacon_node/lighthouse_network/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use libp2p::bandwidth::BandwidthSinks;
use std::sync::Arc;

pub use lighthouse_metrics::*;

lazy_static! {
Expand Down Expand Up @@ -184,3 +187,61 @@ pub fn scrape_discovery_metrics() {
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);
set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64);
}

/// Agregated `BandwidthSinks` of tcp and quic transports
/// used in libp2p.
pub struct AgregatedBandwithSinks {
tcp_sinks: Arc<BandwidthSinks>,
quic_sinks: Option<Arc<BandwidthSinks>>,
}

impl AgregatedBandwithSinks {
/// Create a new `AgregatedBandwithSinks`.
pub fn new(tcp_sinks: Arc<BandwidthSinks>, quic_sinks: Option<Arc<BandwidthSinks>>) -> Self {
AgregatedBandwithSinks {
tcp_sinks,
quic_sinks,
}
}

/// Total quic inbound bandwith.
pub fn total_quic_inbound(&self) -> u64 {
self.quic_sinks
.as_ref()
.map(|q| q.total_inbound())
.unwrap_or_default()
}

/// Total tcp inbound bandwith.
pub fn total_tcp_inbound(&self) -> u64 {
self.tcp_sinks.total_inbound()
}

/// Total quic outbound bandwith.
pub fn total_quic_outbound(&self) -> u64 {
self.quic_sinks
.as_ref()
.map(|q| q.total_outbound())
.unwrap_or_default()
}

/// Total tcp outbound bandwith.
pub fn total_tcp_outbound(&self) -> u64 {
self.tcp_sinks.total_outbound()
}

/// Total agregated inbound bandwith.
pub fn total_inbound(&self) -> u64 {
self.total_tcp_inbound() + self.total_quic_inbound()
}

/// Total agregated outbound bandwith.
pub fn total_outbound(&self) -> u64 {
self.total_tcp_outbound() + self.total_quic_outbound()
}

/// Total agregated inbound and outbound bandwidth.
pub fn total(&self) -> u64 {
self.total_inbound() + self.total_outbound()
}
}
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad};
use crate::discovery::{
subnet_predicate, DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS,
};
use crate::metrics::AgregatedBandwithSinks;
use crate::peer_manager::{
config::Config as PeerManagerCfg, peerdb::score::PeerAction, peerdb::score::ReportSource,
ConnectionDirection, PeerManager, PeerManagerEvent,
Expand All @@ -23,7 +24,6 @@ use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
use api_types::{PeerRequestId, Request, RequestId, Response};
use futures::stream::StreamExt;
use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings};
use libp2p::bandwidth::BandwidthSinks;
use libp2p::gossipsub::{
self, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
TopicScoreParams,
Expand Down Expand Up @@ -124,7 +124,7 @@ pub struct Network<AppReqId: ReqId, TSpec: EthSpec> {
update_gossipsub_scores: tokio::time::Interval,
gossip_cache: GossipCache,
/// The bandwidth logger for the underlying libp2p transport.
pub bandwidth: Arc<BandwidthSinks>,
pub bandwidth: AgregatedBandwithSinks,
/// This node's PeerId.
pub local_peer_id: PeerId,
/// Logger for behaviour actions.
Expand Down
37 changes: 23 additions & 14 deletions beacon_node/lighthouse_network/src/service/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::metrics::AgregatedBandwithSinks;
use crate::multiaddr::Protocol;
use crate::rpc::{MetaData, MetaDataV1, MetaDataV2};
use crate::types::{
error, EnrAttestationBitfield, EnrSyncCommitteeBitfield, GossipEncoding, GossipKind,
};
use crate::{GossipTopic, NetworkConfig};
use futures::future::Either;
use libp2p::bandwidth::BandwidthSinks;
use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed};
use libp2p::gossipsub;
use libp2p::identity::{secp256k1, Keypair};
Expand Down Expand Up @@ -44,7 +44,7 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
pub fn build_transport(
local_private_key: Keypair,
quic_support: bool,
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
) -> std::io::Result<(BoxedTransport, AgregatedBandwithSinks)> {
// mplex config
let mut mplex_config = libp2p_mplex::MplexConfig::new();
mplex_config.set_max_buffer_size(256);
Expand All @@ -55,30 +55,39 @@ pub fn build_transport(
yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read());

// Creates the TCP transport layer
let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true))
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(core::upgrade::SelectUpgrade::new(
yamux_config,
mplex_config,
))
.timeout(Duration::from_secs(10));
let (tcp, tcp_bandwidth) =
libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true))
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(core::upgrade::SelectUpgrade::new(
yamux_config,
mplex_config,
))
.timeout(Duration::from_secs(10))
.with_bandwidth_logging();

let (transport, bandwidth) = if quic_support {
// Enables Quic
// The default quic configuration suits us for now.
let quic_config = libp2p_quic::Config::new(&local_private_key);
tcp.or_transport(libp2p_quic::tokio::Transport::new(quic_config))
let (quic, quic_bandwidth) =
libp2p_quic::tokio::Transport::new(quic_config).with_bandwidth_logging();
let transport = tcp
.or_transport(quic)
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.with_bandwidth_logging()
.boxed();
(
transport,
AgregatedBandwithSinks::new(tcp_bandwidth, Some(quic_bandwidth)),
)
} else {
tcp.with_bandwidth_logging()
(tcp, AgregatedBandwithSinks::new(tcp_bandwidth, None))
};

// // Enables DNS over the transport.
// Enables DNS over the transport.
let transport = libp2p::dns::TokioDnsConfig::system(transport)?.boxed();

Ok((transport, bandwidth))
Expand Down
42 changes: 37 additions & 5 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use beacon_chain::{
use fnv::FnvHashMap;
pub use lighthouse_metrics::*;
use lighthouse_network::{
peer_manager::peerdb::client::ClientKind, types::GossipKind, BandwidthSinks, GossipTopic,
Gossipsub, NetworkGlobals,
metrics::AgregatedBandwithSinks, peer_manager::peerdb::client::ClientKind, types::GossipKind,
GossipTopic, Gossipsub, NetworkGlobals,
};
use std::sync::Arc;
use strum::IntoEnumIterator;
Expand Down Expand Up @@ -218,13 +218,30 @@ lazy_static! {
/*
* Bandwidth metrics
*/
pub static ref INBOUND_LIBP2P_QUIC_BYTES: Result<IntGauge> =
try_create_int_gauge("libp2p_inbound_quic_bytes", "The inbound quic bandwidth over libp2p");

pub static ref INBOUND_LIBP2P_TCP_BYTES: Result<IntGauge> =
try_create_int_gauge("libp2p_inbound_tcp_bytes", "The inbound tcp bandwidth over libp2p");

pub static ref INBOUND_LIBP2P_BYTES: Result<IntGauge> =
try_create_int_gauge("libp2p_inbound_bytes", "The inbound bandwidth over libp2p");

pub static ref OUTBOUND_LIBP2P_BYTES: Result<IntGauge> = try_create_int_gauge(
"libp2p_outbound_bytes",
"The outbound bandwidth over libp2p"
);

pub static ref OUTBOUND_LIBP2P_QUIC_BYTES: Result<IntGauge> = try_create_int_gauge(
"libp2p_quic_outbound_bytes",
"The outbound quic bandwidth over libp2p"
);

pub static ref OUTBOUND_LIBP2P_TCP_BYTES: Result<IntGauge> = try_create_int_gauge(
"libp2p_tcp_outbound_bytes",
"The outbound tcp bandwidth over libp2p"
);

pub static ref TOTAL_LIBP2P_BANDWIDTH: Result<IntGauge> = try_create_int_gauge(
"libp2p_total_bandwidth",
"The total inbound/outbound bandwidth over libp2p"
Expand Down Expand Up @@ -291,13 +308,28 @@ lazy_static! {
);
}

pub fn update_bandwidth_metrics(bandwidth: Arc<BandwidthSinks>) {
pub fn update_bandwidth_metrics(bandwidth: &AgregatedBandwithSinks) {
set_gauge(&INBOUND_LIBP2P_BYTES, bandwidth.total_inbound() as i64);
set_gauge(
&INBOUND_LIBP2P_TCP_BYTES,
bandwidth.total_tcp_inbound() as i64,
);
set_gauge(
&INBOUND_LIBP2P_QUIC_BYTES,
bandwidth.total_quic_inbound() as i64,
);
set_gauge(&OUTBOUND_LIBP2P_BYTES, bandwidth.total_outbound() as i64);
set_gauge(
&TOTAL_LIBP2P_BANDWIDTH,
(bandwidth.total_inbound() + bandwidth.total_outbound()) as i64,
&OUTBOUND_LIBP2P_TCP_BYTES,
bandwidth.total_tcp_outbound() as i64,
);
set_gauge(
&OUTBOUND_LIBP2P_QUIC_BYTES,
bandwidth.total_quic_outbound() as i64,
);
set_gauge(&TOTAL_LIBP2P_BANDWIDTH, bandwidth.total() as i64);
set_gauge(&INBOUND_LIBP2P_BYTES, bandwidth.total_inbound() as i64);
set_gauge(&OUTBOUND_LIBP2P_BYTES, bandwidth.total_outbound() as i64);
}

pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
}
metrics::update_bandwidth_metrics(self.libp2p.bandwidth.clone());
metrics::update_bandwidth_metrics(&self.libp2p.bandwidth);
}
};
executor.spawn(service_fut, "network");
Expand Down

0 comments on commit bce7071

Please sign in to comment.