Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct the metrics for topic subscriptions #5344

Merged
merged 3 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions beacon_node/lighthouse_network/src/gossipsub/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,13 @@ where
}
}

/// Register topics to ensure metrics are recorded correctly for these topics.
pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
if let Some(metrics) = &mut self.metrics {
metrics.register_allowed_topics(topics);
}
}

/// Adds a new peer to the list of explicitly connected peers.
pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
tracing::debug!(peer=%peer_id, "Adding explicit peer");
Expand Down
12 changes: 10 additions & 2 deletions beacon_node/lighthouse_network/src/gossipsub/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const DEFAULT_MAX_TOPICS: usize = 300;

// Default value that limits how many topics for which there has never been a subscription do we
// store metrics.
const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 50;
const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 100;

#[derive(Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -392,13 +392,21 @@ impl Metrics {
}
}

/// Increase the number of peers do we known are subscribed to this topic.
/// Registers a set of topics that we want to store calculate metrics for.
pub(crate) fn register_allowed_topics(&mut self, topics: Vec<TopicHash>) {
for topic_hash in topics {
self.topic_info.insert(topic_hash, true);
}
}

/// Increase the number of peers that are subscribed to this topic.
pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count.get_or_create(topic).inc();
}
}

/// Decrease the number of peers that are subscribed to this topic.
pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count.get_or_create(topic).dec();
Expand Down
36 changes: 33 additions & 3 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use crate::rpc::*;
use crate::service::behaviour::BehaviourEvent;
pub use crate::service::behaviour::Gossipsub;
use crate::types::{
fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic,
SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS,
CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
};
use crate::EnrExt;
use crate::Eth2Enr;
Expand Down Expand Up @@ -275,6 +275,22 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
.with_peer_score(params, thresholds)
.expect("Valid score params and thresholds");

// If we are using metrics, then register which topics we want to make sure to keep
// track of
if ctx.libp2p_registry.is_some() {
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<TSpec>()
.map(|gossip_kind| {
Topic::from(GossipTopic::new(
gossip_kind,
GossipEncoding::default(),
enr_fork_id.fork_digest,
))
.into()
})
.collect::<Vec<TopicHash>>();
gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for);
}

(gossipsub, update_gossipsub_scores)
};

Expand Down Expand Up @@ -640,6 +656,20 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
self.subscribe(topic);
}

// Register the new topics for metrics
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<TSpec>()
.map(|gossip_kind| {
Topic::from(GossipTopic::new(
gossip_kind,
GossipEncoding::default(),
new_fork_digest,
))
.into()
})
.collect::<Vec<TopicHash>>();
self.gossipsub_mut()
.register_topics_for_metrics(topics_to_keep_metrics_for);
}

/// Unsubscribe from all topics that doesn't have the given fork_digest
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/lighthouse_network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::{BackFillState, SyncState};
pub use topics::{
core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind,
GossipTopic, ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS,
LIGHT_CLIENT_GOSSIP_TOPICS,
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
};
13 changes: 12 additions & 1 deletion beacon_node/lighthouse_network/src/types/topics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::gossipsub::{IdentTopic as Topic, TopicHash};
use serde::{Deserialize, Serialize};
use strum::AsRefStr;
use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId};
use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};

use crate::Subnet;

Expand Down Expand Up @@ -62,6 +62,17 @@ pub fn fork_core_topics<T: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> V
}
}

/// Returns all the attestation and sync committee topics, for a given fork.
pub fn attestation_sync_committee_topics<TSpec: EthSpec>() -> impl Iterator<Item = GossipKind> {
(0..TSpec::SubnetBitfieldLength::to_usize())
.map(|subnet_id| GossipKind::Attestation(SubnetId::new(subnet_id as u64)))
.chain(
(0..TSpec::SyncCommitteeSubnetCount::to_usize()).map(|sync_committee_id| {
GossipKind::SyncCommitteeMessage(SyncSubnetId::new(sync_committee_id as u64))
}),
)
}

/// Returns all the topics that we need to subscribe to for a given fork
/// including topics from older forks and new topics for the current fork.
pub fn core_topics_to_subscribe<T: EthSpec>(
Expand Down
Loading