Skip to content

Commit

Permalink
fix: extend metrics (#32)
Browse files: browse the repository at this point in the history
* fix: extend metrics

* fix: namespace gateway metrics

* fix: widen simulated latency band

* instrument all funcs
  • Loading branch information
Arqu authored May 10, 2022
1 parent e94a7e0 commit 34c53da
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
23 changes: 19 additions & 4 deletions iroh-gateway/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::metrics::*;
use crate::response::ResponseFormat;
use axum::body::Body;
use metrics::{counter, gauge, increment_counter};
use metrics::{counter, gauge, histogram, increment_counter};
use rand::{prelude::StdRng, Rng, SeedableRng};
use std::{fs::File, io::Read, path::Path, time::Duration};

Expand All @@ -17,6 +17,7 @@ impl Client {
Self {}
}

#[tracing::instrument()]
pub async fn get_file_simulated(
&self,
_path: &str,
Expand All @@ -26,7 +27,7 @@ impl Client {
let mut rng: StdRng = SeedableRng::from_entropy();

// some random latency
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..150))).await;
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..650))).await;

tokio::spawn(async move {
let test_path = Path::new("test_files/test_big.txt");
Expand All @@ -41,26 +42,40 @@ impl Client {
increment_counter!(METRICS_CACHE_HIT);
}

let mut f_size: f64 = 0.0;

while let Ok(n) = file.read(&mut buf) {
if first_block {
gauge!(METRICS_TIME_TO_FETCH_FIRST_BLOCK, start_time.elapsed());
histogram!(METRICS_HIST_TTFB, start_time.elapsed());
}
counter!(METRICS_BYTES_FETCHED, n as u64);
counter!(METRICS_BYTES_STREAMED, n as u64);
f_size += n as f64;
gauge!(
METRICS_BITRATE_IN,
f_size / start_time.elapsed().as_secs_f64()
);
if n == 0 {
gauge!(METRICS_TIME_TO_FETCH_FULL_FILE, start_time.elapsed());
gauge!(METRICS_TIME_TO_SERVE_FULL_FILE, start_time.elapsed());
histogram!(METRICS_HIST_TTSERVE, start_time.elapsed());
break;
}
sender
.send_data(axum::body::Bytes::from(buf[..n].to_vec()))
.await
.unwrap();
// TODO(arqu): handle the send_data error instead of panicking via unwrap()
if first_block {
first_block = false;
gauge!(METRICS_TIME_TO_SERVE_FIRST_BLOCK, start_time.elapsed());
}
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..150))).await;
gauge!(
METRICS_BITRATE_OUT,
f_size / start_time.elapsed().as_secs_f64()
);
counter!(METRICS_BYTES_STREAMED, n as u64);
tokio::time::sleep(Duration::from_millis(rng.gen_range(0..250))).await;
}
});

Expand Down
2 changes: 2 additions & 0 deletions iroh-gateway/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ fn get_filename(content_path: &str) -> String {
.unwrap_or_default()
}

#[tracing::instrument()]
fn response(
status_code: StatusCode,
body: BoxBody,
Expand All @@ -295,6 +296,7 @@ fn response(
})
}

#[tracing::instrument()]
fn error(status_code: StatusCode, message: &str) -> Result<GatewayResponse, GatewayError> {
Err(GatewayError {
status_code,
Expand Down
4 changes: 4 additions & 0 deletions iroh-gateway/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use metrics::increment_counter;
use serde_json::json;

use crate::metrics::METRICS_FAIL;

#[derive(Debug)]
pub struct GatewayError {
pub status_code: StatusCode,
Expand All @@ -13,6 +16,7 @@ pub struct GatewayError {

impl IntoResponse for GatewayError {
fn into_response(self) -> Response {
increment_counter!(METRICS_FAIL, "code" => self.status_code.as_u16().to_string());
let body = axum::Json(json!({
"code": self.status_code.as_u16(),
"success": false,
Expand Down
36 changes: 24 additions & 12 deletions iroh-gateway/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use git_version::git_version;
use metrics::{describe_counter, describe_gauge, Unit};
use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};

use opentelemetry::trace::{TraceContextExt, TraceId};
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand All @@ -17,17 +17,21 @@ pub fn metrics_config() -> iroh_metrics::Config {
iroh_metrics::Config::new(service_name, instance_id, build, version, service_env)
}

pub const METRICS_CNT_REQUESTS_TOTAL: &str = "requests_total";
pub const METRICS_TIME_TO_FETCH_FIRST_BLOCK: &str = "time_to_fetch_first_block";
pub const METRICS_TIME_TO_FETCH_FULL_FILE: &str = "time_to_fetch_full_file";
pub const METRICS_TIME_TO_SERVE_FIRST_BLOCK: &str = "time_to_serve_first_block";
pub const METRICS_TIME_TO_SERVE_FULL_FILE: &str = "time_to_serve_full_file";
pub const METRICS_CACHE_HIT: &str = "cache_hit";
pub const METRICS_CACHE_MISS: &str = "cache_miss";
pub const METRICS_BYTES_STREAMED: &str = "bytes_streamed";
pub const METRICS_BYTES_FETCHED: &str = "bytes_fetched";
pub const METRICS_BITRATE_IN: &str = "bitrate_in";
pub const METRICS_BITRATE_OUT: &str = "bitrate_out";
pub const METRICS_CNT_REQUESTS_TOTAL: &str = "gw_requests_total";
pub const METRICS_TIME_TO_FETCH_FIRST_BLOCK: &str = "gw_time_to_fetch_first_block";
pub const METRICS_TIME_TO_FETCH_FULL_FILE: &str = "gw_time_to_fetch_full_file";
pub const METRICS_TIME_TO_SERVE_FIRST_BLOCK: &str = "gw_time_to_serve_first_block";
pub const METRICS_TIME_TO_SERVE_FULL_FILE: &str = "gw_time_to_serve_full_file";
pub const METRICS_CACHE_HIT: &str = "gw_cache_hit";
pub const METRICS_CACHE_MISS: &str = "gw_cache_miss";
pub const METRICS_BYTES_STREAMED: &str = "gw_bytes_streamed";
pub const METRICS_BYTES_FETCHED: &str = "gw_bytes_fetched";
pub const METRICS_BITRATE_IN: &str = "gw_bitrate_in";
pub const METRICS_BITRATE_OUT: &str = "gw_bitrate_out";
pub const METRICS_HIST_TTFB: &str = "gw_hist_time_to_fetch_first_block";
pub const METRICS_HIST_TTSERVE: &str = "gw_hist_time_to_serve_full_file";
pub const METRICS_ERROR: &str = "gw_error_count";
pub const METRICS_FAIL: &str = "gw_fail_count";

pub fn register_counters() {
describe_counter!(
Expand Down Expand Up @@ -77,6 +81,14 @@ pub fn register_counters() {
Unit::KilobitsPerSecond,
"Bitrate of outgoing stream"
);
describe_counter!(METRICS_ERROR, Unit::Count, "Number of errors");
describe_counter!(METRICS_FAIL, Unit::Count, "Number of failed requests");
describe_histogram!(METRICS_HIST_TTFB, Unit::Milliseconds, "Histogram of TTFB");
describe_histogram!(
METRICS_HIST_TTSERVE,
Unit::Milliseconds,
"Histogram of TTSERVE"
);
}

pub fn get_current_trace_id() -> TraceId {
Expand Down

0 comments on commit 34c53da

Please sign in to comment.