Skip to content

Commit

Permalink
[Refactor] Complete metrics overhaul
Browse files Browse the repository at this point in the history
Metrics got an entire overhaul. Instead of relying on a broken
prometheus library to publish our metrics, we now use the
`tracing` library together with OpenTelemetry; we bind the two
together and then publish into a prometheus library.

Metrics are now mostly derive-macros. This means that the struct
can express what it wants to export and a help text. The library
will choose if it is able to export it.

Tracing now works by calling `.publish()` on the parent structs;
those structs need to call `.publish()` on all the child members
they wish to publish data about. If a "group" is requested, use
the `group!()` macro, which under-the-hood calls `tracing::span`
with some special labels. At primitive layers, it will call the
`publish!()` macro, which will call `tracing::event!()` macro
under-the-hood with some special fields set. A custom
`tracing::Subscriber` will intercept all the events and spans
and convert them into a json-like object. This object can then
be exported as real json or encoded into other formats like
otel/prometheus.

closes: TraceMachina#1164, TraceMachina#650, TraceMachina#384, TraceMachina#209
towards: TraceMachina#206
  • Loading branch information
allada committed Jul 25, 2024
1 parent f59f8ba commit 7cebcf6
Show file tree
Hide file tree
Showing 75 changed files with 2,770 additions and 1,439 deletions.
318 changes: 318 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ nativelink-service = { path = "nativelink-service" }
nativelink-store = { path = "nativelink-store" }
nativelink-util = { path = "nativelink-util" }
nativelink-worker = { path = "nativelink-worker" }
nativelink-metric = { path = "nativelink-metric" }
nativelink-metric-collector = { path = "nativelink-metric-collector" }

async-lock = "3.3.0"
axum = "0.6.20"
Expand All @@ -58,3 +60,15 @@ tokio-rustls = "0.25.0"
tonic = { version = "0.11.0", features = ["gzip", "tls"] }
tower = "0.4.13"
tracing = "0.1.40"
opentelemetry_sdk = { version = "0.23.0", features = ["metrics"] }
tracing-subscriber = "0.3.18"
tracing-opentelemetry = { version = "0.25.0", features = ["metrics"] }
opentelemetry-stdout = "0.5.0"
opentelemetry_api = { version = "0.20.0", features = ["metrics"] }
opentelemetry = { version = "0.23.0", features = ["metrics"] }
prometheus = "0.13.4"
opentelemetry-prometheus = "0.16.0"
serde_json = "1.0.120"

[dev-dependencies]
nativelink-metric-tests = { path = "nativelink-metric-tests" }
1 change: 1 addition & 0 deletions nativelink-error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ autobenches = false

[dependencies]
nativelink-proto = { path = "../nativelink-proto" }
nativelink-metric = { path = "../nativelink-metric" }

hex = "0.4.3"
prost = "0.12.4"
Expand Down
7 changes: 7 additions & 0 deletions nativelink-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use prost_types::TimestampError;
use serde::{Deserialize, Serialize};
use nativelink_metric::{MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent};

#[macro_export]
macro_rules! make_err {
Expand Down Expand Up @@ -47,6 +48,12 @@ pub struct Error {
pub messages: Vec<String>,
}

impl MetricsComponent for Error {
fn publish(&self, kind: MetricKind, field_metadata: MetricFieldData) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
self.to_string().publish(kind, field_metadata)
}
}

