Skip to content

Commit

Permalink
collect bandwidth metrics per transport (#4805)
Browse files Browse the repository at this point in the history
## Issue Addressed

Following the conversation on libp2p/rust-libp2p#3666 the changes introduced in this PR will allow us to give more insights if the bandwidth limitations happen at the transport level, namely if quic helps vs yamux and it's [window size limitation](libp2p/rust-yamux#162) or if the bottleneck is at the gossipsub level.
## Proposed Changes

introduce new quic and tcp bandwidth metric gauges.

cc @mxinden (turned out to be easier, Thomas gave me a hint)
  • Loading branch information
jxs committed Oct 19, 2023
1 parent 98cac2b commit f063917
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 38 deletions.
46 changes: 46 additions & 0 deletions beacon_node/lighthouse_network/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use libp2p::bandwidth::BandwidthSinks;
use std::sync::Arc;

pub use lighthouse_metrics::*;

lazy_static! {
Expand Down Expand Up @@ -184,3 +187,46 @@ pub fn scrape_discovery_metrics() {
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);
set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64);
}

/// Aggregated `BandwidthSinks` of tcp and quic transports
/// used in libp2p.
pub struct AggregatedBandwidthSinks {
tcp_sinks: Arc<BandwidthSinks>,
quic_sinks: Option<Arc<BandwidthSinks>>,
}

impl AggregatedBandwidthSinks {
/// Create a new `AggregatedBandwidthSinks`.
pub fn new(tcp_sinks: Arc<BandwidthSinks>, quic_sinks: Option<Arc<BandwidthSinks>>) -> Self {
AggregatedBandwidthSinks {
tcp_sinks,
quic_sinks,
}
}

/// Total QUIC inbound bandwidth.
pub fn total_quic_inbound(&self) -> u64 {
self.quic_sinks
.as_ref()
.map(|q| q.total_inbound())
.unwrap_or_default()
}

/// Total TCP inbound bandwidth.
pub fn total_tcp_inbound(&self) -> u64 {
self.tcp_sinks.total_inbound()
}

/// Total QUIC outbound bandwidth.
pub fn total_quic_outbound(&self) -> u64 {
self.quic_sinks
.as_ref()
.map(|q| q.total_outbound())
.unwrap_or_default()
}

/// Total TCP outbound bandwidth.
pub fn total_tcp_outbound(&self) -> u64 {
self.tcp_sinks.total_outbound()
}
}
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad};
use crate::discovery::{
subnet_predicate, DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS,
};
use crate::metrics::AggregatedBandwidthSinks;
use crate::peer_manager::{
config::Config as PeerManagerCfg, peerdb::score::PeerAction, peerdb::score::ReportSource,
ConnectionDirection, PeerManager, PeerManagerEvent,
Expand All @@ -24,7 +25,6 @@ use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash};
use api_types::{PeerRequestId, Request, RequestId, Response};
use futures::stream::StreamExt;
use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings};
use libp2p::bandwidth::BandwidthSinks;
use libp2p::gossipsub::{
self, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError,
TopicScoreParams,
Expand Down Expand Up @@ -128,7 +128,7 @@ pub struct Network<AppReqId: ReqId, TSpec: EthSpec> {
update_gossipsub_scores: tokio::time::Interval,
gossip_cache: GossipCache,
/// The bandwidth logger for the underlying libp2p transport.
pub bandwidth: Arc<BandwidthSinks>,
pub bandwidth: AggregatedBandwidthSinks,
/// This node's PeerId.
pub local_peer_id: PeerId,
/// Logger for behaviour actions.
Expand Down
37 changes: 23 additions & 14 deletions beacon_node/lighthouse_network/src/service/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::metrics::AggregatedBandwidthSinks;
use crate::multiaddr::Protocol;
use crate::rpc::{MetaData, MetaDataV1, MetaDataV2};
use crate::types::{
error, EnrAttestationBitfield, EnrSyncCommitteeBitfield, GossipEncoding, GossipKind,
};
use crate::{GossipTopic, NetworkConfig};
use futures::future::Either;
use libp2p::bandwidth::BandwidthSinks;
use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed};
use libp2p::gossipsub;
use libp2p::identity::{secp256k1, Keypair};
Expand Down Expand Up @@ -44,7 +44,7 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
pub fn build_transport(
local_private_key: Keypair,
quic_support: bool,
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
) -> std::io::Result<(BoxedTransport, AggregatedBandwidthSinks)> {
// mplex config
let mut mplex_config = libp2p_mplex::MplexConfig::new();
mplex_config.set_max_buffer_size(256);
Expand All @@ -55,30 +55,39 @@ pub fn build_transport(
yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read());

// Creates the TCP transport layer
let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true))
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(core::upgrade::SelectUpgrade::new(
yamux_config,
mplex_config,
))
.timeout(Duration::from_secs(10));
let (tcp, tcp_bandwidth) =
libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true))
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(core::upgrade::SelectUpgrade::new(
yamux_config,
mplex_config,
))
.timeout(Duration::from_secs(10))
.with_bandwidth_logging();

