Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat display jitter #882

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tracing-subscriber = { version = "0.3.18", default-features = false, features =
tracing-chrome = "0.7.1"
petgraph = "0.6.4"
csv = "1.3.0"
iso8601-timestamp = "0.2.16"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid needing this dep? Can humantime be used instead? Note it could be in [dev-dependancies] as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see input the difference (RTTx - RTTx-1) - I can easily move to this type of model.
Yes, it should only be needed as a [dev-dependancies].

serde_with = "3.4.0"

# Library dependencies (Linux)
Expand Down
289 changes: 289 additions & 0 deletions src/backend/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ pub struct Hop {
extensions: Option<Extensions>,
mean: f64,
m2: f64,
/// The ABS(RTTx - RTTx-n)
jitter: Option<Duration>,
/// The Sequential jitter average calculated for each
javg: Option<f64>,
/// The worst jitter reading recorded
jmax: Option<Duration>,
/// The interval calculation i.e smooth
jinta: f64,
}

impl Hop {
Expand Down Expand Up @@ -219,6 +227,31 @@ impl Hop {
}
}

/// The duration of the jitter probe observed.
pub fn jitter_ms(&self) -> Option<f64> {
self.jitter.map(|j| j.as_secs_f64() * 1000_f64)
}
/// The duration of the jworst probe observed.
pub fn jmax_ms(&self) -> Option<f64> {
self.jmax.map(|x| x.as_secs_f64() * 1000_f64)
}
/// The jitter average duration of all probes.
pub fn javg_ms(&self) -> Option<f64> {
if self.total_recv() > 0 {
self.javg
} else {
None
}
}
/// The jitter interval of all probes.
pub fn jinta(&self) -> Option<f64> {
if self.total_recv() > 0 {
Some(self.jinta)
} else {
None
}
}

/// The last N samples.
pub fn samples(&self) -> &[Duration] {
&self.samples
Expand All @@ -244,6 +277,10 @@ impl Default for Hop {
m2: 0f64,
samples: Vec::default(),
extensions: None,
jitter: None,
javg: None,
jmax: None,
jinta: 0f64,
}
}
}
Expand Down Expand Up @@ -336,6 +373,21 @@ impl TraceData {
let dur = probe.duration();
let dur_ms = dur.as_secs_f64() * 1000_f64;
hop.total_time += dur;
//Before last is set use it to calc jitter
let last_ms = hop.last_ms().unwrap_or_default();
let jitter_ms = (last_ms - dur_ms).abs();
let jitter_dur = Duration::from_secs_f64(jitter_ms / 1000_f64);
hop.jitter = Some(jitter_dur);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is set to be Some unconditionally, I guess we should only set this if hop.last is Some?

let mut javg_ms = hop.javg_ms().unwrap_or_default();
//Welfords online algorithm avg without dataset values.
javg_ms += (jitter_ms - javg_ms) / hop.total_recv as f64;
hop.javg = Some(javg_ms);
// algorithm is from rfc1889, A.8 or rfc3550
hop.jinta += jitter_ms - ((hop.jinta + 8.0) / 16.0);
//Max following logic of hop.worst
hop.jmax = hop
.jmax
.map_or(Some(jitter_dur), |d| Some(d.max(jitter_dur)));
hop.last = Some(dur);
hop.samples.insert(0, dur);
hop.best = hop.best.map_or(Some(dur), |d| Some(d.min(dur)));
Expand Down Expand Up @@ -381,3 +433,240 @@ impl TraceData {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::trace::{Hop, TraceData};
use iso8601_timestamp::Timestamp;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::Read;
use std::str::FromStr;
use std::time::SystemTime;
use test_case::test_case;
use trippy::tracing::tracer::CompletionReason;
use trippy::tracing::types::{Port, Round, Sequence, TimeToLive, TraceId};
use trippy::tracing::{IcmpPacketType, Probe};

#[derive(Serialize, Deserialize, Debug)]
struct ImportProbe {
sequence: u16,
identifier: u16,
src_port: u16,
dest_port: u16,
ttl: u8,
round: usize,
sent: Option<Timestamp>,
status: String,
host: Option<String>,
received: Option<Timestamp>,
icmp_packet_type: Option<String>,
extensions: Option<String>,
}
impl ImportProbe {
fn sequence(&self) -> Sequence {
Sequence(self.sequence)
}
fn identifier(&self) -> TraceId {
TraceId(self.identifier)
}
fn src_port(&self) -> Port {
Port(self.src_port)
}
fn dest_port(&self) -> Port {
Port(self.dest_port)
}
fn ttl(&self) -> TimeToLive {
TimeToLive(self.ttl)
}
fn round(&self) -> Round {
Round(self.round)
}
fn sent(&self) -> SystemTime {
match SystemTime::try_from(self.sent.unwrap()) {
Ok(st) => st,
Err(_d) => SystemTime::now(),
}
}
fn received(&self) -> Option<SystemTime> {
self.received.map(|r| SystemTime::try_from(r).ok().unwrap())
}
fn status(&self) -> ProbeStatus {
match self.status.as_str() {
"Complete" => ProbeStatus::Complete,
"NotSent" => ProbeStatus::NotSent,
"Awaited" => ProbeStatus::Awaited,
_ => ProbeStatus::Skipped,
}
}
fn host(&self) -> Option<IpAddr> {
self.host
.as_ref()
.map(|h| IpAddr::V4(Ipv4Addr::from_str(h).unwrap()))
}
}
impl From<ImportProbe> for Probe {
fn from(value: ImportProbe) -> Self {
Self {
sequence: value.sequence(),
identifier: value.identifier(),
src_port: value.src_port(),
dest_port: value.dest_port(),
ttl: value.ttl(),
round: value.round(),
sent: Some(value.sent()),
status: value.status(),
host: value.host(),
received: value.received(),
icmp_packet_type: Some(IcmpPacketType::NotApplicable),
extensions: None,
}
}
}
#[derive(Deserialize,Serialize,Debug,Clone)]
struct HopResults {
/// The total probes sent for this hop.
total_sent: usize,
/// The total probes received for this hop.
total_recv: usize,
/// The round trip time for this hop in the current round.
last: Option<Duration>,
/// The best round trip time for this hop across all rounds.
best: Option<Duration>,
/// The worst round trip time for this hop across all rounds.
worst: Option<Duration>,
/// The history of round trip times across the last N rounds.
mean: f64,
m2: f64,
/// The ABS(RTTx - RTTx-n)
jitter: Option<Duration>,
/// The Sequential jitter average calculated for each
javg: Option<f64>,
/// The worst jitter reading recorded
jmax: Option<Duration>,
/// The interval calculation i.e smooth
jinta: f64,
}
impl HopResults {
#[allow(clippy::too_many_arguments)]
fn new(
total_sent: usize,
total_recv: usize,
last: u64,
best: u64,
worst: u64,
mean: f64,
m2: f64,
jitter: u64,
javg: f64,
jmax: u64,
jinta: f64,
) -> Self {
Self {total_sent,
total_recv,
last: Some(Duration::from_millis(last)),
best: Some(Duration::from_millis(best)),
worst: Some(Duration::from_millis(worst)),
mean,
m2,
jitter: Some(Duration::from_millis(jitter)),
javg: Some(javg),
jmax: Some(Duration::from_millis(jmax)),
jinta,
}
}
//TODO: Create combined struct ImportProbe Vec<ImportProbe> & HopResults.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is a good idea. Can we use yaml rather than json as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at the TUN test code I had the same exact thought, make all test input files the same type.
Consider it done!

//JSON will define the test and expected results.
}
impl PartialEq<&Hop> for HopResults {
fn eq(&self, other: &&Hop) -> bool {
self.last == other.last
&& self.jitter == other.jitter
&& self.jmax == other.jmax
&& self.javg == other.javg
&& format!("{:.2}",self.jinta) == format!("{:.2}", other.jinta)
}
}
#[test_case("base_line.json", &HopResults::new(2,2,700,300,700,0.0,0.0,400,350.0,400,680.28))]
fn test_probe_file(json_file: &str, expected: &HopResults) {
let mut json = String::new();
let mut dir_json_file = "./tests/data/".to_owned();
Copy link
Owner

@fujiapple852 fujiapple852 Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use include_str!() to read these at compile time? See

macro_rules! sim {
($path:expr) => {{
let yaml = include_str!(concat!("../resources/simulation/", $path));
serde_yaml::from_str(yaml)?
}};
}
for an example

dir_json_file.push_str(json_file);
let mut file = File::open(dir_json_file).expect("Failed to open input file");
// Read the network inputs into a string
file.read_to_string(&mut json).expect("Failed to read file");

let mut trace = Trace::new(100);
let import_probes: Vec<ImportProbe> = serde_json::from_str(&json).unwrap();
let probes: Vec<Probe> = import_probes.into_iter().map(Probe::from).collect();
let round = TracerRound::new(&probes, TimeToLive(1), CompletionReason::TargetFound);
trace.update_from_round(&round);
println!("Hop = {:#?}", trace.hops(Trace::default_flow_id())[0]);
let hop: &Hop = &trace.hops(Trace::default_flow_id())[0];
///Check if expected matches results
assert_eq!(expected, &hop );
}
#[test]
//#[ignore = "WIP"]
fn test_probe_raw_list() {
let json = r#"[{
"sequence": 1,
"identifier": 1,
"src_port": 80,
"dest_port": 80,
"ttl": 1,
"round": 1,
"sent": "2023-01-01T12:01:55.100",
"status": "Complete",
"host": "10.1.0.2",
"received": "2023-01-01T12:01:55.400",
"icmp_packet_type": null,
"extensions": null
},{
"sequence": 2,
"identifier": 1,
"src_port": 80,
"dest_port": 80,
"ttl": 1,
"round": 1,
"sent": "2023-01-01T12:01:56.100",
"status": "Complete",
"host": "10.1.0.2",
"received": "2023-01-01T12:01:56.800",
"icmp_packet_type": null,
"extensions": null
}]"#;
let mut trace = Trace::new(100);
let import_probes: Vec<ImportProbe> = serde_json::from_str(json).unwrap();
let probes: Vec<Probe> = import_probes.into_iter().map(Probe::from).collect();
let round = TracerRound::new(&probes, TimeToLive(1), CompletionReason::TargetFound);
trace.update_from_round(&round);
println!("Hop = {:#?}", trace.hops(Trace::default_flow_id())[0]);
assert_eq!(2, trace.hops(Trace::default_flow_id())[0].total_recv);
}

#[test]
fn test_probe_raw_single() {
let json = r#"{
"sequence": 2,
"identifier": 2,
"src_port": 80,
"dest_port": 80,
"ttl": 63,
"round": 2,
"sent": "2022-01-01T12:02:00Z",
"status": "Complete",
"host": "10.1.0.1",
"received": "2022-01-01T12:02:01Z",
"icmp_packet_type": null,
"extensions": null
}"#;

let import_probe: ImportProbe = serde_json::from_str(json).expect("Failed to deserialize JSON");
let probe = Probe::from(import_probe);
let mut td = TraceData::new(2);
td.update_from_probe(&probe);
assert_eq!(probe.sequence.0, 2);
println!("Host: {:?}, Age: {:?}", probe.host, probe.sequence);
}
}
Loading
Loading