feat: rewrite metrics_unify using Rust #1148

Merged
merged 3 commits on Dec 31, 2024
Changes from 2 commits
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -60,6 +60,7 @@ members = [
    "extensions/sha256/transpiler",
    "extensions/sha256/guest",
    "extensions/sha256/tests",
    "ci/scripts/metric_unify",
]
exclude = ["crates/sdk/example"]
resolver = "2"
10 changes: 10 additions & 0 deletions ci/scripts/metric_unify/Cargo.toml
@@ -0,0 +1,10 @@
[package]
name = "metric_unify"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
clap = { version = "4.0", features = ["derive"] }
num-format = "0.4"
362 changes: 362 additions & 0 deletions ci/scripts/metric_unify/src/main.rs
@@ -0,0 +1,362 @@
mod types;

use std::collections::HashMap;

use clap::Parser;
use serde::{Deserialize, Serialize};

use crate::types::{Labels, Metric, MetricDb, MetricsFile};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// Path to the metrics JSON file
    #[arg(value_name = "METRICS_JSON")]
    metrics_json: String,

    /// Path to the aggregation JSON file
    #[arg(long, value_name = "AGGREGATION_JSON")]
    aggregation_json: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
struct AggregationFile {
    aggregations: Vec<Aggregation>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Aggregation {
    name: String,
    group_by: Vec<String>,
    metrics: Vec<String>,
    operation: String,
}
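// The file passed via `--aggregation-json` deserializes into `AggregationFile` above.
// A minimal sketch of such a file (the metric names here are hypothetical):
//
// {
//   "aggregations": [
//     { "name": "total_cells", "group_by": ["group"],
//       "metrics": ["main_cells", "perm_cells"], "operation": "sum" },
//     { "name": "num_segments", "group_by": ["group"],
//       "metrics": ["num_segments"], "operation": "unique" }
//   ]
// }
//
// Only the "sum" and "unique" operations are handled; see `apply_aggregations` below.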

impl MetricDb {
    fn new(metrics_file: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let file = std::fs::File::open(metrics_file)?;
        let metrics: MetricsFile = serde_json::from_reader(file)?;

        let mut db = MetricDb::default();

        // Process counters
        for entry in metrics.counter {
            if entry.value == 0.0 {
                continue;
            }
            let labels = Labels::from(entry.labels);
            db.add_to_flat_dict(labels, entry.metric, entry.value);
        }

        // Process gauges
        for entry in metrics.gauge {
            let labels = Labels::from(entry.labels);
            db.add_to_flat_dict(labels, entry.metric, entry.value);
        }

        db.separate_by_label_types();

        Ok(db)
    }
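    // Note: the concrete JSON schema for `MetricsFile` is defined in `types.rs` (not shown
    // in this diff). From the usage above, the input is expected to provide `counter` and
    // `gauge` arrays whose entries each carry `labels`, a `metric` name, and a numeric
    // `value`; zero-valued counters are dropped.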

    fn add_to_flat_dict(&mut self, labels: Labels, metric: String, value: f64) {
        self.flat_dict
            .entry(labels)
            .or_default()
            .push(Metric::new(metric, value));
    }

    // Custom sorting function that ensures 'group' comes first.
    // Other keys are sorted alphabetically.
    fn custom_sort_label_keys(label_keys: &mut Vec<String>) {
        // Prioritize 'group' by giving it the lowest possible sort value
        label_keys.sort_by_key(|key| {
            if key == "group" {
                (0, key.clone()) // 'group' sorts first
            } else {
                (1, key.clone()) // other keys sort after, alphabetically
            }
        });
    }

    fn separate_by_label_types(&mut self) {
        self.dict_by_label_types.clear();

        for (labels, metrics) in &self.flat_dict {
            // Get sorted label keys
            let mut label_keys: Vec<String> = labels.0.iter().map(|(key, _)| key.clone()).collect();
            Self::custom_sort_label_keys(&mut label_keys);

            // Create label_values based on sorted keys
            let label_dict: HashMap<String, String> = labels.0.iter().cloned().collect();

            let label_values: Vec<String> = label_keys
                .iter()
                .map(|key| label_dict.get(key).unwrap().clone())
                .collect();

            // Add to dict_by_label_types
            self.dict_by_label_types
                .entry(label_keys)
                .or_default()
                .entry(label_values)
                .or_default()
                .extend(metrics.clone());
        }
    }

    fn generate_markdown_tables(&self) -> String {
        let mut markdown_output = String::new();
        // Get sorted keys to iterate in consistent order
        let mut sorted_keys: Vec<_> = self.dict_by_label_types.keys().cloned().collect();
        sorted_keys.sort();

        for label_keys in sorted_keys {
            let metrics_dict = &self.dict_by_label_types[&label_keys];
            let mut metric_names: Vec<String> = metrics_dict
                .values()
                .flat_map(|metrics| metrics.iter().map(|m| m.name.clone()))
                .collect::<std::collections::HashSet<_>>()
                .into_iter()
                .collect();
            // Order metric columns consistently (reverse-alphabetical)
            metric_names.sort_by(|a, b| b.cmp(a));

            // Create table header
            let header = format!(
                "| {} | {} |",
                label_keys.join(" | "),
                metric_names.join(" | ")
            );

            let separator = "| ".to_string()
                + &vec!["---"; label_keys.len() + metric_names.len()].join(" | ")
                + " |";

            markdown_output.push_str(&header);
            markdown_output.push('\n');
            markdown_output.push_str(&separator);
            markdown_output.push('\n');

            // Fill table rows
            for (label_values, metrics) in metrics_dict {
                let mut row = String::new();
                row.push_str("| ");
                row.push_str(&label_values.join(" | "));
                row.push_str(" | ");

                // Add metric values
                for metric_name in &metric_names {
                    let metric_value = metrics
                        .iter()
                        .find(|m| &m.name == metric_name)
                        .map(|m| Self::format_number(m.value))
                        .unwrap_or_default();

                    row.push_str(&format!("{} | ", metric_value));
                }

                markdown_output.push_str(&row);
                markdown_output.push('\n');
            }

            markdown_output.push('\n');
        }

        markdown_output
    }

    fn generate_aggregation_tables(&self, aggregations: &[Aggregation]) -> String {
        let mut markdown_output = String::new();
        let group_tuple = vec!["group".to_string()];

        // Get metrics grouped by "group" label
        if let Some(metrics_dict) = self.dict_by_label_types.get(&group_tuple) {
            let mut group_to_metrics: HashMap<String, Vec<Metric>> = HashMap::new();

            // Collect metrics for each group
            for (group_values, metrics) in metrics_dict {
                let group_name = &group_values[0];
                let agg_metrics: Vec<Metric> = metrics
                    .iter()
                    .filter(|metric| aggregations.iter().any(|a| a.name == metric.name))
                    .cloned()
                    .collect();

                if !agg_metrics.is_empty() {
                    group_to_metrics
                        .entry(group_name.clone())
                        .or_default()
                        .extend(agg_metrics);
                }
            }

            if !group_to_metrics.is_empty() {
                // Get all unique metric names
                let mut metric_names: Vec<String> = group_to_metrics
                    .values()
                    .flat_map(|metrics| metrics.iter().map(|m| m.name.clone()))
                    .collect::<std::collections::HashSet<_>>()
                    .into_iter()
                    .collect();
                metric_names.sort();

                // Create table header
                let header = format!("| group | {} |", metric_names.join(" | "));
                let separator =
                    format!("| --- | {} |", vec!["---"; metric_names.len()].join(" | "));
                markdown_output.push_str(&header);
                markdown_output.push('\n');
                markdown_output.push_str(&separator);
                markdown_output.push('\n');

                // Fill table rows
                for (group_name, metrics) in group_to_metrics {
                    let mut row = format!("| {} |", group_name);

                    for metric_name in &metric_names {
                        let metric_str = metrics
                            .iter()
                            .find(|m| &m.name == metric_name)
                            .map(|m| format!(" {} |", Self::format_number(m.value)))
                            .unwrap_or_default();

                        row.push_str(&metric_str);
                    }

                    markdown_output.push_str(&row);
                    markdown_output.push('\n');
                }
                markdown_output.push('\n');
            }
        }

        markdown_output
    }

    fn read_aggregations(
        aggregation_file: &str,
    ) -> Result<Vec<Aggregation>, Box<dyn std::error::Error>> {
        let file = std::fs::File::open(aggregation_file)?;
        let aggregation_data: AggregationFile = serde_json::from_reader(file)?;
        Ok(aggregation_data.aggregations)
    }

    fn apply_aggregations(&mut self, aggregations: &[Aggregation]) {
        for aggregation in aggregations {
            let mut group_by_dict: HashMap<Vec<String>, f64> = HashMap::new();

            if aggregation.operation == "sum" || aggregation.operation == "unique" {
                for (tuple_keys, metrics_dict) in &self.dict_by_label_types {
                    // Skip if not all group_by keys are present in tuple_keys
                    if !aggregation
                        .group_by
                        .iter()
                        .all(|key| tuple_keys.contains(key))
                    {
                        continue;
                    }

                    for (tuple_values, metrics) in metrics_dict {
                        // Create a mapping from label keys to values
                        let label_dict: HashMap<_, _> =
                            tuple_keys.iter().zip(tuple_values.iter()).collect();

                        // Extract values for group_by keys
                        let group_by_values: Vec<String> = aggregation
                            .group_by
                            .iter()
                            .map(|key| label_dict[key].clone())
                            .collect();

                        // Process metrics
                        for metric in metrics {
                            if aggregation.metrics.contains(&metric.name) {
                                match aggregation.operation.as_str() {
                                    "sum" => {
                                        *group_by_dict
                                            .entry(group_by_values.clone())
                                            .or_default() += metric.value;
                                    }
                                    "unique" => {
                                        let entry = group_by_dict
                                            .entry(group_by_values.clone())
                                            .or_default();
                                        if *entry != 0.0 && *entry != metric.value {
                                            println!(
                                                "[WARN] Overwriting {}: previous value = {}, new value = {}",
                                                metric.name, entry, metric.value
                                            );
                                        }
                                        *entry = metric.value;
                                    }
                                    _ => unreachable!(),
                                }
                            }
                        }
                    }
                }

                // Add aggregated metrics back to the database
                for (group_by_values, agg_value) in group_by_dict {
                    let labels = Labels(
                        aggregation
                            .group_by
                            .iter()
                            .zip(group_by_values.iter())
                            .map(|(k, v)| (k.clone(), v.clone()))
                            .collect(),
                    );

                    let metric = Metric::new(aggregation.name.clone(), agg_value);

                    // Check if metric already exists
                    if let Some(metrics) = self.flat_dict.get_mut(&labels) {
                        if let Some(existing_metric) =
                            metrics.iter_mut().find(|m| m.name == aggregation.name)
                        {
                            if existing_metric.value != agg_value {
                                println!(
                                    "[WARN] Overwriting {}: previous value = {}, new value = {}",
                                    aggregation.name, existing_metric.value, agg_value
                                );
                            }
                            existing_metric.value = agg_value;
                        } else {
                            metrics.push(metric);
                        }
                    } else {
                        self.flat_dict.insert(labels, vec![metric]);
                    }
                }
            }
        }

        self.separate_by_label_types();
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();
    let mut db = MetricDb::new(&args.metrics_json)?;

    let mut markdown_output = String::new();

    if let Some(aggregation_file) = args.aggregation_json {
        let aggregations = MetricDb::read_aggregations(&aggregation_file)?;
        db.apply_aggregations(&aggregations);

        // Generate aggregation tables
        let agg_tables = db.generate_aggregation_tables(&aggregations);
        markdown_output.push_str(&agg_tables);

        // Add detailed metrics in a collapsible section
        markdown_output.push_str("\n<details>\n<summary>Detailed Metrics</summary>\n\n");
        markdown_output.push_str(&db.generate_markdown_tables());
        markdown_output.push_str("</details>\n\n");
    } else {
        markdown_output.push_str(&db.generate_markdown_tables());
    }

    println!("{}", markdown_output);
    Ok(())
}
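
Usage note: since the crate is added as a workspace member, a likely invocation (assuming clap's default kebab-case flag derivation) is `cargo run -p metric_unify -- <METRICS_JSON> --aggregation-json <AGGREGATION_JSON>`; the generated markdown tables are written to stdout.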