let (transport, bandwidth) = if quic_support {
// Enables Quic
// The default quic configuration suits us for now.
let quic_config = libp2p_quic::Config::new(&local_private_key);
tcp.or_transport(libp2p_quic::tokio::Transport::new(quic_config))
let (quic, quic_bandwidth) =
libp2p_quic::tokio::Transport::new(quic_config).with_bandwidth_logging();
let transport = tcp
.or_transport(quic)
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.with_bandwidth_logging()
.boxed();
(
transport,
AggregatedBandwidthSinks::new(tcp_bandwidth, Some(quic_bandwidth)),
)
} else {
tcp.with_bandwidth_logging()
(tcp, AggregatedBandwidthSinks::new(tcp_bandwidth, None))
};

// // Enables DNS over the transport.
// Enables DNS over the transport.
let transport = libp2p::dns::TokioDnsConfig::system(transport)?.boxed();

Ok((transport, bandwidth))
Expand Down
42 changes: 21 additions & 21 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use beacon_chain::{
use fnv::FnvHashMap;
pub use lighthouse_metrics::*;
use lighthouse_network::{
peer_manager::peerdb::client::ClientKind, types::GossipKind, BandwidthSinks, GossipTopic,
Gossipsub, NetworkGlobals,
metrics::AggregatedBandwidthSinks, peer_manager::peerdb::client::ClientKind, types::GossipKind,
GossipTopic, Gossipsub, NetworkGlobals,
};
use std::sync::Arc;
use strum::IntoEnumIterator;
Expand Down Expand Up @@ -226,18 +226,8 @@ lazy_static! {
/*
* Bandwidth metrics
*/
pub static ref INBOUND_LIBP2P_BYTES: Result<IntGauge> =
try_create_int_gauge("libp2p_inbound_bytes", "The inbound bandwidth over libp2p");

pub static ref OUTBOUND_LIBP2P_BYTES: Result<IntGauge> = try_create_int_gauge(
"libp2p_outbound_bytes",
"The outbound bandwidth over libp2p"
);
pub static ref TOTAL_LIBP2P_BANDWIDTH: Result<IntGauge> = try_create_int_gauge(
"libp2p_total_bandwidth",
"The total inbound/outbound bandwidth over libp2p"
);

pub static ref LIBP2P_BYTES: Result<IntCounterVec> =
try_create_int_counter_vec("libp2p_inbound_bytes", "The bandwidth over libp2p", &["direction", "transport"]);

/*
* Sync related metrics
Expand Down Expand Up @@ -328,13 +318,23 @@ lazy_static! {
);
}

pub fn update_bandwidth_metrics(bandwidth: Arc<BandwidthSinks>) {
set_gauge(&INBOUND_LIBP2P_BYTES, bandwidth.total_inbound() as i64);
set_gauge(&OUTBOUND_LIBP2P_BYTES, bandwidth.total_outbound() as i64);
set_gauge(
&TOTAL_LIBP2P_BANDWIDTH,
(bandwidth.total_inbound() + bandwidth.total_outbound()) as i64,
);
pub fn update_bandwidth_metrics(bandwidth: &AggregatedBandwidthSinks) {
if let Some(tcp_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "tcp"]) {
tcp_in_bandwidth.reset();
tcp_in_bandwidth.inc_by(bandwidth.total_tcp_inbound());
}
if let Some(tcp_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "tcp"]) {
tcp_out_bandwidth.reset();
tcp_out_bandwidth.inc_by(bandwidth.total_tcp_outbound());
}
if let Some(quic_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "quic"]) {
quic_in_bandwidth.reset();
quic_in_bandwidth.inc_by(bandwidth.total_quic_inbound());
}
if let Some(quic_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "quic"]) {
quic_out_bandwidth.reset();
quic_out_bandwidth.inc_by(bandwidth.total_quic_outbound());
}
}

pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
}
metrics::update_bandwidth_metrics(self.libp2p.bandwidth.clone());
metrics::update_bandwidth_metrics(&self.libp2p.bandwidth);
}
};
executor.spawn(service_fut, "network");
Expand Down

0 comments on commit f063917

Please sign in to comment.