diff --git a/substrate/client/utils/src/metrics.rs b/substrate/client/utils/src/metrics.rs index 6bbdbe2e2e59..308e90cb2537 100644 --- a/substrate/client/utils/src/metrics.rs +++ b/substrate/client/utils/src/metrics.rs @@ -24,7 +24,10 @@ use prometheus::{ Error as PrometheusError, Registry, }; -use prometheus::{core::GenericCounterVec, Opts}; +use prometheus::{ + core::{GenericCounterVec, GenericGaugeVec}, + Opts, +}; lazy_static! { pub static ref TOKIO_THREADS_TOTAL: GenericCounter = @@ -36,18 +39,32 @@ lazy_static! { } lazy_static! { - pub static ref UNBOUNDED_CHANNELS_COUNTER : GenericCounterVec = GenericCounterVec::new( - Opts::new("substrate_unbounded_channel_len", "Items in each mpsc::unbounded instance"), - &["entity", "action"] // 'name of channel, send|received|dropped + pub static ref UNBOUNDED_CHANNELS_COUNTER: GenericCounterVec = GenericCounterVec::new( + Opts::new( + "substrate_unbounded_channel_len", + "Items sent/received/dropped on each mpsc::unbounded instance" + ), + &["entity", "action"], // name of channel, send|received|dropped + ).expect("Creating of statics doesn't fail. qed"); + pub static ref UNBOUNDED_CHANNELS_SIZE: GenericGaugeVec = GenericGaugeVec::new( + Opts::new( + "substrate_unbounded_channel_size", + "Size (number of messages to be processed) of each mpsc::unbounded instance", + ), + &["entity"], // name of channel ).expect("Creating of statics doesn't fail. qed"); - } +pub static SENT_LABEL: &'static str = "send"; +pub static RECEIVED_LABEL: &'static str = "received"; +pub static DROPPED_LABEL: &'static str = "dropped"; + /// Register the statics to report to registry pub fn register_globals(registry: &Registry) -> Result<(), PrometheusError> { registry.register(Box::new(TOKIO_THREADS_ALIVE.clone()))?; registry.register(Box::new(TOKIO_THREADS_TOTAL.clone()))?; registry.register(Box::new(UNBOUNDED_CHANNELS_COUNTER.clone()))?; + registry.register(Box::new(UNBOUNDED_CHANNELS_SIZE.clone()))?; Ok(()) } diff --git a/substrate/client/utils/src/mpsc.rs b/substrate/client/utils/src/mpsc.rs index 039e03f9e618..c24a5bd8904a 100644 --- a/substrate/client/utils/src/mpsc.rs +++ b/substrate/client/utils/src/mpsc.rs @@ -20,7 +20,9 @@ pub use async_channel::{TryRecvError, TrySendError}; -use crate::metrics::UNBOUNDED_CHANNELS_COUNTER; +use crate::metrics::{ + DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE, +}; use async_channel::{Receiver, Sender}; use futures::{ stream::{FusedStream, Stream}, @@ -102,7 +104,10 @@ impl TracingUnboundedSender { /// Proxy function to `async_channel::Sender::try_send`. pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { self.inner.try_send(msg).map(|s| { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc(); + UNBOUNDED_CHANNELS_SIZE + .with_label_values(&[self.name]) + .set(self.inner.len().saturated_into()); if self.inner.len() >= self.queue_size_warning && self.warning_fired @@ -140,7 +145,10 @@ impl TracingUnboundedReceiver { /// that discounts the messages taken out. pub fn try_recv(&mut self) -> Result { self.inner.try_recv().map(|s| { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc(); + UNBOUNDED_CHANNELS_SIZE + .with_label_values(&[self.name]) + .set(self.inner.len().saturated_into()); s }) } @@ -155,14 +163,16 @@ impl Drop for TracingUnboundedReceiver { fn drop(&mut self) { // Close the channel to prevent any further messages to be sent into the channel self.close(); - // the number of messages about to be dropped + // The number of messages about to be dropped let count = self.inner.len(); - // discount the messages + // Discount the messages if count > 0 { UNBOUNDED_CHANNELS_COUNTER - .with_label_values(&[self.name, "dropped"]) + .with_label_values(&[self.name, DROPPED_LABEL]) .inc_by(count.saturated_into()); } + // Reset the size metric to 0 + UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0); // Drain all the pending messages in the channel since they can never be accessed, // this can be removed once https://github.com/smol-rs/async-channel/issues/23 is // resolved @@ -180,7 +190,10 @@ impl Stream for TracingUnboundedReceiver { match Pin::new(&mut s.inner).poll_next(cx) { Poll::Ready(msg) => { if msg.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc(); + UNBOUNDED_CHANNELS_SIZE + .with_label_values(&[s.name]) + .set(s.inner.len().saturated_into()); } Poll::Ready(msg) },