diff --git a/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs b/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs index 455c285c811..10025626d31 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/behaviour.rs @@ -854,6 +854,13 @@ where } } + /// Register topics to ensure metrics are recorded correctly for these topics. + pub fn register_topics_for_metrics(&mut self, topics: Vec) { + if let Some(metrics) = &mut self.metrics { + metrics.register_allowed_topics(topics); + } + } + /// Adds a new peer to the list of explicitly connected peers. pub fn add_explicit_peer(&mut self, peer_id: &PeerId) { tracing::debug!(peer=%peer_id, "Adding explicit peer"); diff --git a/beacon_node/lighthouse_network/src/gossipsub/metrics.rs b/beacon_node/lighthouse_network/src/gossipsub/metrics.rs index 94bcdbc487b..91bcd5f54bc 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/metrics.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/metrics.rs @@ -38,7 +38,7 @@ const DEFAULT_MAX_TOPICS: usize = 300; // Default value that limits how many topics for which there has never been a subscription do we // store metrics. -const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 50; +const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 100; #[derive(Debug, Clone)] pub struct Config { @@ -392,13 +392,21 @@ impl Metrics { } } - /// Increase the number of peers do we known are subscribed to this topic. + /// Registers a set of topics that we want to store calculate metrics for. + pub(crate) fn register_allowed_topics(&mut self, topics: Vec) { + for topic_hash in topics { + self.topic_info.insert(topic_hash, true); + } + } + + /// Increase the number of peers that are subscribed to this topic. pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { self.topic_peers_count.get_or_create(topic).inc(); } } + /// Decrease the number of peers that are subscribed to this topic. pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { self.topic_peers_count.get_or_create(topic).dec(); diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 3717c497386..63a22c53ef1 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -18,9 +18,9 @@ use crate::rpc::*; use crate::service::behaviour::BehaviourEvent; pub use crate::service::behaviour::Gossipsub; use crate::types::{ - fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, - SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, - CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, + attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding, + GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, + BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, }; use crate::EnrExt; use crate::Eth2Enr; @@ -275,6 +275,22 @@ impl Network { .with_peer_score(params, thresholds) .expect("Valid score params and thresholds"); + // If we are using metrics, then register which topics we want to make sure to keep + // track of + if ctx.libp2p_registry.is_some() { + let topics_to_keep_metrics_for = attestation_sync_committee_topics::() + .map(|gossip_kind| { + Topic::from(GossipTopic::new( + gossip_kind, + GossipEncoding::default(), + enr_fork_id.fork_digest, + )) + .into() + }) + .collect::>(); + gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for); + } + (gossipsub, update_gossipsub_scores) }; @@ -640,6 +656,20 @@ impl Network { let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); self.subscribe(topic); } + + // Register the new topics for metrics + let topics_to_keep_metrics_for = attestation_sync_committee_topics::() + .map(|gossip_kind| { + Topic::from(GossipTopic::new( + gossip_kind, + GossipEncoding::default(), + new_fork_digest, + )) + .into() + }) + .collect::>(); + self.gossipsub_mut() + .register_topics_for_metrics(topics_to_keep_metrics_for); } /// Unsubscribe from all topics that doesn't have the given fork_digest diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index af9e9ef45d5..8cf52f47dcd 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -17,7 +17,7 @@ pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{ - core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, - GossipTopic, ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, - LIGHT_CLIENT_GOSSIP_TOPICS, + attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics, + subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS, + BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, }; diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 717b976de04..b9194022cee 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,7 +1,7 @@ use crate::gossipsub::{IdentTopic as Topic, TopicHash}; use serde::{Deserialize, Serialize}; use strum::AsRefStr; -use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId}; +use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; use crate::Subnet; @@ -62,6 +62,17 @@ pub fn fork_core_topics(fork_name: &ForkName, spec: &ChainSpec) -> V } } +/// Returns all the attestation and sync committee topics, for a given fork. +pub fn attestation_sync_committee_topics() -> impl Iterator { + (0..TSpec::SubnetBitfieldLength::to_usize()) + .map(|subnet_id| GossipKind::Attestation(SubnetId::new(subnet_id as u64))) + .chain( + (0..TSpec::SyncCommitteeSubnetCount::to_usize()).map(|sync_committee_id| { + GossipKind::SyncCommitteeMessage(SyncSubnetId::new(sync_committee_id as u64)) + }), + ) +} + /// Returns all the topics that we need to subscribe to for a given fork /// including topics from older forks and new topics for the current fork. pub fn core_topics_to_subscribe(