impl Error {
pub fn new(code: Code, msg: String) -> Self {
let mut msgs = Vec::with_capacity(1);
Expand Down
24 changes: 24 additions & 0 deletions nativelink-metric-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "nativelink-metric-collector"
version = "0.4.0"
edition = "2021"
rust-version = "1.79.0"

[dependencies]
tracing = "0.1.40"
opentelemetry_sdk = { version = "0.23.0", features = ["metrics", "rt-tokio"] }
tracing-subscriber = "0.3.18"
# tracing-opentelemetry = { version = "0.25.0", features = ["metrics"] }
# opentelemetry-stdout = "0.5.0"
# opentelemetry_api = { version = "0.20.0", features = ["metrics"] }
opentelemetry = { version = "0.23.0", features = ["metrics"] }
parking_lot = "0.12.2"
# tokio = { version = "1.37.0" }
nativelink-metric = { path = "../nativelink-metric" }
serde_json = "1.0.120"
serde = "1.0.204"

[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
# nativelink-util = { path = "../nativelink-util" }
nativelink-error = { path = "../nativelink-error" }
8 changes: 8 additions & 0 deletions nativelink-metric-collector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pub use tracing_layers::MetricsCollectorLayer;

mod metrics_collection;
mod metrics_visitors;
mod tracing_layers;
mod otel_exporter;

pub use otel_exporter::otel_export;
81 changes: 81 additions & 0 deletions nativelink-metric-collector/src/metrics_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::{
borrow::Cow,
collections::HashMap,
ops::{Deref, DerefMut},
};

use serde::Serialize;

use crate::metrics_visitors::CollectionKind;

/// The value carried by a single collected metric.
///
/// Marked `#[serde(untagged)]`, so the derived serializer emits only the
/// inner value without a variant tag.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum CollectedMetricPrimitiveValue {
/// An unsigned integer value.
Counter(u64),
/// A textual value, either borrowed for `'static` or owned.
String(Cow<'static, str>),
}

/// A leaf entry in the collected metrics: a value plus its metadata.
///
/// The default instance has no value, an empty help text and the default
/// `CollectionKind`.
#[derive(Default, Debug)]
pub struct CollectedMetricPrimitive {
/// The collected value, or `None` when no value is present.
pub value: Option<CollectedMetricPrimitiveValue>,
/// Help text describing the metric.
pub help: String,
/// The kind of metric this entry represents.
pub value_type: CollectionKind,
}

/// Serializes only the metric's value; `help` and `value_type` are not
/// written to the output.
impl Serialize for CollectedMetricPrimitive {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // A missing value is emitted as the serializer's "none" value.
        let Some(value) = self.value.as_ref() else {
            return serializer.serialize_none();
        };
        match value {
            CollectedMetricPrimitiveValue::Counter(count) => serializer.serialize_u64(*count),
            CollectedMetricPrimitiveValue::String(text) => serializer.serialize_str(text),
        }
    }
}

/// Named children of a component: maps each child's name to its collected
/// metrics (either a leaf primitive or a nested component).
pub type CollectedMetricChildren = HashMap<String, CollectedMetrics>;

/// A node in the tree of collected metrics.
///
/// Marked `#[serde(untagged)]`, so a node serializes as its inner content
/// without a variant tag.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum CollectedMetrics {
/// A leaf holding a single metric value.
Primitive(CollectedMetricPrimitive),
/// An inner node holding named children (boxed, since the type is recursive).
Component(Box<CollectedMetricChildren>),
}

impl CollectedMetrics {
pub fn new_component() -> Self {
Self::Component(Box::new(CollectedMetricChildren::default()))
}
}

/// Top-level container for all collected metrics.
///
/// The single field is `#[serde(flatten)]`ed, so the children are serialized
/// as the entries of the root object itself rather than under an `inner` key.
#[derive(Default, Debug, Serialize)]
pub struct RootMetricCollectedMetrics {
/// The root-level children, keyed by name.
#[serde(flatten)]
inner: CollectedMetricChildren,
}

impl RootMetricCollectedMetrics {
pub fn to_json5(&self) -> Result<std::string::String, serde_json::Error> {
serde_json::to_string_pretty(self)
}
}

/// Gives read access to the root-level children map.
impl Deref for RootMetricCollectedMetrics {
    type Target = CollectedMetricChildren;

    fn deref(&self) -> &Self::Target {
        let Self { inner } = self;
        inner
    }
}

/// Gives mutable access to the root-level children map.
impl DerefMut for RootMetricCollectedMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner } = self;
        inner
    }
}
136 changes: 136 additions & 0 deletions nativelink-metric-collector/src/metrics_visitors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use std::{borrow::Cow, fmt::Debug};

