Skip to content

Commit

Permalink
Track requested peer counts
Browse files Browse the repository at this point in the history
  • Loading branch information
koute committed Feb 17, 2022
1 parent 489fe01 commit 9f1c870
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 16 deletions.
6 changes: 3 additions & 3 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,10 @@ impl<B: BlockT> Protocol<B> {
self.behaviour.open_peers()
}

/// Returns the list of all the peers that the peerset currently requests us to be connected
/// Returns the number of all the peers that the peerset currently requests us to be connected
/// to on the default set.
pub fn requested_peers(&self) -> impl Iterator<Item = &PeerId> {
self.behaviour.requested_peers(HARDCODED_PEERSETS_SYNC)
pub fn requested_peers_count(&self) -> usize {
self.behaviour.requested_peers_count(HARDCODED_PEERSETS_SYNC)
}

/// Returns the number of discovered nodes that we keep in memory.
Expand Down
83 changes: 74 additions & 9 deletions client/network/src/protocol/notifications/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ pub struct Notifications {
/// List of peers in our state.
peers: FnvHashMap<(PeerId, sc_peerset::SetId), PeerState>,

/// A map containing the number of all the peers that the peerset currently requests us to be
/// connected to.
requested_peer_counts: FnvHashMap<sc_peerset::SetId, usize>,

/// The elements in `peers` occasionally contain `Delay` objects that we would normally have
/// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is
/// instead put inside of `delays` and reference by a [`DelayId`]. This stream
Expand Down Expand Up @@ -363,6 +367,32 @@ pub enum NotificationsOut {
},
}

fn increment_requested_peer_counts(
requested_peer_counts: &mut FnvHashMap<sc_peerset::SetId, usize>,
set_id: sc_peerset::SetId,
) {
*requested_peer_counts.entry(set_id).or_insert(0) += 1;
}

fn decrement_requested_peer_counts(
requested_peer_counts: &mut FnvHashMap<sc_peerset::SetId, usize>,
set_id: sc_peerset::SetId,
) {
match requested_peer_counts.entry(set_id) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
if *value == 1 {
entry.remove();
} else {
*value -= 1;
}
},
Entry::Vacant(..) => {
warn!(target: "sub-libp2p", "Underflow of requested peer count for {:?}", set_id)
},
}
}

impl Notifications {
/// Creates a `CustomProtos`.
pub fn new(
Expand All @@ -384,6 +414,7 @@ impl Notifications {
notif_protocols,
peerset,
peers: FnvHashMap::default(),
requested_peer_counts: Default::default(),
delays: Default::default(),
next_delay_id: DelayId(0),
incoming: SmallVec::new(),
Expand Down Expand Up @@ -457,6 +488,7 @@ impl Notifications {
} else {
timer_deadline
});
decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*entry.into_mut() = PeerState::Disabled { connections, backoff_until }
},

Expand Down Expand Up @@ -506,6 +538,7 @@ impl Notifications {
.any(|(_, s)| matches!(s, ConnectionState::Opening)));

let backoff_until = ban.map(|dur| Instant::now() + dur);
decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*entry.into_mut() = PeerState::Disabled { connections, backoff_until }
},

Expand Down Expand Up @@ -560,15 +593,18 @@ impl Notifications {
}
}

/// Returns the list of all the peers that the peerset currently requests us to be connected to.
pub fn requested_peers<'a>(
&'a self,
set_id: sc_peerset::SetId,
) -> impl Iterator<Item = &'a PeerId> + 'a {
self.peers
.iter()
.filter(move |((_, set), state)| *set == set_id && state.is_requested())
.map(|((id, _), _)| id)
/// Returns the number of all the peers that the peerset currently requests us to be connected
/// to.
pub fn requested_peers_count(&self, set_id: sc_peerset::SetId) -> usize {
let count = self.requested_peer_counts.get(&set_id).copied().unwrap_or(0);
debug_assert_eq!(
self.peers
.iter()
.filter(move |((_, set), state)| *set == set_id && state.is_requested())
.count(),
count
);
count
}

/// Returns the list of reserved peers.
Expand Down Expand Up @@ -647,6 +683,7 @@ impl Notifications {
condition: DialPeerCondition::Disconnected,
handler,
});
increment_requested_peer_counts(&mut self.requested_peer_counts, set_id);
entry.insert(PeerState::Requested);
return
},
Expand All @@ -665,6 +702,7 @@ impl Notifications {
set_id,
timer_deadline,
);
increment_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*occ_entry.into_mut() =
PeerState::PendingRequest { timer: *timer, timer_deadline: *timer_deadline };
},
Expand All @@ -684,6 +722,7 @@ impl Notifications {
condition: DialPeerCondition::Disconnected,
handler,
});
increment_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*occ_entry.into_mut() = PeerState::Requested;
},

Expand Down Expand Up @@ -711,6 +750,7 @@ impl Notifications {
.boxed(),
);

