Skip to content

Commit

Permalink
feat: Implement Prometheus metric exporter (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Feb 20, 2022
1 parent b82a6c5 commit c34d80e
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 15 deletions.
81 changes: 80 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ serde_json = "1.0.78"
strum = "0.23"
strum_macros = "0.23"
minicbor = "0.13"
prometheus_exporter = { version = "0.8.4", default-features = false }

# feature logs
file-rotate = { version = "0.6.0", optional = true }
Expand Down
24 changes: 19 additions & 5 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use oura::{
BootstrapResult, FilterProvider, PartialBootstrapResult, SinkProvider, SourceProvider,
StageReceiver,
},
utils::{cursor, ChainWellKnownInfo, Utils, WithUtils},
utils::{cursor, metrics, ChainWellKnownInfo, Utils, WithUtils},
Error,
};

Expand Down Expand Up @@ -125,6 +125,8 @@ struct ConfigRoot {
chain: Option<ChainWellKnownInfo>,

cursor: Option<cursor::Config>,

metrics: Option<metrics::Config>,
}

impl ConfigRoot {
Expand All @@ -149,12 +151,23 @@ impl ConfigRoot {
}
}

fn bootstrap_utils(chain: Option<ChainWellKnownInfo>, cursor: Option<cursor::Config>) -> Utils {
fn bootstrap_utils(
chain: Option<ChainWellKnownInfo>,
cursor: Option<cursor::Config>,
metrics: Option<metrics::Config>,
) -> Utils {
let well_known = chain.unwrap_or_default();
let mut utils = Utils::new(well_known);

let cursor = cursor.map(cursor::Provider::initialize);
if let Some(cursor) = cursor {
utils = utils.with_cursor(cursor);
}

if let Some(metrics) = metrics {
utils = utils.with_metrics(metrics);
}

Utils::new(well_known, cursor)
utils
}

/// Sets up the whole pipeline from configuration
Expand All @@ -165,9 +178,10 @@ fn bootstrap(config: ConfigRoot) -> Result<Vec<JoinHandle<()>>, Error> {
sink,
chain,
cursor,
metrics,
} = config;

let utils = Arc::new(bootstrap_utils(chain, cursor));
let utils = Arc::new(bootstrap_utils(chain, cursor, metrics));

let mut threads = Vec::with_capacity(10);

Expand Down
2 changes: 1 addition & 1 deletion src/bin/oura/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {

// TODO: map add cli arg to enable / disable cursor

let utils = Arc::new(Utils::new(well_known, None));
let utils = Arc::new(Utils::new(well_known));

#[allow(deprecated)]
let source_setup = match mode {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/oura/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {

let well_known = ChainWellKnownInfo::try_from_magic(*magic)?;

let utils = Arc::new(Utils::new(well_known, None));
let utils = Arc::new(Utils::new(well_known));

#[allow(deprecated)]
let source_setup = match mode {
Expand Down
4 changes: 3 additions & 1 deletion src/mapper/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl EventWriter {
well_known: Option<ChainWellKnownInfo>,
config: Config,
) -> Self {
let utils = Arc::new(Utils::new(well_known.unwrap_or_default(), None));
let utils = Arc::new(Utils::new(well_known.unwrap_or_default()));

Self::new(output, utils, config)
}
Expand All @@ -65,6 +65,8 @@ impl EventWriter {
fingerprint: None,
};

self.utils.track_source_progress(&evt);

self.output
.send(evt)
.expect("error sending event through output stage, pipeline must have crashed.");
Expand Down
5 changes: 4 additions & 1 deletion src/sources/n2c/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl chainsync::Observer<chainsync::BlockContent> for ChainObserver {
fn on_roll_forward(
&mut self,
content: chainsync::BlockContent,
_tip: &chainsync::Tip,
tip: &chainsync::Tip,
) -> Result<(), Box<dyn std::error::Error>> {
// parse the block and extract the point of the chain
let cbor = Vec::from(content.deref());
Expand Down Expand Up @@ -73,6 +73,9 @@ impl chainsync::Observer<chainsync::BlockContent> for ChainObserver {

log_buffer_state(&self.chain_buffer);

// notify chain tip to the pipeline metrics
self.event_writer.utils.track_chain_tip(tip.1);

Ok(())
}

Expand Down
5 changes: 4 additions & 1 deletion src/sources/n2n/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl chainsync::Observer<chainsync::HeaderContent> for &mut ChainObserver {
fn on_roll_forward(
&mut self,
content: chainsync::HeaderContent,
_tip: &chainsync::Tip,
tip: &chainsync::Tip,
) -> Result<(), Error> {
// parse the header and extract the point of the chain
let header = MultiEraHeader::try_from(content)?;
Expand All @@ -97,6 +97,9 @@ impl chainsync::Observer<chainsync::HeaderContent> for &mut ChainObserver {

log_buffer_state(&self.chain_buffer);

// notify chain tip to the pipeline metrics
self.event_writer.utils.track_chain_tip(tip.1);

Ok(())
}

Expand Down
17 changes: 16 additions & 1 deletion src/utils/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ impl Utils {
}
}

/// To be used by sink stages to track progress
pub fn track_source_progress(&self, event: &Event) {
if let Some(metrics) = &self.metrics {
metrics.on_source_event(event);
}
}

/// To be used by sink stages to track progress
pub fn track_sink_progress(&self, event: &Event) {
let point = match (event.context.slot, &event.context.block_hash) {
Expand All @@ -22,6 +29,14 @@ impl Utils {
cursor.set_cursor(point).ok_or_warn("failed to set cursor")
}

// TODO: add here future telemetry implementation
if let Some(metrics) = &self.metrics {
metrics.on_sink_event(event);
}
}

pub fn track_chain_tip(&self, tip: u64) {
if let Some(metrics) = &self.metrics {
metrics.on_chain_tip(tip);
}
}
}
Loading

0 comments on commit c34d80e

Please sign in to comment.