Skip to content

Commit

Permalink
feat: add ScopeDurationLogger
Browse files Browse the repository at this point in the history
ScopeDurationLogger is used to log a warning for scopes (eg fn) that
take longer than 1 millisecond (by default) to execute.

The duration threshold can be changed at runtime with env var:
  LOG_SLOW_SCOPE_THRESHOLD=0.000001 cargo run

The default of 1 millisecond is specified as:
  LOG_SLOW_SCOPE_THRESHOLD=0.001

The ScopeDurationLogger is used at most execution entry points, eg:
 * all rpc methods
 * peer message handlers
 * main to peer message handlers
 * peer to main message handlers

The idea is to gain a better understanding of where we are doing
slow processing.  There might be some surprises and low hanging
fruit.
  • Loading branch information
dan-da committed Nov 3, 2024
1 parent e3002a6 commit de96f98
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 0 deletions.
41 changes: 41 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,44 @@ pub(crate) fn log_tokio_lock_event(lock_event: sync_tokio::LockEvent) {
}
}
const LOG_TOKIO_LOCK_EVENT_CB: sync_tokio::LockCallbackFn = log_tokio_lock_event;

pub struct ScopeDurationLogger<'a> {
start: Instant,
description: &'a str,
log_slow_fn_threshold: f64,
}
impl<'a> ScopeDurationLogger<'a> {
pub fn new_with_threshold(description: &'a str, log_slow_fn_threshold: f64) -> Self {
Self {
start: Instant::now(),
description,
log_slow_fn_threshold,
}
}

pub fn new(description: &'a str) -> Self {
Self::new_with_threshold(
description,
match env::var("LOG_SLOW_SCOPE_THRESHOLD") {
Ok(t) => t.parse().unwrap(),
Err(_) => 0.001,
},
)
}
}

impl Drop for ScopeDurationLogger<'_> {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
let duration = elapsed.as_secs_f64();

if duration >= self.log_slow_fn_threshold {
let msg = format!(
"executed {} in {} secs. exceeds slow fn threshold of {} secs",
self.description, duration, self.log_slow_fn_threshold,
);

tracing::warn!("{}", msg);
}
}
}
25 changes: 25 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
/// returns name of current function.
macro_rules! fn_name_bare {
() => {{
fn f() {}
fn type_name_of<T>(_: T) -> &'static str {
std::any::type_name::<T>()
}
type_name_of(f)
.rsplit("::")
.find(|&part| part != "f" && part != "{{closure}}")
.expect("Short function name")
}};
}

/// returns name of current function plus "()"
macro_rules! fn_name {
() => {{
format!("{}()", crate::macros::fn_name_bare!())
}};
}

/// executes an expression, times duration, and emits trace! message
///
/// The trace level is `tracing::Level::TRACE` by default.
Expand Down Expand Up @@ -155,6 +176,10 @@ pub(crate) use duration_async_info;
pub(crate) use duration_debug;
#[allow(unused_imports)]
pub(crate) use duration_info;
#[allow(unused_imports)]
pub(crate) use fn_name;
#[allow(unused_imports)]
pub(crate) use fn_name_bare;