increment_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer: delay_id,
Expand All @@ -737,6 +777,7 @@ impl Notifications {
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
});
*connec_state = ConnectionState::Opening;
increment_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*occ_entry.into_mut() = PeerState::Enabled { connections };
} else {
// If no connection is available, switch to `DisabledPendingEnable` in order
Expand Down Expand Up @@ -771,6 +812,7 @@ impl Notifications {
.boxed(),
);

increment_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
connections,
timer: delay_id,
Expand Down Expand Up @@ -813,6 +855,7 @@ impl Notifications {
*connec_state = ConnectionState::Opening;
}

increment_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*occ_entry.into_mut() = PeerState::Enabled { connections };
},

Expand Down Expand Up @@ -870,6 +913,7 @@ impl Notifications {
trace!(target: "sub-libp2p",
"PSM => Drop({}, {:?}): Interrupting pending enabling.",
entry.key().0, set_id);
decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*entry.into_mut() =
PeerState::Disabled { connections, backoff_until: Some(timer_deadline) };
},
Expand Down Expand Up @@ -919,6 +963,7 @@ impl Notifications {
*connec_state = ConnectionState::Closing;
}

decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*entry.into_mut() = PeerState::Disabled { connections, backoff_until: None }
},

Expand All @@ -929,13 +974,17 @@ impl Notifications {
// well at the same time.
trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected.",
entry.key().0, set_id);

decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
entry.remove();
},

// PendingRequest => Backoff
PeerState::PendingRequest { timer, timer_deadline } => {
trace!(target: "sub-libp2p", "PSM => Drop({}, {:?}): Not yet connected",
entry.key().0, set_id);

decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*entry.into_mut() = PeerState::Backoff { timer, timer_deadline }
},

Expand Down Expand Up @@ -1010,6 +1059,7 @@ impl Notifications {
*connec_state = ConnectionState::Opening;
}

increment_requested_peer_counts(&mut self.requested_peer_counts, incoming.set_id);
*state = PeerState::Enabled { connections };
},

Expand Down Expand Up @@ -1230,6 +1280,7 @@ impl NetworkBehaviour for Notifications {
if connections.is_empty() {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, *peer_id, DropReason::Unknown);
decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*entry.get_mut() = PeerState::Backoff { timer, timer_deadline };
} else {
*entry.get_mut() =
Expand Down Expand Up @@ -1382,6 +1433,7 @@ impl NetworkBehaviour for Notifications {
.boxed(),
);

decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*entry.get_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: Instant::now() + Duration::from_secs(ban_dur),
Expand All @@ -1392,6 +1444,7 @@ impl NetworkBehaviour for Notifications {
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", peer_id, set_id);
self.peerset.dropped(set_id, *peer_id, DropReason::Unknown);

decrement_requested_peer_counts(&mut self.requested_peer_counts, set_id);
*entry.get_mut() = PeerState::Disabled { connections, backoff_until: None };
} else {
*entry.get_mut() = PeerState::Enabled { connections };
Expand Down Expand Up @@ -1465,6 +1518,10 @@ impl NetworkBehaviour for Notifications {
.boxed(),
);

decrement_requested_peer_counts(
&mut self.requested_peer_counts,
set_id,
);
*entry.into_mut() = PeerState::Backoff {
timer: delay_id,
timer_deadline: now + ban_duration,
Expand Down Expand Up @@ -1751,6 +1808,10 @@ impl NetworkBehaviour for Notifications {
{
trace!(target: "sub-libp2p", "PSM <= Dropped({}, {:?})", source, set_id);
self.peerset.dropped(set_id, source, DropReason::Refused);
decrement_requested_peer_counts(
&mut self.requested_peer_counts,
set_id,
);
*entry.into_mut() =
PeerState::Disabled { connections, backoff_until: None };
} else {
Expand Down Expand Up @@ -1928,6 +1989,10 @@ impl NetworkBehaviour for Notifications {
self.peerset.dropped(set_id, source, DropReason::Refused);

let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng());
decrement_requested_peer_counts(
&mut self.requested_peer_counts,
set_id,
);
*entry.into_mut() = PeerState::Disabled {
connections,
backoff_until: Some(Instant::now() + Duration::from_secs(ban_dur)),
Expand Down
8 changes: 4 additions & 4 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2108,10 +2108,10 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
.peerset_num_discovered
.set(this.network_service.behaviour_mut().user_protocol().num_discovered_peers()
as u64);
metrics.peerset_num_requested.set(
this.network_service.behaviour_mut().user_protocol().requested_peers().count()
as u64,
);
metrics
.peerset_num_requested
.set(this.network_service.behaviour_mut().user_protocol().requested_peers_count()
as u64);
metrics.pending_connections.set(
Swarm::network_info(&this.network_service).connection_counters().num_pending()
as u64,
Expand Down

0 comments on commit 9f1c870

Please sign in to comment.