use nativelink_metric::MetricKind;
use serde::Serialize;
use tracing::field::{Field, Visit};

use crate::metrics_collection::{CollectedMetricPrimitive, CollectedMetricPrimitiveValue};

/// The kind of a collected metric value.
#[derive(Default, Debug, Serialize)]
pub enum CollectionKind {
/// A counter value; this is the default kind.
#[default]
Counter = 0,
/// A string value.
String = 1,
}

/// Maps a `MetricKind` onto the kinds the collector understands.
impl From<MetricKind> for CollectionKind {
    fn from(kind: MetricKind) -> Self {
        // Only `Counter` maps to `Counter`; `String` and every other kind
        // are collected as `String`.
        if matches!(kind, MetricKind::Counter) {
            Self::Counter
        } else {
            Self::String
        }
    }
}

/// The raw value recorded by `MetricDataVisitor`, tagged with the primitive
/// type it was stored as.
#[derive(Debug)]
enum ValueWithPrimitiveType {
/// A value stored as text.
String(String),
/// A value stored as an unsigned 64-bit integer.
U64(u64),
}

impl Default for ValueWithPrimitiveType {
fn default() -> Self {
ValueWithPrimitiveType::U64(0)
}
}

/// Field visitor that gathers the metric-related fields of a `tracing`
/// event (`__name`, `__value`, `__help`, `__type`).
#[derive(Default, Debug)]
pub struct MetricDataVisitor {
/// Metric name, taken from the `__name` field.
pub name: String,
/// Recorded value, taken from the `__value` field; defaults to `U64(0)`.
value: ValueWithPrimitiveType,
/// Help text, taken from the `__help` field.
help: String,
/// Explicit kind from the `__type` field, if one was recorded.
value_type: Option<CollectionKind>,
}

impl From<MetricDataVisitor> for CollectedMetricPrimitive {
fn from(visitor: MetricDataVisitor) -> Self {
let (value, derived_type) = match visitor.value {
ValueWithPrimitiveType::String(s) => {
(CollectedMetricPrimitiveValue::String(Cow::Owned(s)), CollectionKind::String)
},
ValueWithPrimitiveType::U64(u) => {
(CollectedMetricPrimitiveValue::Counter(u), CollectionKind::Counter)
},
};
CollectedMetricPrimitive {
value: Some(value),
help: visitor.help,
value_type: visitor.value_type.unwrap_or(derived_type),
}
}
}

impl Visit for MetricDataVisitor {
// Required method
fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) {}

// Provided methods
fn record_f64(&mut self, field: &Field, value: f64) {
if field.name() == "__value" {
self.value = ValueWithPrimitiveType::String(value.to_string())
}
}
fn record_i64(&mut self, field: &Field, value: i64) {
if field.name() == "__value" {
match u64::try_from(value) {
Ok(v) => self.value = ValueWithPrimitiveType::U64(v),
Err(_) => self.value = ValueWithPrimitiveType::String(value.to_string()),
}
}
}
fn record_u64(&mut self, field: &Field, value: u64) {
match field.name() {
"__value" => self.value = ValueWithPrimitiveType::U64(value),
"__type" => self.value_type = Some(MetricKind::from(value).into()),
"__help" => self.help = value.to_string(),
"__name" => self.name = value.to_string(),
field => panic!("UNKNOWN FIELD {field}"),
}
}
fn record_i128(&mut self, field: &Field, value: i128) {
if field.name() == "__value" {
match u64::try_from(value) {
Ok(v) => self.value = ValueWithPrimitiveType::U64(v),
Err(_) => self.value = ValueWithPrimitiveType::String(value.to_string()),
}
}
}
fn record_u128(&mut self, field: &Field, value: u128) {
if field.name() == "__value" {
match u64::try_from(value) {
Ok(v) => self.value = ValueWithPrimitiveType::U64(v),
Err(_) => self.value = ValueWithPrimitiveType::String(value.to_string()),
}
}
}
fn record_bool(&mut self, field: &Field, value: bool) {
if field.name() == "__value" {
self.value = ValueWithPrimitiveType::U64(u64::from(value));
}
}
fn record_str(&mut self, field: &Field, value: &str) {
match field.name() {
"__value" => self.value = ValueWithPrimitiveType::String(value.to_string()),
"__help" => self.help = value.to_string(),
"__name" => self.name = value.to_string(),
field => panic!("UNKNOWN FIELD {field}"),
}
}
fn record_error(&mut self, _field: &Field, _value: &(dyn std::error::Error + 'static)) {}
}

/// Field visitor for spans; keeps only the span's `__name` field.
pub struct SpanFields {
/// The name recorded from the `__name` string field.
pub name: Cow<'static, str>,
}

/// Captures the `__name` string field of a span; everything else is ignored.
impl Visit for SpanFields {
    // Required method: debug-formatted fields are ignored.
    fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) {}

    fn record_str(&mut self, field: &Field, value: &str) {
        if field.name() != "__name" {
            return;
        }
        self.name = Cow::Owned(value.to_owned());
    }
}
51 changes: 51 additions & 0 deletions nativelink-metric-collector/src/otel_exporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use opentelemetry::metrics::Meter;
use tracing::info;