#[cfg(test)]
mod test {
Expand Down
36 changes: 36 additions & 0 deletions src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ impl MainLoopHandler {
async fn handle_miner_task_message(&mut self, msg: MinerToMain) -> Result<()> {
match msg {
MinerToMain::NewBlockFound(new_block_info) => {
let _ = crate::ScopeDurationLogger::new(
&(crate::macros::fn_name!() + "::MinerToMain::NewBlockFound"),
);

let new_block = new_block_info.block;

info!("Miner found new block: {}", new_block.kernel.header.height);
Expand Down Expand Up @@ -401,6 +405,10 @@ impl MainLoopHandler {
debug!("Received {} from a peer task", msg.get_type());
match msg {
PeerTaskToMain::NewBlocks(blocks) => {
let _ = crate::ScopeDurationLogger::new(
&(crate::macros::fn_name!() + "::PeerTaskToMain::NewBlocks"),
);

let last_block = blocks.last().unwrap().to_owned();
{
// The peer tasks also check this condition, if block is more canonical than current
Expand Down Expand Up @@ -475,6 +483,10 @@ impl MainLoopHandler {
claimed_max_height,
claimed_max_pow_family,
)) => {
let _ = crate::ScopeDurationLogger::new(
&(crate::macros::fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight"),
);

let claimed_state =
PeerSynchronizationState::new(claimed_max_height, claimed_max_pow_family);
main_loop_state
Expand All @@ -501,6 +513,10 @@ impl MainLoopHandler {
}
}
PeerTaskToMain::RemovePeerMaxBlockHeight(socket_addr) => {
let _ = crate::ScopeDurationLogger::new(
&(crate::macros::fn_name!() + "::PeerTaskToMain::RemovePeerMaxBlockHeight"),
);

debug!(
"Removing max block height from sync data structure for peer {}",
socket_addr
Expand All @@ -526,6 +542,10 @@ impl MainLoopHandler {
}
}
PeerTaskToMain::PeerDiscoveryAnswer((pot_peers, reported_by, distance)) => {
let _ = crate::ScopeDurationLogger::new(
&(crate::macros::fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer"),
);

let max_peers = self.global_state_lock.cli().max_peers;
for pot_peer in pot_peers {
main_loop_state.potential_peers.add(
Expand All @@ -538,6 +558,10 @@ impl MainLoopHandler {
}
}
PeerTaskToMain::Transaction(pt2m_transaction) => {
let _ = crate::ScopeDurationLogger::new(
&(crate::macros::fn_name!() + "::PeerTaskToMain::Transaction"),
);

debug!(
"`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
pt2m_transaction.transaction.kernel.inputs.len(),
Expand Down Expand Up @@ -1086,6 +1110,8 @@ impl MainLoopHandler {

// Handle peer discovery
_ = &mut peer_discovery_timer => {
let _ = crate::ScopeDurationLogger::new(&(crate::macros::fn_name!() + "::select::peer_discovery_timer"));

// Check number of peers we are connected to and connect to more peers
// if needed.
debug!("Timer: peer discovery job");
Expand All @@ -1097,6 +1123,8 @@ impl MainLoopHandler {

// Handle synchronization (i.e. batch-downloading of blocks)
_ = &mut block_sync_timer => {
let _ = crate::ScopeDurationLogger::new(&(crate::macros::fn_name!() + "::select::block_sync_timer"));

trace!("Timer: block-synchronization job");
self.block_sync(&mut main_loop_state).await?;

Expand All @@ -1106,6 +1134,8 @@ impl MainLoopHandler {

// Handle mempool cleanup, i.e. removing stale/too old txs from mempool
_ = &mut mempool_cleanup_timer => {
let _ = crate::ScopeDurationLogger::new(&(crate::macros::fn_name!() + "::select::mempool_cleanup_timer"));

debug!("Timer: mempool-cleaner job");
self.global_state_lock.lock_guard_mut().await.mempool_prune_stale_transactions().await;

Expand All @@ -1115,6 +1145,8 @@ impl MainLoopHandler {

// Handle incoming UTXO notification cleanup, i.e. removing stale/too old UTXO notification from pool
_ = &mut utxo_notification_cleanup_timer => {
let _ = crate::ScopeDurationLogger::new(&(crate::macros::fn_name!() + "::select::utxo_notification_cleanup_timer"));

debug!("Timer: UTXO notification pool cleanup job");

// Danger: possible loss of funds.
Expand All @@ -1131,6 +1163,8 @@ impl MainLoopHandler {

// Handle membership proof resynchronization
_ = &mut mp_resync_timer => {
let _ = crate::ScopeDurationLogger::new(&(crate::macros::fn_name!() + "::select::mp_resync_timer"));

debug!("Timer: Membership proof resync job");
self.global_state_lock.resync_membership_proofs().await?;

Expand All @@ -1139,6 +1173,8 @@ impl MainLoopHandler {

// Check if it's time to run the proof upgrader
_ = &mut tx_proof_upgrade_timer => {
let _ = crate::ScopeDurationLogger::new(&(crate::macros::fn_name!() + "::select::tx_upgrade_proof_timer"));

trace!("Timer: tx-proof-upgrader");
self.proof_upgrader(&mut main_loop_state).await?;

Expand Down
Loading

0 comments on commit de96f98

Please sign in to comment.