
Commit

Remove periodic tick and make all metrics in chain sync and syncing engine reactive

nazar-pc committed Aug 19, 2024
1 parent 9c6fb22 commit 9135bb1
Showing 4 changed files with 64 additions and 65 deletions.
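
The change replaces the engine's periodic `tick_timeout` (which re-read state and pushed gauge values roughly every 1.1 seconds) with gauge updates made at the point where the underlying state changes. A minimal sketch of the two styles, using a stand-in `Gauge` type and an illustrative `Engine` with a `peers` map rather than the real `SyncingEngine` or prometheus types:

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative stand-in for a Prometheus u64 gauge.
struct Gauge(AtomicU64);

impl Gauge {
    fn set(&self, v: u64) { self.0.store(v, Ordering::Relaxed); }
    fn inc(&self) { self.0.fetch_add(1, Ordering::Relaxed); }
    fn dec(&self) { self.0.fetch_sub(1, Ordering::Relaxed); }
}

// Hypothetical engine holding connected peers; not the real SyncingEngine.
struct Engine {
    peers: HashMap<u64, ()>,
    peers_gauge: Option<Gauge>,
}

impl Engine {
    // Old style: driven by a periodic tick, recomputes the value from scratch.
    fn report_metrics(&self) {
        if let Some(gauge) = &self.peers_gauge {
            gauge.set(self.peers.len().try_into().unwrap_or(u64::MAX));
        }
    }

    // New style: adjust the gauge exactly where the state changes, so no timer
    // is needed and the value is never stale between ticks.
    fn on_peer_connected(&mut self, peer_id: u64) {
        if self.peers.insert(peer_id, ()).is_none() {
            if let Some(gauge) = &self.peers_gauge {
                gauge.inc();
            }
        }
    }

    fn on_peer_disconnected(&mut self, peer_id: u64) {
        if self.peers.remove(&peer_id).is_some() {
            if let Some(gauge) = &self.peers_gauge {
                gauge.dec();
            }
        }
    }
}

fn main() {
    let mut engine = Engine { peers: HashMap::new(), peers_gauge: Some(Gauge(AtomicU64::new(0))) };
    engine.on_peer_connected(1);
    engine.on_peer_connected(1); // duplicate connect: insert returns Some, gauge untouched
    engine.on_peer_disconnected(1);
    engine.report_metrics(); // an old-style full recount would still agree: 0
}

Guarding `inc()` on `insert(..).is_none()` and `dec()` on a successful removal keeps the gauge from drifting when the same peer is reported more than once.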
2 changes: 1 addition & 1 deletion substrate/client/network/sync/Cargo.toml
@@ -33,7 +33,7 @@ schnellru = { workspace = true }
smallvec = { workspace = true, default-features = true }
thiserror = { workspace = true }
tokio-stream = { workspace = true }
-tokio = { features = ["macros", "time"], workspace = true, default-features = true }
+tokio = { features = ["macros"], workspace = true, default-features = true }
fork-tree = { workspace = true, default-features = true }
prometheus-endpoint = { workspace = true, default-features = true }
sc-client-api = { workspace = true, default-features = true }
37 changes: 8 additions & 29 deletions substrate/client/network/sync/src/engine.rs
@@ -54,7 +54,6 @@ use prometheus_endpoint::{
};
use prost::Message;
use schnellru::{ByLength, LruMap};
-use tokio::time::{Interval, MissedTickBehavior};

use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
@@ -93,9 +92,6 @@ use std::{
},
};

-/// Interval at which we perform time based maintenance
-const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);
-
/// Maximum number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead

@@ -219,9 +215,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Set of channels for other protocols that have subscribed to syncing events.
event_streams: Vec<TracingUnboundedSender<SyncEvent>>,

-/// Interval at which we call `tick`.
-tick_timeout: Interval,
-
/// All connected peers. Contains both full and light node peers.
peers: HashMap<PeerId, Peer<B>>,

@@ -436,12 +429,6 @@ where
let max_out_peers = net_config.network_config.default_peers_set.out_peers;
let max_in_peers = (max_full_peers - max_out_peers) as usize;

-let tick_timeout = {
-    let mut interval = tokio::time::interval(TICK_TIMEOUT);
-    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
-    interval
-};
-
Ok((
Self {
roles,
@@ -469,7 +456,6 @@ where
max_in_peers,
event_streams: Vec::new(),
notification_service,
-tick_timeout,
peer_store_handle,
metrics: if let Some(r) = metrics_registry {
match Metrics::register(r, is_major_syncing.clone()) {
@@ -493,15 +479,6 @@ where
))
}

-/// Report Prometheus metrics.
-pub fn report_metrics(&self) {
-    if let Some(metrics) = &self.metrics {
-        let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX);
-        metrics.peers.set(n);
-    }
-    self.strategy.report_metrics();
-}
-
fn update_peer_info(
&mut self,
peer_id: &PeerId,
@@ -628,7 +605,6 @@ where
pub async fn run(mut self) {
loop {
tokio::select! {
-_ = self.tick_timeout.tick() => self.perform_periodic_actions(),
command = self.service_rx.select_next_some() =>
self.process_service_command(command),
notification_event = self.notification_service.next_event() => match notification_event {
@@ -757,10 +733,6 @@ where
Ok(())
}

-fn perform_periodic_actions(&mut self) {
-    self.report_metrics();
-}
-
fn process_service_command(&mut self, command: ToServiceCommand<B>) {
match command {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
@@ -922,6 +894,9 @@ where
log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
return
};
+if let Some(metrics) = &self.metrics {
+    metrics.peers.dec();
+}

if self.important_peers.contains(&peer_id) {
log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
@@ -1097,7 +1072,11 @@ where

log::debug!(target: LOG_TARGET, "Connected {peer_id}");

-self.peers.insert(peer_id, peer);
+if self.peers.insert(peer_id, peer).is_none() {
+    if let Some(metrics) = &self.metrics {
+        metrics.peers.inc();
+    }
+}
self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());

if self.default_peers_set_no_slot_peers.contains(&peer_id) {
7 changes: 0 additions & 7 deletions substrate/client/network/sync/src/strategy.rs
@@ -455,13 +455,6 @@ where
self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
}

-/// Report Prometheus metrics
-pub fn report_metrics(&self) {
-    if let Some(ref chain_sync) = self.chain_sync {
-        chain_sync.report_metrics();
-    }
-}
-
/// Let `WarpSync` know about target block header
pub fn set_warp_sync_target_block_header(
&mut self,
83 changes: 55 additions & 28 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
@@ -665,7 +665,13 @@ where

self.fork_targets
.entry(*hash)
-.or_insert_with(|| ForkTarget { number, peers: Default::default(), parent_hash: None })
+.or_insert_with(|| {
+    if let Some(metrics) = &self.metrics {
+        metrics.fork_targets.inc();
+    }
+
+    ForkTarget { number, peers: Default::default(), parent_hash: None }
+})
.peers
.extend(peers);
}
@@ -872,10 +878,16 @@ where
);
self.fork_targets
.entry(peer.best_hash)
-.or_insert_with(|| ForkTarget {
-    number: peer.best_number,
-    parent_hash: None,
-    peers: Default::default(),
+.or_insert_with(|| {
+    if let Some(metrics) = &self.metrics {
+        metrics.fork_targets.inc();
+    }
+
+    ForkTarget {
+        number: peer.best_number,
+        parent_hash: None,
+        peers: Default::default(),
+    }
})
.peers
.insert(*peer_id);
@@ -1115,10 +1127,16 @@ where
);
self.fork_targets
.entry(hash)
-.or_insert_with(|| ForkTarget {
-    number,
-    parent_hash: Some(*announce.header.parent_hash()),
-    peers: Default::default(),
+.or_insert_with(|| {
+    if let Some(metrics) = &self.metrics {
+        metrics.fork_targets.inc();
+    }
+
+    ForkTarget {
+        number,
+        parent_hash: Some(*announce.header.parent_hash()),
+        peers: Default::default(),
+    }
})
.peers
.insert(peer_id);
@@ -1150,6 +1168,9 @@ where
target.peers.remove(peer_id);
!target.peers.is_empty()
});
+if let Some(metrics) = &self.metrics {
+    metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
+}

let blocks = self.ready_blocks();

@@ -1158,18 +1179,6 @@ where
}
}

-/// Report prometheus metrics.
-pub fn report_metrics(&self) {
-    if let Some(metrics) = &self.metrics {
-        metrics
-            .fork_targets
-            .set(self.fork_targets.len().try_into().unwrap_or(std::u64::MAX));
-        metrics
-            .queued_blocks
-            .set(self.queue_blocks.len().try_into().unwrap_or(std::u64::MAX));
-    }
-}
-
/// Returns the median seen block number.
fn median_seen(&self) -> Option<NumberFor<B>> {
let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
@@ -1235,6 +1244,11 @@ where
self.on_block_queued(h, n)
}
self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
+if let Some(metrics) = &self.metrics {
+    metrics
+        .queued_blocks
+        .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
+}

self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: new_blocks })
}
@@ -1251,6 +1265,9 @@ where
/// through all peers to update our view of their state as well.
fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
if self.fork_targets.remove(hash).is_some() {
+if let Some(metrics) = &self.metrics {
+    metrics.fork_targets.dec();
+}
trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
}
if let Some(gap_sync) = &mut self.gap_sync {
@@ -1520,12 +1537,13 @@ where
std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
let best_queued = self.best_queued_number;
let client = &self.client;
-let queue = &self.queue_blocks;
+let queue_blocks = &self.queue_blocks;
let allowed_requests = self.allowed_requests.take();
let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
let max_blocks_per_request = self.max_blocks_per_request;
let gap_sync = &mut self.gap_sync;
let disconnected_peers = &mut self.disconnected_peers;
+let metrics = self.metrics.as_ref();
self.peers
.iter_mut()
.filter_map(move |(&id, peer)| {
@@ -1545,7 +1563,7 @@ where
MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
best_queued < peer.best_number &&
peer.common_number < last_finalized &&
-queue.len() <= MAJOR_SYNC_BLOCKS.into()
+queue_blocks.len() <= MAJOR_SYNC_BLOCKS.into()
{
trace!(
target: LOG_TARGET,
@@ -1588,13 +1606,14 @@ where
last_finalized,
attrs,
|hash| {
-if queue.contains(hash) {
+if queue_blocks.contains(hash) {
BlockStatus::Queued
} else {
client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
}
},
max_blocks_per_request,
+metrics,
) {
trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
peer.state = PeerSyncState::DownloadingStale(hash);
@@ -1735,7 +1754,11 @@ where

let mut has_error = false;
for (_, hash) in &results {
-self.queue_blocks.remove(hash);
+if self.queue_blocks.remove(hash) {
+    if let Some(metrics) = &self.metrics {
+        metrics.queued_blocks.dec();
+    }
+}
self.blocks.clear_queued(hash);
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_queued(hash);
@@ -2065,14 +2088,15 @@ fn peer_gap_block_request<B: BlockT>(
/// Get pending fork sync targets for a peer.
fn fork_sync_request<B: BlockT>(
id: &PeerId,
-targets: &mut HashMap<B::Hash, ForkTarget<B>>,
+fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
best_num: NumberFor<B>,
finalized: NumberFor<B>,
attributes: BlockAttributes,
check_block: impl Fn(&B::Hash) -> BlockStatus,
max_blocks_per_request: u32,
+metrics: Option<&Metrics>,
) -> Option<(B::Hash, BlockRequest<B>)> {
-targets.retain(|hash, r| {
+fork_targets.retain(|hash, r| {
if r.number <= finalized {
trace!(
target: LOG_TARGET,
@@ -2093,7 +2117,10 @@
}
true
});
-for (hash, r) in targets {
+if let Some(metrics) = metrics {
+    metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
+}
+for (hash, r) in fork_targets {
if !r.peers.contains(&id) {
continue
}
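
In chain_sync.rs the reactive updates take two forms: single-element mutations pair `inc()`/`dec()` with the return value of the insert or remove, while bulk operations such as `retain` and `extend` re-read the collection length and `set()` the gauge through a saturating usize-to-u64 conversion. A minimal sketch of that split, again with an illustrative `Gauge` stand-in and a hypothetical `QueueTracker` rather than the real `ChainSync` types:

use std::cell::Cell;
use std::collections::HashSet;

// Illustrative stand-in for a Prometheus u64 gauge.
#[derive(Default)]
struct Gauge(Cell<u64>);

impl Gauge {
    fn set(&self, v: u64) { self.0.set(v); }
    fn dec(&self) { self.0.set(self.0.get().saturating_sub(1)); }
}

// Hypothetical tracker for queued block hashes; not the real ChainSync.
struct QueueTracker {
    queue_blocks: HashSet<u32>,
    queued_blocks_gauge: Option<Gauge>,
}

impl QueueTracker {
    // Bulk change: several hashes arrive at once, so re-read the length
    // instead of counting individual insertions.
    fn queue(&mut self, hashes: impl IntoIterator<Item = u32>) {
        self.queue_blocks.extend(hashes);
        if let Some(gauge) = &self.queued_blocks_gauge {
            gauge.set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
        }
    }

    // Single change: only decrement when the hash was actually present,
    // otherwise the gauge would drift below the real queue size.
    fn on_imported(&mut self, hash: u32) {
        if self.queue_blocks.remove(&hash) {
            if let Some(gauge) = &self.queued_blocks_gauge {
                gauge.dec();
            }
        }
    }
}

fn main() {
    let mut tracker = QueueTracker {
        queue_blocks: HashSet::new(),
        queued_blocks_gauge: Some(Gauge::default()),
    };
    tracker.queue([1, 2, 3]); // bulk insert: gauge set to 3
    tracker.on_imported(2);   // present: gauge decremented to 2
    tracker.on_imported(2);   // already gone: gauge untouched
}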
