Skip to content

Commit

Permalink
Make prometheus configurable at compile time
Browse files Browse the repository at this point in the history
  • Loading branch information
HKalbasi authored and thearossman committed Jan 22, 2025
1 parent abbc176 commit c3d5bad
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 51 deletions.
14 changes: 10 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,19 @@ aes-gcm = "0.10.3"

# Statistics
tokio = { version = "1.42.0", features = ["full"] }
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
array-init = "2.0"
prometheus-client = "0.22.3"
http-body-util = "0.1.2"
hyper = { version = "1", features = ["full"], optional = true }
hyper-util = { version = "0.1", features = ["full"], optional = true }
prometheus-client = { version = "0.22.3", optional = true }
http-body-util = { version = "0.1.2", optional = true }

[features]
timing = []
prometheus = [
"dep:hyper",
"dep:hyper-util",
"dep:prometheus-client",
"dep:http-body-util",
]
mlx5 = []
default = []
9 changes: 7 additions & 2 deletions core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
use crate::lcore::{CoreId, SocketId};

use std::net::Ipv4Addr;
use std::fs;
#[cfg(feature = "prometheus")]
use std::net::{IpAddr, Ipv4Addr};
use std::path::Path;
use std::{fs, net::IpAddr};

use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -338,6 +339,7 @@ pub struct OnlineConfig {

/// Prometheus metrics exporter server. Defaults to `None`.
#[serde(default = "default_prometheus")]
#[cfg(feature = "prometheus")]
pub prometheus: Option<PrometheusConfig>,

/// List of network interfaces to read from.
Expand Down Expand Up @@ -372,6 +374,7 @@ fn default_monitor() -> Option<MonitorConfig> {
None
}

#[cfg(feature = "prometheus")]
fn default_prometheus() -> Option<PrometheusConfig> {
None
}
Expand Down Expand Up @@ -494,6 +497,7 @@ fn default_log() -> Option<LogConfig> {
/// interval = 1000
/// ```
#[derive(Deserialize, Serialize, Debug, Clone, Copy)]
#[cfg(feature = "prometheus")]
pub struct PrometheusConfig {
/// Listen port for Prometheus metrics.
pub port: u16,
Expand All @@ -503,6 +507,7 @@ pub struct PrometheusConfig {
pub ip: IpAddr,
}

#[cfg(feature = "prometheus")]
fn default_prometheus_ip() -> IpAddr {
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/lcore/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::config::RuntimeConfig;
use crate::dpdk;
use crate::port::{statistics::PortStats, Port, PortId, RxQueue, RxQueueType};
use crate::stats::DPDK_STATS;

use std::collections::{BTreeMap, HashMap};
use std::ffi::CString;
Expand Down Expand Up @@ -136,6 +135,7 @@ impl Monitor {
let delta = curr_ts - prev_ts;
match AggRxStats::collect(&self.ports, &display.keywords) {
Ok(curr_rx) => {
#[cfg(feature = "prometheus")]
curr_rx.update_prometheus_stats();
let nms = delta.as_millis() as f64;
if init {
Expand Down Expand Up @@ -477,7 +477,9 @@ impl AggRxStats {
self.hw_dropped_pkts + self.sw_dropped_pkts
}

#[cfg(feature = "prometheus")]
fn update_prometheus_stats(&self) {
use crate::stats::DPDK_STATS;
DPDK_STATS
.ingress_pkts
.inc_by(self.ingress_pkts - DPDK_STATS.ingress_pkts.get());
Expand Down
12 changes: 8 additions & 4 deletions core/src/lcore/rx_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::dpdk;
use crate::memory::mbuf::Mbuf;
use crate::port::{RxQueue, RxQueueType};
use crate::stats::{
update_thread_local_stats, StatExt, IDLE_CYCLES, IGNORED_BY_PACKET_FILTER_BYTE,
IGNORED_BY_PACKET_FILTER_PKT, TOTAL_BYTE, TOTAL_CYCLES, TOTAL_PKT,
StatExt, IDLE_CYCLES, IGNORED_BY_PACKET_FILTER_BYTE, IGNORED_BY_PACKET_FILTER_PKT, TOTAL_BYTE,
TOTAL_CYCLES, TOTAL_PKT,
};
use crate::subscription::*;

Expand All @@ -24,6 +24,7 @@ where
pub(crate) id: CoreId,
pub(crate) rxqueues: Vec<RxQueue>,
pub(crate) conntrack: ConnTrackConfig,
#[cfg(feature = "prometheus")]
pub(crate) is_prometheus_enabled: bool,
pub(crate) subscription: Arc<Subscription<S>>,
pub(crate) is_running: Arc<AtomicBool>,
Expand All @@ -37,14 +38,15 @@ where
core_id: CoreId,
rxqueues: Vec<RxQueue>,
conntrack: ConnTrackConfig,
is_prometheus_enabled: bool,
#[cfg(feature = "prometheus")] is_prometheus_enabled: bool,
subscription: Arc<Subscription<S>>,
is_running: Arc<AtomicBool>,
) -> Self {
RxCore {
id: core_id,
rxqueues,
conntrack,
#[cfg(feature = "prometheus")]
is_prometheus_enabled,
subscription,
is_running,
Expand Down Expand Up @@ -98,8 +100,10 @@ where
let mbufs: Vec<Mbuf> = self.rx_burst(rxqueue, 32);
if mbufs.is_empty() {
IDLE_CYCLES.inc();

#[cfg(feature = "prometheus")]
if IDLE_CYCLES.get() & 1023 == 0 && self.is_prometheus_enabled {
update_thread_local_stats(self.id);
crate::stats::update_thread_local_stats(self.id);
}
}
TOTAL_CYCLES.inc();
Expand Down
12 changes: 6 additions & 6 deletions core/src/runtime/online.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;

use crate::config::{ConnTrackConfig, OnlineConfig, RuntimeConfig};
use crate::dpdk;
use crate::filter::Filter;
Expand All @@ -10,7 +6,6 @@ use crate::lcore::rx_core::RxCore;
use crate::lcore::{CoreId, SocketId};
use crate::memory::mempool::Mempool;
use crate::port::*;
use crate::stats::serve_req;
use crate::subscription::*;

use std::collections::BTreeMap;
Expand Down Expand Up @@ -89,6 +84,7 @@ where
core_id,
rxqueues,
options.conntrack.clone(),
#[cfg(feature = "prometheus")]
options.online.prometheus.is_some(),
Arc::clone(&subscription),
Arc::clone(&is_running),
Expand Down Expand Up @@ -149,7 +145,11 @@ where
let jh1 = tokio::spawn(async move {
monitor.run().await;
});
#[cfg(feature = "prometheus")]
if let Some(prometheus) = self.options.online.prometheus {
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
tokio::spawn(async move {
let listener = TcpListener::bind((prometheus.ip, prometheus.port))
.await
Expand All @@ -162,7 +162,7 @@ where
let socket = TokioIo::new(socket);
tokio::spawn(async move {
if let Err(e) = hyper::server::conn::http1::Builder::new()
.serve_connection(socket, service_fn(serve_req))
.serve_connection(socket, service_fn(crate::stats::serve_req))
.await
{
eprintln!("Prometheus server error: {}", e);
Expand Down
40 changes: 40 additions & 0 deletions core/src/stats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::cell::Cell;

#[cfg(feature = "prometheus")]
mod prometheus;

#[cfg(feature = "prometheus")]
pub use prometheus::*;

thread_local! {
pub(crate) static IGNORED_BY_PACKET_FILTER_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static IGNORED_BY_PACKET_FILTER_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static DROPPED_MIDDLE_OF_CONNECTION_TCP_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static DROPPED_MIDDLE_OF_CONNECTION_TCP_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static TOTAL_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static TOTAL_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static TCP_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static TCP_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static UDP_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static UDP_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static TCP_NEW_CONNECTIONS: Cell<u64> = const { Cell::new(0) };
pub(crate) static UDP_NEW_CONNECTIONS: Cell<u64> = const { Cell::new(0) };
pub(crate) static IDLE_CYCLES: Cell<u64> = const { Cell::new(0) };
pub(crate) static TOTAL_CYCLES: Cell<u64> = const { Cell::new(0) };

#[cfg(feature = "prometheus")]
pub(crate) static PROMETHEUS: std::cell::OnceCell<prometheus::PerCorePrometheusStats> = const { std::cell::OnceCell::new() };
}

pub(crate) trait StatExt: Sized {
fn inc(&'static self) {
self.inc_by(1);
}
fn inc_by(&'static self, val: u64);
}

impl StatExt for std::thread::LocalKey<Cell<u64>> {
fn inc_by(&'static self, val: u64) {
self.set(self.get() + val);
}
}
35 changes: 2 additions & 33 deletions core/src/stats.rs → core/src/stats/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ use prometheus_client::{
registry::{Registry, Unit},
};
use std::{
cell::{Cell, OnceCell},
fmt::Write,
sync::{LazyLock, Mutex, OnceLock},
};

use super::*;
use crate::CoreId;

impl EncodeLabelSet for CoreId {
Expand Down Expand Up @@ -262,7 +262,7 @@ pub(crate) static DPDK_STATS: LazyLock<DpdkPrometheusStats> =
sw_dropped_pkts: Counter::default(),
});

struct PerCorePrometheusStats {
pub(crate) struct PerCorePrometheusStats {
ignored_by_packet_filter_pkt: Counter,
ignored_by_packet_filter_byte: Counter,
dropped_middle_of_connection_tcp_pkt: Counter,
Expand All @@ -279,37 +279,6 @@ struct PerCorePrometheusStats {
total_cycles: Counter,
}

thread_local! {
pub(crate) static IGNORED_BY_PACKET_FILTER_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static IGNORED_BY_PACKET_FILTER_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static DROPPED_MIDDLE_OF_CONNECTION_TCP_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static DROPPED_MIDDLE_OF_CONNECTION_TCP_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static TOTAL_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static TOTAL_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static TCP_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static TCP_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static UDP_PKT: Cell<u64> = const { Cell::new(0) };
pub(crate) static UDP_BYTE: Cell<u64> = const { Cell::new(0) };
pub(crate) static TCP_NEW_CONNECTIONS: Cell<u64> = const { Cell::new(0) };
pub(crate) static UDP_NEW_CONNECTIONS: Cell<u64> = const { Cell::new(0) };
pub(crate) static IDLE_CYCLES: Cell<u64> = const { Cell::new(0) };
pub(crate) static TOTAL_CYCLES: Cell<u64> = const { Cell::new(0) };
pub(crate) static PROMETHEUS: OnceCell<PerCorePrometheusStats> = const { OnceCell::new() };
}

pub(crate) trait StatExt: Sized {
fn inc(&'static self) {
self.inc_by(1);
}
fn inc_by(&'static self, val: u64);
}

impl StatExt for std::thread::LocalKey<Cell<u64>> {
fn inc_by(&'static self, val: u64) {
self.set(self.get() + val);
}
}

pub(crate) fn update_thread_local_stats(core: CoreId) {
PROMETHEUS.with(|pr| {
let pr = pr.get_or_init(|| PerCorePrometheusStats {
Expand Down
2 changes: 1 addition & 1 deletion examples/websites-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
clap = { version = "3.2.23", features = ["derive"] }
env_logger = "0.8.4"
retina-core = { path = "../../core" }
retina-core = { path = "../../core", features = ["prometheus"] }
retina-filtergen = { path = "../../filtergen" }
retina-datatypes = { path = "../../datatypes" }
prometheus-client = "0.22.3"

0 comments on commit c3d5bad

Please sign in to comment.