Skip to content

Commit

Permalink
Merge branch 'master' into test/multi-get-perf
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc authored Aug 30, 2024
2 parents 0af8533 + 95b373e commit c9ed68c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

### Added
- [2135](https://github.com/FuelLabs/fuel-core/pull/2135): Added metrics logging for number of blocks served over the p2p req/res protocol.
- [2142](https://github.com/FuelLabs/fuel-core/pull/2142): Added benchmarks for varied forms of db lookups to assist in optimizations.

## [Version 0.35.0]
Expand Down
26 changes: 24 additions & 2 deletions crates/metrics/src/p2p_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use crate::global_registry;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::{
counter::Counter,
gauge::Gauge,
};
use std::sync::OnceLock;

pub struct P2PMetrics {
pub unique_peers: Counter,
pub blocks_requested: Gauge,
}

impl P2PMetrics {
fn new() -> Self {
let unique_peers = Counter::default();
let blocks_requested = Gauge::default();

let metrics = P2PMetrics { unique_peers };
let metrics = P2PMetrics {
unique_peers,
blocks_requested,
};

let mut registry = global_registry().registry.lock();
registry.register(
Expand All @@ -19,6 +27,12 @@ impl P2PMetrics {
metrics.unique_peers.clone(),
);

registry.register(
"Blocks_Requested",
"A Gauge which keeps track of how many blocks were requested and served over the p2p req/res protocol",
metrics.blocks_requested.clone()
);

metrics
}
}
Expand All @@ -28,3 +42,11 @@ static P2P_METRICS: OnceLock<P2PMetrics> = OnceLock::new();
pub fn p2p_metrics() -> &'static P2PMetrics {
P2P_METRICS.get_or_init(P2PMetrics::new)
}

pub fn increment_unique_peers() {
p2p_metrics().unique_peers.inc();
}

pub fn set_blocks_requested(count: usize) {
p2p_metrics().blocks_requested.set(count as i64);
}
15 changes: 11 additions & 4 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
},
TryPeerId,
};
use fuel_core_metrics::p2p_metrics::p2p_metrics;
use fuel_core_metrics::p2p_metrics::increment_unique_peers;
use fuel_core_types::{
fuel_types::BlockHeight,
services::p2p::peer_reputation::AppScore,
Expand Down Expand Up @@ -271,6 +271,15 @@ impl FuelP2PService {
}
}

pub fn update_metrics<T>(&self, update_fn: T)
where
T: FnOnce(),
{
if self.metrics {
update_fn();
}
}

#[cfg(feature = "test-helpers")]
pub fn multiaddrs(&self) -> Vec<Multiaddr> {
let local_peer = self.local_peer_id;
Expand Down Expand Up @@ -644,9 +653,7 @@ impl FuelP2PService {
fn handle_identify_event(&mut self, event: identify::Event) -> Option<FuelP2PEvent> {
match event {
identify::Event::Received { peer_id, info } => {
if self.metrics {
p2p_metrics().unique_peers.inc();
}
self.update_metrics(increment_unique_peers);

let mut addresses = info.listen_addrs;
let agent_version = info.agent_version;
Expand Down
31 changes: 30 additions & 1 deletion crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
},
};
use anyhow::anyhow;
use fuel_core_metrics::p2p_metrics::set_blocks_requested;
use fuel_core_services::{
stream::BoxStream,
RunnableService,
Expand Down Expand Up @@ -196,9 +197,20 @@ pub trait TaskP2PService: Send {
) -> anyhow::Result<()>;

fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>;

fn update_metrics<T>(&self, update_fn: T)
where
T: FnOnce();
}

impl TaskP2PService for FuelP2PService {
fn update_metrics<T>(&self, update_fn: T)
where
T: FnOnce(),
{
FuelP2PService::update_metrics(self, update_fn)
}

fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> {
self.peer_manager().get_all_peers().collect()
}
Expand Down Expand Up @@ -427,6 +439,13 @@ where
V: AtomicView + 'static,
V::LatestView: P2pDb,
{
fn update_metrics<T>(&self, update_fn: T)
where
T: FnOnce(),
{
self.p2p_service.update_metrics(update_fn)
}

fn process_request(
&mut self,
request_message: RequestMessage,
Expand Down Expand Up @@ -464,8 +483,11 @@ where
// If there are other types of data we send over p2p req/res protocol, then this needs
// to be generalized
let max_len = self.max_headers_per_request;
let range_len = range.len();

self.update_metrics(|| set_blocks_requested(range_len));

if range.len() > max_len {
if range_len > max_len {
tracing::error!(
requested_length = range.len(),
max_len,
Expand Down Expand Up @@ -1031,6 +1053,13 @@ pub mod tests {
}

impl TaskP2PService for FakeP2PService {
fn update_metrics<T>(&self, _: T)
where
T: FnOnce(),
{
unimplemented!()
}

fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> {
self.peer_info.iter().map(|tup| (&tup.0, &tup.1)).collect()
}
Expand Down

0 comments on commit c9ed68c

Please sign in to comment.