From 27d29ace30681fb4029416fbe2c9896e7246b5a2 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 24 Mar 2023 02:30:10 +0100 Subject: [PATCH] chore: Improve network tracing messages (#237) --- pallas-miniprotocols/src/blockfetch/client.rs | 6 ++++++ pallas-miniprotocols/src/chainsync/client.rs | 3 --- pallas-upstream/src/blockfetch.rs | 9 ++++++--- pallas-upstream/src/plexer.rs | 8 +++++++- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pallas-miniprotocols/src/blockfetch/client.rs b/pallas-miniprotocols/src/blockfetch/client.rs index 1ec3ddf7..0ba207e6 100644 --- a/pallas-miniprotocols/src/blockfetch/client.rs +++ b/pallas-miniprotocols/src/blockfetch/client.rs @@ -1,6 +1,7 @@ use pallas_codec::Fragment; use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError}; use thiserror::Error; +use tracing::{debug, info, warn}; use crate::common::Point; @@ -125,10 +126,12 @@ where pub fn recv_while_busy(&mut self) -> Result { match self.recv_message()? { Message::StartBatch => { + info!("batch start"); self.0 = State::Streaming; Ok(Some(())) } Message::NoBlocks => { + warn!("no blocks"); self.0 = State::Idle; Ok(None) } @@ -138,6 +141,7 @@ where pub fn request_range(&mut self, range: Range) -> Result { self.send_request_range(range)?; + debug!("range requested"); self.recv_while_busy() } @@ -157,6 +161,7 @@ where .ok_or(Error::NoBlocks)?; let body = self.recv_while_streaming()?.ok_or(Error::InvalidInbound)?; + debug!("body received"); match self.recv_while_streaming()? { Some(_) => Err(Error::InvalidInbound), @@ -170,6 +175,7 @@ where let mut all = vec![]; while let Some(block) = self.recv_while_streaming()? { + debug!("body received"); all.push(block); } diff --git a/pallas-miniprotocols/src/chainsync/client.rs b/pallas-miniprotocols/src/chainsync/client.rs index 8a93e573..5d6e6096 100644 --- a/pallas-miniprotocols/src/chainsync/client.rs +++ b/pallas-miniprotocols/src/chainsync/client.rs @@ -149,7 +149,6 @@ where } } - #[instrument(skip_all)] pub fn find_intersect(&mut self, points: Vec) -> Result { self.send_find_intersect(points)?; self.recv_intersect_response() @@ -195,7 +194,6 @@ where } } - #[instrument(skip_all)] pub fn request_next(&mut self) -> Result, Error> { debug!("requesting next block"); @@ -212,7 +210,6 @@ where point.ok_or(Error::IntersectionNotFound) } - #[instrument(skip_all)] pub fn intersect_tip(&mut self) -> Result { let (_, Tip(point, _)) = self.find_intersect(vec![Point::Origin])?; diff --git a/pallas-upstream/src/blockfetch.rs b/pallas-upstream/src/blockfetch.rs index 1097f648..107bf525 100644 --- a/pallas-upstream/src/blockfetch.rs +++ b/pallas-upstream/src/blockfetch.rs @@ -1,5 +1,5 @@ use gasket::messaging::SendAdapter; -use tracing::{debug, error, instrument}; +use tracing::{error, info, instrument}; use pallas_crypto::hash::Hash; use pallas_miniprotocols::blockfetch; @@ -39,14 +39,16 @@ where } } - #[instrument(skip(self))] + #[instrument(skip(self), fields(slot, %hash))] fn fetch_block(&mut self, slot: u64, hash: Hash<32>) -> Result, gasket::error::Error> { + info!("fetching block"); + match self .client .fetch_single(Point::Specific(slot, hash.to_vec())) { Ok(x) => { - debug!("block fetch succeded"); + info!("block fetch succeeded"); Ok(x) } Err(blockfetch::Error::ChannelError(x)) => { @@ -77,6 +79,7 @@ where let msg = match msg.payload { ChainSyncEvent::RollForward(s, h) => { let body = self.fetch_block(s, h)?; + self.block_count.inc(1); BlockFetchEvent::RollForward(s, h, body) } ChainSyncEvent::Rollback(x) => BlockFetchEvent::Rollback(x), diff --git a/pallas-upstream/src/plexer.rs b/pallas-upstream/src/plexer.rs index 25705447..dc47f01a 100644 --- a/pallas-upstream/src/plexer.rs +++ b/pallas-upstream/src/plexer.rs @@ -92,6 +92,7 @@ pub struct Worker { channel3_out: Option, demuxer: Option>, muxer: Option>, + ops_count: gasket::metrics::Counter, } impl Worker { @@ -110,6 +111,7 @@ impl Worker { channel3_out, demuxer: None, muxer: None, + ops_count: Default::default(), } } @@ -141,7 +143,9 @@ impl Worker { impl gasket::runtime::Worker for Worker { fn metrics(&self) -> gasket::metrics::Registry { // TODO: define networking metrics (bytes in / out, etc) - gasket::metrics::Builder::new().build() + gasket::metrics::Builder::new() + .with_counter("ops_count", &self.ops_count) + .build() } fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { @@ -196,6 +200,8 @@ impl gasket::runtime::Worker for Worker { mux_res.unwrap()?; demux_res.unwrap()?; + self.ops_count.inc(1); + Ok(gasket::runtime::WorkOutcome::Partial) } }