use crate::metrics_collection::{CollectedMetricChildren, CollectedMetricPrimitive, CollectedMetricPrimitiveValue, CollectedMetrics, RootMetricCollectedMetrics};

/// Longest metric name, measured with `String::len` (bytes), that
/// `process_primitive` will publish; longer names are logged and skipped.
const MAX_METRIC_NAME_LENGTH: usize = 256;

/// Exports every counter in `root_collected_metrics` through `meter`.
///
/// Metric names are built by joining `root_prefix` and the names along the
/// path to each metric with underscores. An empty `root_prefix` adds no
/// leading separator.
pub fn otel_export(
    mut root_prefix: String,
    meter: &Meter,
    root_collected_metrics: &RootMetricCollectedMetrics,
) {
    let needs_separator = !root_prefix.is_empty();
    if needs_separator {
        root_prefix.push('_');
    }
    // The root container derefs to its children map.
    process_children(&mut root_prefix, meter, root_collected_metrics);
}

/// Walks `children`, exporting each primitive and recursing into each
/// component.
///
/// `prefix` is used as a scratch buffer for the metric name: each child's
/// name is appended for the duration of its processing and removed again
/// afterwards, so the buffer is unchanged when this function returns.
fn process_children(prefix: &mut String, meter: &Meter, children: &CollectedMetricChildren) {
    for (name, child) in children {
        // Remember where this child's contribution starts so it can be undone.
        let base_len = prefix.len();
        prefix.push_str(name);
        match child {
            CollectedMetrics::Primitive(leaf) => process_primitive(prefix, meter, leaf),
            CollectedMetrics::Component(nested) => {
                prefix.push('_');
                process_children(prefix, meter, nested);
            }
        }
        prefix.truncate(base_len);
    }
}

/// Publishes a single primitive through `meter`, using `prefix` as the
/// metric name and the primitive's help text as the description.
///
/// Only counter values are published; names longer than
/// `MAX_METRIC_NAME_LENGTH` are logged and skipped.
///
/// NOTE(review): the counter is created and `add`ed to on every call, so
/// repeated exports through the same meter may accumulate the value —
/// confirm how callers scope the meter.
fn process_primitive(prefix: &mut String, meter: &Meter, primitive: &CollectedMetricPrimitive) {
    // We don't publish strings in metrics, and an absent value has nothing
    // to publish either.
    let Some(CollectedMetricPrimitiveValue::Counter(value)) = &primitive.value else {
        return;
    };
    if prefix.len() > MAX_METRIC_NAME_LENGTH {
        info!("Metric name longer than 256 characters: {}", prefix);
        return;
    }
    let metric_name = prefix.clone();
    let description = primitive.help.clone();
    let counter = meter
        .u64_counter(metric_name)
        .with_description(description)
        .init();
    counter.add(*value, &[]);
}
Loading

0 comments on commit 7cebcf6

Please sign in to comment.