From 5ee2c22791f9f561a2ac58560fed3a28c0f7ca8e Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 18 Feb 2022 18:07:08 -0300 Subject: [PATCH 1/3] feat: Implement Prometheus metric exporter --- Cargo.lock | 81 +++++++++++++++++++++++++++++++++++- Cargo.toml | 4 ++ src/mapper/prelude.rs | 2 + src/utils/facade.rs | 17 +++++++- src/utils/metrics.rs | 96 +++++++++++++++++++++++++++++++++++++++++++ src/utils/mod.rs | 9 ++++ 6 files changed, 207 insertions(+), 2 deletions(-) create mode 100644 src/utils/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 5a06c4af..3e556c59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "ascii" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbf56136a5198c7b01a49e3afcbef6cf84597273d298f54432926024107b0109" + [[package]] name = "async-compression" version = "0.3.12" @@ -170,10 +176,16 @@ dependencies = [ "libc", "num-integer", "num-traits", - "time", + "time 0.1.44", "winapi", ] +[[package]] +name = "chunked_transfer" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e" + [[package]] name = "clap" version = "3.0.14" @@ -984,6 +996,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97ba99ba6393e2c3734791401b66902d981cb03bf190af674ca69949b6d5fb15" +dependencies = [ + "libc", +] + [[package]] name = "object" version = "0.27.1" @@ -1071,6 +1092,7 @@ dependencies = [ "net2", "openssl", "pallas", + "prometheus_exporter", "reqwest", "serde", "serde_json", @@ -1255,6 +1277,32 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "prometheus" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f64969ffd5dd8f39bd57a68ac53c163a095ed9d0fb707146da1b27025a3504" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.11.2", + "thiserror", +] + +[[package]] +name = "prometheus_exporter" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "019a192344efa197e8edfb2b864a5369ba8a837578d1bee469f21d98a8ed1233" +dependencies = [ + "ascii", + "prometheus", + "thiserror", + "tiny_http", +] + [[package]] name = "quote" version = "1.0.15" @@ -1694,6 +1742,37 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d" +dependencies = [ + "itoa", + "libc", + "num_threads", + "time-macros", +] + +[[package]] +name = "time-macros" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25eb0ca3468fc0acc11828786797f6ef9aa1555e4a211a60d64cc8e4d1be47d6" + +[[package]] +name = "tiny_http" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5f8734c6d6943ad6df6b588d228a87b4af184998bcffa268ceddf05c2055a8c" +dependencies = [ + "ascii", + "chunked_transfer", + "log 0.4.14", + "time 0.3.7", + "url", +] + [[package]] name = "tinyvec" version = "1.5.1" diff --git a/Cargo.toml b/Cargo.toml index de5d1935..200a117c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,11 +50,15 @@ tokio = { version = "1.17.0", optional = true, features = ["rt"] } # feature: fingerprint murmur3 = { version = "0.5.1", optional = true } +# feature: metrics +prometheus_exporter = { version = "0.8.4", default-features = false, optional = true } + # required for CI to complete successfully openssl = { version = "0.10", optional = true, features = ["vendored"] } [features] default = [] +metrics = ["prometheus_exporter"] logs = ["file-rotate"] webhook = ["reqwest"] tuisink = ["tui"] diff --git a/src/mapper/prelude.rs b/src/mapper/prelude.rs index 8f201466..bcbd9ed5 100644 --- a/src/mapper/prelude.rs +++ b/src/mapper/prelude.rs @@ -64,6 +64,8 @@ impl EventWriter { fingerprint: None, }; + self.utils.track_source_progress(&evt); + self.output .send(evt) .expect("error sending event through output stage, pipeline must have crashed."); diff --git a/src/utils/facade.rs b/src/utils/facade.rs index b6a71531..0cf576b9 100644 --- a/src/utils/facade.rs +++ b/src/utils/facade.rs @@ -11,6 +11,12 @@ impl Utils { } } + /// To be used by sink stages to track progress + pub fn track_source_progress(&self, event: &Event) { + #[cfg(feature = "metrics")] + self.metrics.on_source_event(event); + } + /// To be used by sink stages to track progress pub fn track_sink_progress(&self, event: &Event) { let point = match (event.context.slot, &event.context.block_hash) { @@ -22,6 +28,15 @@ impl Utils { cursor.set_cursor(point).ok_or_warn("failed to set cursor") } - // TODO: add here future telemetry implementation + #[cfg(feature = "metrics")] + self.metrics.on_sink_event(event); + } + + #[cfg(feature = "metrics")] + pub fn track_chain_tip(&self, tip: impl Into) { + //self.metrics.update_chain_state(metrics::ChainState { + // tip: Some(tip.into()), + // ..Default::default() + //}) } } diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs new file mode 100644 index 00000000..ee754e2f --- /dev/null +++ b/src/utils/metrics.rs @@ -0,0 +1,96 @@ +///! An utility to keep track of the progress of the pipeline as a whole +use prometheus_exporter::{ + prometheus::{register_counter, register_int_gauge, Counter, IntGauge}, +}; + +use merge::Merge; + +use crate::{ + model::{Event, EventData}, + Error, +}; + +#[derive(Clone)] +pub struct Tip { + pub block: u64, + pub slot: u64, +} + +#[derive(Default, Merge, Clone)] +pub(crate) struct ChainState { + pub tip: Option, +} + +pub(crate) struct Provider { + pub chain_tip: IntGauge, + pub rollback_count: Counter, + pub source_current_slot: IntGauge, + pub source_current_height: IntGauge, + pub source_event_count: Counter, + pub sink_current_slot: IntGauge, + pub sink_event_count: Counter, +} + +impl Provider { + pub(crate) fn start() -> Result { + let binding = "0.0.0.0:9186".parse()?; + prometheus_exporter::start(binding)?; + + let provider = Provider { + chain_tip: register_int_gauge!( + "chain_tip", + "the last detected tip of the chain (height)" + )?, + rollback_count: register_counter!( + "rollback_count", + "number of rollback events occurred" + )?, + source_current_slot: register_int_gauge!( + "source_current_slot", + "last slot processed by the source of the pipeline" + )?, + source_current_height: register_int_gauge!( + "source_current_height", + "last height (block #) processed by the source of the pipeline" + )?, + source_event_count: register_counter!( + "source_event_count", + "number of events processed by the source of the pipeline" + )?, + sink_current_slot: register_int_gauge!( + "sink_current_slot", + "last slot processed by the sink of the pipeline" + )?, + sink_event_count: register_counter!( + "sink_event_count", + "number of events processed by the sink of the pipeline" + )?, + }; + + Ok(provider) + } + + pub(crate) fn on_source_event(&self, event: &Event) { + self.source_event_count.inc(); + + if let Some(slot) = &event.context.slot { + self.source_current_slot.set(*slot as i64); + } + + if let Some(block) = &event.context.block_number { + self.source_current_height.set(*block as i64); + } + + if matches!(event.data, EventData::RollBack { .. }) { + self.rollback_count.inc(); + } + } + + pub(crate) fn on_sink_event(&self, event: &Event) { + self.sink_event_count.inc(); + + if let Some(slot) = &event.context.slot { + self.sink_current_slot.set(*slot as i64); + } + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 8cd41f5a..d97326b2 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -27,6 +27,9 @@ pub mod throttle; pub(crate) mod bech32; pub(crate) mod time; +#[cfg(feature = "metrics")] +pub(crate) mod metrics; + mod facade; pub(crate) trait SwallowResult { @@ -105,6 +108,9 @@ pub struct Utils { pub(crate) time: Option, pub(crate) bech32: Bech32Provider, pub(crate) cursor: Option, + + #[cfg(feature = "metrics")] + pub(crate) metrics: metrics::Provider, } impl Utils { @@ -115,6 +121,9 @@ impl Utils { bech32: Bech32Provider::new(Bech32Config::from_well_known(&well_known)), cursor, well_known, + + #[cfg(feature = "metrics")] + metrics: metrics::Provider::start().unwrap(), //.expect("metric server started"), } } } From d2592da589c5a9af200624eaf1abe7f60d4bb930 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 20 Feb 2022 08:24:20 -0300 Subject: [PATCH 2/3] feat: Add metrics config --- Cargo.toml | 5 +---- src/bin/oura/daemon.rs | 24 +++++++++++++++++++----- src/bin/oura/dump.rs | 2 +- src/bin/oura/watch.rs | 2 +- src/mapper/prelude.rs | 2 +- src/utils/facade.rs | 11 ++++++----- src/utils/metrics.rs | 24 ++++++++++++++++++------ src/utils/mod.rs | 34 +++++++++++++++++++++++----------- 8 files changed, 70 insertions(+), 34 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 200a117c..3d299d8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ serde_json = "1.0.78" strum = "0.23" strum_macros = "0.23" minicbor = "0.13" +prometheus_exporter = { version = "0.8.4", default-features = false } # feature logs file-rotate = { version = "0.6.0", optional = true } @@ -50,15 +51,11 @@ tokio = { version = "1.17.0", optional = true, features = ["rt"] } # feature: fingerprint murmur3 = { version = "0.5.1", optional = true } -# feature: metrics -prometheus_exporter = { version = "0.8.4", default-features = false, optional = true } - # required for CI to complete successfully openssl = { version = "0.10", optional = true, features = ["vendored"] } [features] default = [] -metrics = ["prometheus_exporter"] logs = ["file-rotate"] webhook = ["reqwest"] tuisink = ["tui"] diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index e6c2ec7c..eafc73ed 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -10,7 +10,7 @@ use oura::{ BootstrapResult, FilterProvider, PartialBootstrapResult, SinkProvider, SourceProvider, StageReceiver, }, - utils::{cursor, ChainWellKnownInfo, Utils, WithUtils}, + utils::{cursor, metrics, ChainWellKnownInfo, Utils, WithUtils}, Error, }; @@ -125,6 +125,8 @@ struct ConfigRoot { chain: Option, cursor: Option, + + metrics: Option, } impl ConfigRoot { @@ -149,12 +151,23 @@ impl ConfigRoot { } } -fn bootstrap_utils(chain: Option, cursor: Option) -> Utils { +fn bootstrap_utils( + chain: Option, + cursor: Option, + metrics: Option, +) -> Utils { let well_known = chain.unwrap_or_default(); + let mut utils = Utils::new(well_known); - let cursor = cursor.map(cursor::Provider::initialize); + if let Some(cursor) = cursor { + utils = utils.with_cursor(cursor); + } + + if let Some(metrics) = metrics { + utils = utils.with_metrics(metrics); + } - Utils::new(well_known, cursor) + utils } /// Sets up the whole pipeline from configuration @@ -165,9 +178,10 @@ fn bootstrap(config: ConfigRoot) -> Result>, Error> { sink, chain, cursor, + metrics, } = config; - let utils = Arc::new(bootstrap_utils(chain, cursor)); + let utils = Arc::new(bootstrap_utils(chain, cursor, metrics)); let mut threads = Vec::with_capacity(10); diff --git a/src/bin/oura/dump.rs b/src/bin/oura/dump.rs index c15f2d16..0d978092 100644 --- a/src/bin/oura/dump.rs +++ b/src/bin/oura/dump.rs @@ -106,7 +106,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { // TODO: map add cli arg to enable / disable cursor - let utils = Arc::new(Utils::new(well_known, None)); + let utils = Arc::new(Utils::new(well_known)); #[allow(deprecated)] let source_setup = match mode { diff --git a/src/bin/oura/watch.rs b/src/bin/oura/watch.rs index 02186f45..bd508836 100644 --- a/src/bin/oura/watch.rs +++ b/src/bin/oura/watch.rs @@ -84,7 +84,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { let well_known = ChainWellKnownInfo::try_from_magic(*magic)?; - let utils = Arc::new(Utils::new(well_known, None)); + let utils = Arc::new(Utils::new(well_known)); #[allow(deprecated)] let source_setup = match mode { diff --git a/src/mapper/prelude.rs b/src/mapper/prelude.rs index 9b8c0ab6..b01dc7f5 100644 --- a/src/mapper/prelude.rs +++ b/src/mapper/prelude.rs @@ -52,7 +52,7 @@ impl EventWriter { well_known: Option, config: Config, ) -> Self { - let utils = Arc::new(Utils::new(well_known.unwrap_or_default(), None)); + let utils = Arc::new(Utils::new(well_known.unwrap_or_default())); Self::new(output, utils, config) } diff --git a/src/utils/facade.rs b/src/utils/facade.rs index 0cf576b9..6705e562 100644 --- a/src/utils/facade.rs +++ b/src/utils/facade.rs @@ -13,8 +13,9 @@ impl Utils { /// To be used by sink stages to track progress pub fn track_source_progress(&self, event: &Event) { - #[cfg(feature = "metrics")] - self.metrics.on_source_event(event); + if let Some(metrics) = &self.metrics { + metrics.on_source_event(event); + } } /// To be used by sink stages to track progress @@ -28,11 +29,11 @@ impl Utils { cursor.set_cursor(point).ok_or_warn("failed to set cursor") } - #[cfg(feature = "metrics")] - self.metrics.on_sink_event(event); + if let Some(metrics) = &self.metrics { + metrics.on_sink_event(event); + } } - #[cfg(feature = "metrics")] pub fn track_chain_tip(&self, tip: impl Into) { //self.metrics.update_chain_state(metrics::ChainState { // tip: Some(tip.into()), diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index ee754e2f..7da33619 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -1,15 +1,20 @@ ///! An utility to keep track of the progress of the pipeline as a whole -use prometheus_exporter::{ - prometheus::{register_counter, register_int_gauge, Counter, IntGauge}, -}; +use prometheus_exporter::prometheus::{register_counter, register_int_gauge, Counter, IntGauge}; use merge::Merge; +use serde::{Deserialize, Serialize}; use crate::{ model::{Event, EventData}, Error, }; +#[derive(Debug, Serialize, Deserialize)] +pub struct Config { + binding: Option, + endpoint: Option, +} + #[derive(Clone)] pub struct Tip { pub block: u64, @@ -32,9 +37,16 @@ pub(crate) struct Provider { } impl Provider { - pub(crate) fn start() -> Result { - let binding = "0.0.0.0:9186".parse()?; - prometheus_exporter::start(binding)?; + pub(crate) fn initialize(config: &Config) -> Result { + let binding = config + .binding + .as_deref() + .unwrap_or("0.0.0.0:9186") + .parse()?; + + let endpoint = config.endpoint.as_deref().unwrap_or("/metrics"); + + prometheus_exporter::Builder::new(binding).with_endpoint(endpoint)?; let provider = Provider { chain_tip: register_int_gauge!( diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 7b6d9f12..7a28767c 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -22,14 +22,12 @@ use crate::{ use crate::Error; pub mod cursor; +pub mod metrics; pub mod throttle; pub(crate) mod bech32; pub(crate) mod time; -#[cfg(feature = "metrics")] -pub(crate) mod metrics; - mod facade; pub(crate) trait SwallowResult { @@ -122,22 +120,36 @@ pub struct Utils { pub(crate) time: Option, pub(crate) bech32: Bech32Provider, pub(crate) cursor: Option, - - #[cfg(feature = "metrics")] - pub(crate) metrics: metrics::Provider, + pub(crate) metrics: Option, } +// TODO: refactor this using the builder pattern impl Utils { - // TODO: refactor this using the builder pattern - pub fn new(well_known: ChainWellKnownInfo, cursor: Option) -> Self { + pub fn new(well_known: ChainWellKnownInfo) -> Self { Self { time: NaiveTime::new(well_known.clone()).into(), bech32: Bech32Provider::new(Bech32Config::from_well_known(&well_known)), - cursor, well_known, + cursor: None, + metrics: None, + } + } + + pub fn with_cursor(self, config: cursor::Config) -> Self { + let provider = cursor::Provider::initialize(config); - #[cfg(feature = "metrics")] - metrics: metrics::Provider::start().unwrap(), //.expect("metric server started"), + Self { + cursor: provider.into(), + ..self + } + } + + pub fn with_metrics(self, config: metrics::Config) -> Self { + let provider = metrics::Provider::initialize(&config).expect("metric server started"); + + Self { + metrics: provider.into(), + ..self } } } From dd04471ea2eeee4b34f868163013051d4058ecae Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 20 Feb 2022 09:11:20 -0300 Subject: [PATCH 3/3] feat: Track chain tip in metrics --- src/sources/n2c/run.rs | 5 ++++- src/sources/n2n/run.rs | 5 ++++- src/utils/facade.rs | 9 ++++----- src/utils/metrics.rs | 12 ++++++++++-- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index 1a334d04..08b82d75 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -36,7 +36,7 @@ impl chainsync::Observer for ChainObserver { fn on_roll_forward( &mut self, content: chainsync::BlockContent, - _tip: &chainsync::Tip, + tip: &chainsync::Tip, ) -> Result<(), Box> { // parse the block and extract the point of the chain let cbor = Vec::from(content.deref()); @@ -73,6 +73,9 @@ impl chainsync::Observer for ChainObserver { log_buffer_state(&self.chain_buffer); + // notify chain tip to the pipeline metrics + self.event_writer.utils.track_chain_tip(tip.1); + Ok(()) } diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index 860fdc94..a2b4747e 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -75,7 +75,7 @@ impl chainsync::Observer for &mut ChainObserver { fn on_roll_forward( &mut self, content: chainsync::HeaderContent, - _tip: &chainsync::Tip, + tip: &chainsync::Tip, ) -> Result<(), Error> { // parse the header and extract the point of the chain let header = MultiEraHeader::try_from(content)?; @@ -97,6 +97,9 @@ impl chainsync::Observer for &mut ChainObserver { log_buffer_state(&self.chain_buffer); + // notify chain tip to the pipeline metrics + self.event_writer.utils.track_chain_tip(tip.1); + Ok(()) } diff --git a/src/utils/facade.rs b/src/utils/facade.rs index 6705e562..c0159671 100644 --- a/src/utils/facade.rs +++ b/src/utils/facade.rs @@ -34,10 +34,9 @@ impl Utils { } } - pub fn track_chain_tip(&self, tip: impl Into) { - //self.metrics.update_chain_state(metrics::ChainState { - // tip: Some(tip.into()), - // ..Default::default() - //}) + pub fn track_chain_tip(&self, tip: u64) { + if let Some(metrics) = &self.metrics { + metrics.on_chain_tip(tip); + } } } diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index 7da33619..f440a00a 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -44,9 +44,13 @@ impl Provider { .unwrap_or("0.0.0.0:9186") .parse()?; - let endpoint = config.endpoint.as_deref().unwrap_or("/metrics"); + let mut builder = prometheus_exporter::Builder::new(binding); - prometheus_exporter::Builder::new(binding).with_endpoint(endpoint)?; + if let Some(endpoint) = &config.endpoint { + builder.with_endpoint(endpoint)?; + } + + builder.start()?; let provider = Provider { chain_tip: register_int_gauge!( @@ -82,6 +86,10 @@ impl Provider { Ok(provider) } + pub(crate) fn on_chain_tip(&self, tip: u64) { + self.chain_tip.set(tip as i64); + } + pub(crate) fn on_source_event(&self, event: &Event) { self.source_event_count.inc();