Skip to content

Commit

Permalink
chore: do not keep track of which tipsets peers have seen (#5077)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemmih authored Dec 16, 2024
1 parent 420612f commit 1c0650b
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 201 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@

### Removed

- [#5077](https://github.com/ChainSafe/forest/pull/5077) Remove
`peer_tipset_epoch` from the metrics.

### Fixed

## Forest v.0.23.2 "Feint"
Expand Down
46 changes: 10 additions & 36 deletions src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
use cid::Cid;
use futures::{future::Future, stream::FuturesUnordered, StreamExt};
use fvm_ipld_blockstore::Blockstore;
use itertools::{Either, Itertools};
use itertools::Itertools;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -295,7 +295,7 @@ where
}
}

async fn handle_peer_disconnected_event(network: SyncNetworkContext<DB>, peer_id: PeerId) {
fn handle_peer_disconnected_event(network: SyncNetworkContext<DB>, peer_id: PeerId) {
network.peer_manager().remove_peer(&peer_id);
network.peer_manager().unmark_peer_bad(&peer_id);
}
Expand All @@ -321,7 +321,7 @@ where
block_delay: u32,
stateless_mode: bool,
) -> Result<Option<FullTipset>, ChainMuxerError> {
let (tipset, source) = match event {
let tipset = match event {
NetworkEvent::HelloRequestInbound => {
metrics::LIBP2P_MESSAGE_TOTAL
.get_or_create(&metrics::values::HELLO_REQUEST_INBOUND)
Expand All @@ -333,29 +333,14 @@ where
.get_or_create(&metrics::values::HELLO_RESPONSE_OUTBOUND)
.inc();
let tipset_keys = TipsetKey::from(request.heaviest_tip_set.clone());
network.peer_manager().update_peer_head(
source,
if let Ok(Some(ts)) = Tipset::load(chain_store.blockstore(), &tipset_keys) {
Either::Right(Arc::new(ts))
} else {
Either::Left(tipset_keys.clone())
},
);
let tipset = match Self::get_full_tipset(
Self::get_full_tipset(
network.clone(),
chain_store.clone(),
Some(source),
tipset_keys,
)
.await
{
Ok(tipset) => tipset,
Err(why) => {
debug!("Querying full tipset failed: {}", why);
return Err(why);
}
};
(tipset, source)
.inspect_err(|e| debug!("Querying full tipset failed: {}", e))?
}
NetworkEvent::HelloRequestOutbound => {
metrics::LIBP2P_MESSAGE_TOTAL
Expand Down Expand Up @@ -386,14 +371,10 @@ where
metrics::LIBP2P_MESSAGE_TOTAL
.get_or_create(&metrics::values::PEER_DISCONNECTED)
.inc();
// Spawn and immediately move on to the next event
tokio::task::spawn(Self::handle_peer_disconnected_event(
network.clone(),
peer_id,
));
Self::handle_peer_disconnected_event(network.clone(), peer_id);
return Ok(None);
}
NetworkEvent::PubsubMessage { source, message } => match message {
NetworkEvent::PubsubMessage { message } => match message {
PubsubMessage::Block(b) => {
metrics::LIBP2P_MESSAGE_TOTAL
.get_or_create(&metrics::values::PUBSUB_BLOCK)
Expand All @@ -402,14 +383,13 @@ where
return Ok(None);
}
// Assemble full tipset from block only in stateful mode
let tipset = Self::get_full_tipset(
Self::get_full_tipset(
network.clone(),
chain_store.clone(),
None,
TipsetKey::from(nunny::vec![*b.header.cid()]),
)
.await?;
(tipset, source)
.await?
}
PubsubMessage::Message(m) => {
metrics::LIBP2P_MESSAGE_TOTAL
Expand Down Expand Up @@ -447,17 +427,11 @@ where
}
};

// Update the peer head
network.peer_manager().update_peer_head(
source,
Either::Right(Arc::new(tipset.clone().into_tipset())),
);

if tipset.epoch() + (SECONDS_IN_DAY / block_delay as i64)
< chain_store.heaviest_tipset().epoch()
{
debug!(
"Skip processing tipset at epoch {} from {source} that is too old",
"Skip processing tipset at epoch {} that is too old",
tipset.epoch()
);
return Ok(None);
Expand Down
21 changes: 3 additions & 18 deletions src/health/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,8 @@ mod test {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use crate::db::SettingsExt;
use crate::{
blocks::{CachingBlockHeader, Tipset},
chain_sync::SyncStage,
Client,
};
use crate::{chain_sync::SyncStage, Client};

use itertools::Either;
use reqwest::StatusCode;

use super::*;
Expand Down Expand Up @@ -201,12 +196,7 @@ mod test {
// instrument the state so that the live requirements are met
sync_state.write().set_stage(SyncStage::Headers);
let peer = libp2p::PeerId::random();
peer_manager.update_peer_head(
peer,
Either::Right(Arc::new(
Tipset::new(vec![CachingBlockHeader::default()]).unwrap(),
)),
);
peer_manager.touch_peer(&peer);

assert_eq!(
call_healthcheck(false).await.unwrap().status(),
Expand Down Expand Up @@ -283,12 +273,7 @@ mod test {
sync_state.write().set_epoch(i64::MAX);
sync_state.write().set_stage(SyncStage::Headers);
let peer = libp2p::PeerId::random();
peer_manager.update_peer_head(
peer,
Either::Right(Arc::new(
Tipset::new(vec![CachingBlockHeader::default()]).unwrap(),
)),
);
peer_manager.touch_peer(&peer);

assert_eq!(
call_healthcheck(false).await.unwrap().status(),
Expand Down
36 changes: 1 addition & 35 deletions src/libp2p/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use libp2p::PeerId;
use once_cell::sync::Lazy;
use prometheus_client::{
encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue, LabelSetEncoder},
metrics::{counter::Counter, family::Family, gauge::Gauge},
};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};

pub static PEER_FAILURE_TOTAL: Lazy<Counter> = Lazy::new(|| {
let metric = Counter::default();
Expand Down Expand Up @@ -37,33 +33,3 @@ pub static BAD_PEERS: Lazy<Gauge> = Lazy::new(|| {
);
metric
});

pub static PEER_TIPSET_EPOCH: Lazy<Family<PeerLabel, Gauge>> = Lazy::new(|| {
let metric = Family::default();
crate::metrics::default_registry().register(
"peer_tipset_epoch",
"peer tipset epoch",
metric.clone(),
);
metric
});

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct PeerLabel(PeerId);

impl PeerLabel {
pub const fn new(peer: PeerId) -> Self {
Self(peer)
}
}

impl EncodeLabelSet for PeerLabel {
fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), std::fmt::Error> {
let mut label_encoder = encoder.encode_label();
let mut label_key_encoder = label_encoder.encode_label_key()?;
EncodeLabelKey::encode(&"PEER", &mut label_key_encoder)?;
let mut label_value_encoder = label_key_encoder.encode_label_value()?;
EncodeLabelValue::encode(&self.0.to_string(), &mut label_value_encoder)?;
label_value_encoder.finish()
}
}
107 changes: 18 additions & 89 deletions src/libp2p/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{
cmp::Ordering,
sync::Arc,
time::{Duration, Instant},
};

use crate::{
blocks::{Tipset, TipsetKey},
shim::clock::ChainEpoch,
};
use ahash::{HashMap, HashSet};
use flume::{Receiver, Sender};
use itertools::Either;
use parking_lot::RwLock;
use rand::seq::SliceRandom;
use tracing::{debug, trace, warn};
Expand All @@ -31,11 +25,9 @@ const LOCAL_INV_ALPHA: u32 = 5;
/// Global duration multiplier, affects duration delta change.
const GLOBAL_INV_ALPHA: u32 = 20;

#[derive(Debug)]
#[derive(Debug, Default)]
/// Contains info about the peer's head [Tipset], as well as the request stats.
struct PeerInfo {
/// Head tipset key received from hello message or gossip sub message.
head: Either<TipsetKey, Arc<Tipset>>,
/// Number of successful requests.
successes: u32,
/// Number of failed requests.
Expand All @@ -44,24 +36,6 @@ struct PeerInfo {
average_time: Duration,
}

impl PeerInfo {
fn new(head: Either<TipsetKey, Arc<Tipset>>) -> Self {
Self {
head,
successes: 0,
failures: 0,
average_time: Default::default(),
}
}

fn head_epoch(&self) -> Option<ChainEpoch> {
match &self.head {
Either::Left(_) => None,
Either::Right(ts) => Some(ts.epoch()),
}
}
}

/// Peer tracking sets, these are handled together to avoid race conditions or
/// deadlocks when updating state.
#[derive(Default)]
Expand Down Expand Up @@ -105,53 +79,28 @@ impl Default for PeerManager {
}

impl PeerManager {
/// Updates peer's heaviest tipset. If the peer does not exist in the set, a
/// new `PeerInfo` will be generated.
pub fn update_peer_head(&self, peer_id: PeerId, head: Either<TipsetKey, Arc<Tipset>>) {
let mut peers = self.peers.write();
trace!("Updating head for PeerId {}", &peer_id);
let head_epoch = if let Some(pi) = peers.full_peers.get_mut(&peer_id) {
pi.head = head;
pi.head_epoch()
} else {
let pi = PeerInfo::new(head);
let head_epoch = pi.head_epoch();
peers.full_peers.insert(peer_id, pi);
metrics::FULL_PEERS.set(peers.full_peers.len() as _);
head_epoch
};
metrics::PEER_TIPSET_EPOCH
.get_or_create(&metrics::PeerLabel::new(peer_id))
.set(head_epoch.unwrap_or(-1));
}

/// Gets the head epoch of a peer
pub fn get_peer_head_epoch(&self, peer_id: &PeerId) -> Option<i64> {
let peers = self.peers.read();
peers.full_peers.get(peer_id).and_then(|pi| pi.head_epoch())
}

/// Returns true if peer is not marked as bad or not already in set.
pub fn is_peer_new(&self, peer_id: &PeerId) -> bool {
let peers = self.peers.read();
!peers.bad_peers.contains(peer_id) && !peers.full_peers.contains_key(peer_id)
}

/// Mark peer as active even if we haven't communicated with it yet.
#[cfg(test)]
pub fn touch_peer(&self, peer_id: &PeerId) {
let mut peers = self.peers.write();
peers.full_peers.entry(*peer_id).or_default();
}

/// Sort peers based on a score function with the success rate and latency
/// of requests.
pub(in crate::libp2p) fn sorted_peers(&self) -> Vec<PeerId> {
let peer_lk = self.peers.read();
let average_time = self.avg_global_time.read();
let mut n_stateful = 0;
let mut peers: Vec<_> = peer_lk
.full_peers
.iter()
.map(|(&p, info)| {
let is_stateful = info.head_epoch() != Some(0);
if is_stateful {
n_stateful += 1;
}

let cost = if info.successes + info.failures > 0 {
// Calculate cost based on fail rate and latency
// Note that when `success` is zero, the result is `inf`
Expand All @@ -161,32 +110,14 @@ impl PeerManager {
// There have been no failures or successes
average_time.as_secs_f64() * NEW_PEER_MUL
};
(p, is_stateful, cost)
(p, cost)
})
.collect();

// Unstable sort because hashmap iter order doesn't need to be preserved.
peers.sort_unstable_by(|(_, _, v1), (_, _, v2)| {
v1.partial_cmp(v2).unwrap_or(Ordering::Equal)
});

// Filter out nodes that are stateless when `n_stateful > 0`
if n_stateful > 0 {
peers
.into_iter()
.filter_map(
|(peer, is_stateful, _)| {
if is_stateful {
Some(peer)
} else {
None
}
},
)
.collect()
} else {
peers.into_iter().map(|(peer, _, _)| peer).collect()
}
peers.sort_unstable_by(|(_, v1), (_, v2)| v1.total_cmp(v2));

peers.into_iter().map(|(peer, _)| peer).collect()
}

/// Return shuffled slice of ordered peers from the peer manager. Ordering
Expand Down Expand Up @@ -228,10 +159,9 @@ impl PeerManager {
if peers.bad_peers.remove(peer) {
metrics::BAD_PEERS.set(peers.bad_peers.len() as _);
};
if let Some(peer_stats) = peers.full_peers.get_mut(peer) {
peer_stats.successes += 1;
log_time(peer_stats, dur);
}
let peer_stats = peers.full_peers.entry(*peer).or_default();
peer_stats.successes += 1;
log_time(peer_stats, dur);
}

/// Logs a failure for the given peer, and updates the average request
Expand All @@ -241,10 +171,9 @@ impl PeerManager {
let mut peers = self.peers.write();
if !peers.bad_peers.contains(peer) {
metrics::PEER_FAILURE_TOTAL.inc();
if let Some(peer_stats) = peers.full_peers.get_mut(peer) {
peer_stats.failures += 1;
log_time(peer_stats, dur);
}
let peer_stats = peers.full_peers.entry(*peer).or_default();
peer_stats.failures += 1;
log_time(peer_stats, dur);
}
}

Expand Down
Loading

0 comments on commit 1c0650b

Please sign in to comment.