Skip to content

Commit

Permalink
Add sync network context cache size metrics (#6049)
Browse files Browse the repository at this point in the history
* Add sync network context cache size metrics
  • Loading branch information
dapplion authored Jul 9, 2024
1 parent d46ac6c commit bde0428
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 21 deletions.
10 changes: 10 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,16 @@ lazy_static! {
"sync_lookups_stuck_total",
"Total count of sync lookups that are stuck and dropped",
);
pub static ref SYNC_ACTIVE_NETWORK_REQUESTS: Result<IntGaugeVec> = try_create_int_gauge_vec(
"sync_active_network_requests",
"Current count of active network requests from sync",
&["type"],
);
pub static ref SYNC_UNKNOWN_NETWORK_REQUESTS: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_unknwon_network_request",
"Total count of network messages received for unknown active requests",
&["type"],
);

/*
* Block Delay Metrics
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// unless there is a bug.
let mut prune_lookups_interval = tokio::time::interval(Duration::from_secs(15));

let mut register_metrics_interval = tokio::time::interval(Duration::from_secs(5));

// process any inbound messages
loop {
tokio::select! {
Expand All @@ -582,6 +584,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
_ = prune_lookups_interval.tick() => {
self.block_lookups.prune_lookups();
}
_ = register_metrics_interval.tick() => {
self.network.register_metrics();
}
}
}
}
Expand Down
64 changes: 43 additions & 21 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest};
pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest};
use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
use crate::metrics;
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
Expand Down Expand Up @@ -348,27 +349,28 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: Id,
block_or_blob: BlockOrBlob<T::EthSpec>,
) -> Option<BlocksAndBlobsByRangeResponse<T::EthSpec>> {
match self.range_blocks_and_blobs_requests.entry(request_id) {
Entry::Occupied(mut entry) => {
let (_, info) = entry.get_mut();
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
let (sender_id, info) = entry.remove();
let request_type = info.get_request_type();
Some(BlocksAndBlobsByRangeResponse {
sender_id,
request_type,
responses: info.into_responses(),
})
} else {
None
}
}
Entry::Vacant(_) => None,
let Entry::Occupied(mut entry) = self.range_blocks_and_blobs_requests.entry(request_id)
else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]);
return None;
};

let (_, info) = entry.get_mut();
match block_or_blob {
BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
}
if info.is_finished() {
// If the request is finished, dequeue everything
let (sender_id, info) = entry.remove();
let request_type = info.get_request_type();
Some(BlocksAndBlobsByRangeResponse {
sender_id,
request_type,
responses: info.into_responses(),
})
} else {
None
}
}

Expand Down Expand Up @@ -631,6 +633,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]);
return None;
};

Expand Down Expand Up @@ -668,6 +671,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]);
return None;
};

Expand Down Expand Up @@ -771,6 +775,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
SendErrorProcessor::SendError
})
}

pub(crate) fn register_metrics(&self) {
metrics::set_gauge_vec(
&metrics::SYNC_ACTIVE_NETWORK_REQUESTS,
&["blocks_by_root"],
self.blocks_by_root_requests.len() as i64,
);
metrics::set_gauge_vec(
&metrics::SYNC_ACTIVE_NETWORK_REQUESTS,
&["blobs_by_root"],
self.blobs_by_root_requests.len() as i64,
);
metrics::set_gauge_vec(
&metrics::SYNC_ACTIVE_NETWORK_REQUESTS,
&["range_blocks"],
self.range_blocks_and_blobs_requests.len() as i64,
);
}
}

fn to_fixed_blob_sidecar_list<E: EthSpec>(
Expand Down

0 comments on commit bde0428

Please sign in to comment.