Skip to content

Commit

Permalink
chore: Improve network tracing messages (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Mar 24, 2023
1 parent 2243acf commit 27d29ac
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
6 changes: 6 additions & 0 deletions pallas-miniprotocols/src/blockfetch/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer, ChannelError};
use thiserror::Error;
use tracing::{debug, info, warn};

use crate::common::Point;

Expand Down Expand Up @@ -125,10 +126,12 @@ where
pub fn recv_while_busy(&mut self) -> Result<HasBlocks, Error> {
match self.recv_message()? {
Message::StartBatch => {
info!("batch start");
self.0 = State::Streaming;
Ok(Some(()))
}
Message::NoBlocks => {
warn!("no blocks");
self.0 = State::Idle;
Ok(None)
}
Expand All @@ -138,6 +141,7 @@ where

pub fn request_range(&mut self, range: Range) -> Result<HasBlocks, Error> {
self.send_request_range(range)?;
debug!("range requested");
self.recv_while_busy()
}

Expand All @@ -157,6 +161,7 @@ where
.ok_or(Error::NoBlocks)?;

let body = self.recv_while_streaming()?.ok_or(Error::InvalidInbound)?;
debug!("body received");

match self.recv_while_streaming()? {
Some(_) => Err(Error::InvalidInbound),
Expand All @@ -170,6 +175,7 @@ where
let mut all = vec![];

while let Some(block) = self.recv_while_streaming()? {
debug!("body received");
all.push(block);
}

Expand Down
3 changes: 0 additions & 3 deletions pallas-miniprotocols/src/chainsync/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ where
}
}

#[instrument(skip_all)]
pub fn find_intersect(&mut self, points: Vec<Point>) -> Result<IntersectResponse, Error> {
self.send_find_intersect(points)?;
self.recv_intersect_response()
Expand Down Expand Up @@ -195,7 +194,6 @@ where
}
}

#[instrument(skip_all)]
pub fn request_next(&mut self) -> Result<NextResponse<O>, Error> {
debug!("requesting next block");

Expand All @@ -212,7 +210,6 @@ where
point.ok_or(Error::IntersectionNotFound)
}

#[instrument(skip_all)]
pub fn intersect_tip(&mut self) -> Result<Point, Error> {
let (_, Tip(point, _)) = self.find_intersect(vec![Point::Origin])?;

Expand Down
9 changes: 6 additions & 3 deletions pallas-upstream/src/blockfetch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use gasket::messaging::SendAdapter;
use tracing::{debug, error, instrument};
use tracing::{error, info, instrument};

use pallas_crypto::hash::Hash;
use pallas_miniprotocols::blockfetch;
Expand Down Expand Up @@ -39,14 +39,16 @@ where
}
}

#[instrument(skip(self))]
#[instrument(skip(self), fields(slot, %hash))]
fn fetch_block(&mut self, slot: u64, hash: Hash<32>) -> Result<Vec<u8>, gasket::error::Error> {
info!("fetching block");

match self
.client
.fetch_single(Point::Specific(slot, hash.to_vec()))
{
Ok(x) => {
debug!("block fetch succeded");
info!("block fetch succeeded");
Ok(x)
}
Err(blockfetch::Error::ChannelError(x)) => {
Expand Down Expand Up @@ -77,6 +79,7 @@ where
let msg = match msg.payload {
ChainSyncEvent::RollForward(s, h) => {
let body = self.fetch_block(s, h)?;
self.block_count.inc(1);
BlockFetchEvent::RollForward(s, h, body)
}
ChainSyncEvent::Rollback(x) => BlockFetchEvent::Rollback(x),
Expand Down
8 changes: 7 additions & 1 deletion pallas-upstream/src/plexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub struct Worker {
channel3_out: Option<DemuxOutputPort>,
demuxer: Option<Demuxer<GasketEgress>>,
muxer: Option<Muxer<GasketIngress>>,
ops_count: gasket::metrics::Counter,
}

impl Worker {
Expand All @@ -110,6 +111,7 @@ impl Worker {
channel3_out,
demuxer: None,
muxer: None,
ops_count: Default::default(),
}
}

Expand Down Expand Up @@ -141,7 +143,9 @@ impl Worker {
impl gasket::runtime::Worker for Worker {
fn metrics(&self) -> gasket::metrics::Registry {
// TODO: define networking metrics (bytes in / out, etc)
gasket::metrics::Builder::new().build()
gasket::metrics::Builder::new()
.with_counter("ops_count", &self.ops_count)
.build()
}

fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
Expand Down Expand Up @@ -196,6 +200,8 @@ impl gasket::runtime::Worker for Worker {
mux_res.unwrap()?;
demux_res.unwrap()?;

self.ops_count.inc(1);

Ok(gasket::runtime::WorkOutcome::Partial)
}
}

0 comments on commit 27d29ac

Please sign in to comment.