diff --git a/Cargo.lock b/Cargo.lock index 9ee48f330749..d262c1795a6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16745,7 +16745,6 @@ dependencies = [ "hyper", "jsonrpsee", "log", - "pin-project", "serde_json", "substrate-prometheus-endpoint", "tokio", diff --git a/prdoc/pr_3504.prdoc b/prdoc/pr_3504.prdoc new file mode 100644 index 000000000000..90a8ddd38b8f --- /dev/null +++ b/prdoc/pr_3504.prdoc @@ -0,0 +1,13 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: add prometheus label "is_rate_limited" to rpc calls + +doc: + - audience: Node Operator + description: | + This PR adds a label "is_rate_limited" to the prometheus metrics "substrate_rpc_calls_time" and "substrate_rpc_calls_finished" + than can be used to distinguish rate-limited RPC calls from other RPC calls. Because rate-limited RPC calls may take + tens of seconds. + +crates: [ ] diff --git a/substrate/client/rpc-servers/Cargo.toml b/substrate/client/rpc-servers/Cargo.toml index 7f7a86799e0c..3adc81c57d59 100644 --- a/substrate/client/rpc-servers/Cargo.toml +++ b/substrate/client/rpc-servers/Cargo.toml @@ -26,5 +26,4 @@ tower = { version = "0.4.13", features = ["util"] } http = "0.2.8" hyper = "0.14.27" futures = "0.3.29" -pin-project = "1.1.3" governor = "0.6.0" diff --git a/substrate/client/rpc-servers/src/lib.rs b/substrate/client/rpc-servers/src/lib.rs index e65d954bed52..ad4b444c7ff4 100644 --- a/substrate/client/rpc-servers/src/lib.rs +++ b/substrate/client/rpc-servers/src/lib.rs @@ -49,7 +49,7 @@ pub use jsonrpsee::{ }, server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig}, }; -pub use middleware::{MetricsLayer, RateLimitLayer, RpcMetrics}; +pub use middleware::{Metrics, MiddlewareLayer, RpcMetrics}; const MEGABYTE: u32 = 1024 * 1024; @@ -173,13 +173,22 @@ where let is_websocket = ws::is_upgrade_request(&req); let transport_label = if is_websocket { "ws" } else { "http" }; - let metrics = metrics.map(|m| MetricsLayer::new(m, transport_label)); - let rate_limit = rate_limit.map(|r| RateLimitLayer::per_minute(r)); + let middleware_layer = match (metrics, rate_limit) { + (None, None) => None, + (Some(metrics), None) => Some( + MiddlewareLayer::new().with_metrics(Metrics::new(metrics, transport_label)), + ), + (None, Some(rate_limit)) => + Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)), + (Some(metrics), Some(rate_limit)) => Some( + MiddlewareLayer::new() + .with_metrics(Metrics::new(metrics, transport_label)) + .with_rate_limit_per_minute(rate_limit), + ), + }; - // NOTE: The metrics needs to run first to include rate-limited calls in the - // metrics. let rpc_middleware = - RpcServiceBuilder::new().option_layer(metrics.clone()).option_layer(rate_limit); + RpcServiceBuilder::new().option_layer(middleware_layer.clone()); let mut svc = service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle); @@ -191,9 +200,9 @@ where // Spawn a task to handle when the connection is closed. tokio_handle.spawn(async move { let now = std::time::Instant::now(); - metrics.as_ref().map(|m| m.ws_connect()); + middleware_layer.as_ref().map(|m| m.ws_connect()); on_disconnect.await; - metrics.as_ref().map(|m| m.ws_disconnect(now)); + middleware_layer.as_ref().map(|m| m.ws_disconnect(now)); }); } diff --git a/substrate/client/rpc-servers/src/middleware/metrics.rs b/substrate/client/rpc-servers/src/middleware/metrics.rs index c2d1956c3b39..17849dc0c448 100644 --- a/substrate/client/rpc-servers/src/middleware/metrics.rs +++ b/substrate/client/rpc-servers/src/middleware/metrics.rs @@ -18,15 +18,9 @@ //! RPC middleware to collect prometheus metrics on RPC calls. -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - time::Instant, -}; +use std::time::Instant; -use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse}; -use pin_project::pin_project; +use jsonrpsee::{types::Request, MethodResponse}; use prometheus_endpoint::{ register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, @@ -77,7 +71,7 @@ impl RpcMetrics { "Total time [μs] of processed RPC calls", ) .buckets(HISTOGRAM_BUCKETS.to_vec()), - &["protocol", "method"], + &["protocol", "method", "is_rate_limited"], )?, metrics_registry, )?, @@ -97,7 +91,7 @@ impl RpcMetrics { "substrate_rpc_calls_finished", "Number of processed RPC calls (unique un-batched requests)", ), - &["protocol", "method", "is_error"], + &["protocol", "method", "is_error", "is_rate_limited"], )?, metrics_registry, )?, @@ -144,138 +138,90 @@ impl RpcMetrics { self.ws_sessions_closed.as_ref().map(|counter| counter.inc()); self.ws_sessions_time.with_label_values(&["ws"]).observe(micros as _); } -} - -/// Metrics layer. -#[derive(Clone)] -pub struct MetricsLayer { - inner: RpcMetrics, - transport_label: &'static str, -} - -impl MetricsLayer { - /// Create a new [`MetricsLayer`]. - pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self { - Self { inner: metrics, transport_label } - } - - pub(crate) fn ws_connect(&self) { - self.inner.ws_connect(); - } - - pub(crate) fn ws_disconnect(&self, now: Instant) { - self.inner.ws_disconnect(now) - } -} - -impl tower::Layer for MetricsLayer { - type Service = Metrics; - - fn layer(&self, inner: S) -> Self::Service { - Metrics::new(inner, self.inner.clone(), self.transport_label) - } -} - -/// Metrics middleware. -#[derive(Clone)] -pub struct Metrics { - service: S, - metrics: RpcMetrics, - transport_label: &'static str, -} - -impl Metrics { - /// Create a new metrics middleware. - pub fn new(service: S, metrics: RpcMetrics, transport_label: &'static str) -> Metrics { - Metrics { service, metrics, transport_label } - } -} - -impl<'a, S> RpcServiceT<'a> for Metrics -where - S: Send + Sync + RpcServiceT<'a>, -{ - type Future = ResponseFuture<'a, S::Future>; - - fn call(&self, req: Request<'a>) -> Self::Future { - let now = Instant::now(); + pub(crate) fn on_call(&self, req: &Request, transport_label: &'static str) { log::trace!( target: "rpc_metrics", - "[{}] on_call name={} params={:?}", - self.transport_label, + "[{transport_label}] on_call name={} params={:?}", req.method_name(), req.params(), ); - self.metrics - .calls_started - .with_label_values(&[self.transport_label, req.method_name()]) + + self.calls_started + .with_label_values(&[transport_label, req.method_name()]) .inc(); + } - ResponseFuture { - fut: self.service.call(req.clone()), - metrics: self.metrics.clone(), - req, - now, - transport_label: self.transport_label, - } + pub(crate) fn on_response( + &self, + req: &Request, + rp: &MethodResponse, + is_rate_limited: bool, + transport_label: &'static str, + now: Instant, + ) { + log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now); + log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={}", rp.as_result()); + + let micros = now.elapsed().as_micros(); + log::debug!( + target: "rpc_metrics", + "[{transport_label}] {} call took {} μs", + req.method_name(), + micros, + ); + self.calls_time + .with_label_values(&[ + transport_label, + req.method_name(), + if is_rate_limited { "true" } else { "false" }, + ]) + .observe(micros as _); + self.calls_finished + .with_label_values(&[ + transport_label, + req.method_name(), + // the label "is_error", so `success` should be regarded as false + // and vice-versa to be registrered correctly. + if rp.is_success() { "false" } else { "true" }, + if is_rate_limited { "true" } else { "false" }, + ]) + .inc(); } } -/// Response future for metrics. -#[pin_project] -pub struct ResponseFuture<'a, F> { - #[pin] - fut: F, - metrics: RpcMetrics, - req: Request<'a>, - now: Instant, - transport_label: &'static str, +/// Metrics with transport label. +#[derive(Clone, Debug)] +pub struct Metrics { + pub(crate) inner: RpcMetrics, + pub(crate) transport_label: &'static str, } -impl<'a, F> std::fmt::Debug for ResponseFuture<'a, F> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("ResponseFuture") +impl Metrics { + /// Create a new [`Metrics`]. + pub fn new(metrics: RpcMetrics, transport_label: &'static str) -> Self { + Self { inner: metrics, transport_label } } -} -impl<'a, F: Future> Future for ResponseFuture<'a, F> { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); + pub(crate) fn ws_connect(&self) { + self.inner.ws_connect(); + } - let res = this.fut.poll(cx); - if let Poll::Ready(rp) = &res { - let method_name = this.req.method_name(); - let transport_label = &this.transport_label; - let now = this.now; - let metrics = &this.metrics; + pub(crate) fn ws_disconnect(&self, now: Instant) { + self.inner.ws_disconnect(now) + } - log::trace!(target: "rpc_metrics", "[{transport_label}] on_response started_at={:?}", now); - log::trace!(target: "rpc_metrics::extra", "[{transport_label}] result={:?}", rp); + pub(crate) fn on_call(&self, req: &Request) { + self.inner.on_call(req, self.transport_label) + } - let micros = now.elapsed().as_micros(); - log::debug!( - target: "rpc_metrics", - "[{transport_label}] {method_name} call took {} μs", - micros, - ); - metrics - .calls_time - .with_label_values(&[transport_label, method_name]) - .observe(micros as _); - metrics - .calls_finished - .with_label_values(&[ - transport_label, - method_name, - // the label "is_error", so `success` should be regarded as false - // and vice-versa to be registrered correctly. - if rp.is_success() { "false" } else { "true" }, - ]) - .inc(); - } - res + pub(crate) fn on_response( + &self, + req: &Request, + rp: &MethodResponse, + is_rate_limited: bool, + now: Instant, + ) { + self.inner.on_response(req, rp, is_rate_limited, self.transport_label, now) } } diff --git a/substrate/client/rpc-servers/src/middleware/mod.rs b/substrate/client/rpc-servers/src/middleware/mod.rs index cac516913d32..88ed8b2f4335 100644 --- a/substrate/client/rpc-servers/src/middleware/mod.rs +++ b/substrate/client/rpc-servers/src/middleware/mod.rs @@ -18,10 +18,131 @@ //! JSON-RPC specific middleware. -/// Grafana metrics middleware. -pub mod metrics; -/// Rate limit middleware. -pub mod rate_limit; +use std::{ + num::NonZeroU32, + time::{Duration, Instant}, +}; + +use futures::future::{BoxFuture, FutureExt}; +use governor::{clock::Clock, Jitter}; +use jsonrpsee::{ + server::middleware::rpc::RpcServiceT, + types::{ErrorObject, Id, Request}, + MethodResponse, +}; + +mod metrics; +mod rate_limit; pub use metrics::*; pub use rate_limit::*; + +const MAX_JITTER: Duration = Duration::from_millis(50); +const MAX_RETRIES: usize = 10; + +/// JSON-RPC middleware layer. +#[derive(Debug, Clone, Default)] +pub struct MiddlewareLayer { + rate_limit: Option, + metrics: Option, +} + +impl MiddlewareLayer { + /// Create an empty MiddlewareLayer. + pub fn new() -> Self { + Self::default() + } + + /// Enable new rate limit middleware enforced per minute. + pub fn with_rate_limit_per_minute(self, n: NonZeroU32) -> Self { + Self { rate_limit: Some(RateLimit::per_minute(n)), metrics: self.metrics } + } + + /// Enable metrics middleware. + pub fn with_metrics(self, metrics: Metrics) -> Self { + Self { rate_limit: self.rate_limit, metrics: Some(metrics) } + } + + /// Register a new websocket connection. + pub fn ws_connect(&self) { + self.metrics.as_ref().map(|m| m.ws_connect()); + } + + /// Register that a websocket connection was closed. + pub fn ws_disconnect(&self, now: Instant) { + self.metrics.as_ref().map(|m| m.ws_disconnect(now)); + } +} + +impl tower::Layer for MiddlewareLayer { + type Service = Middleware; + + fn layer(&self, service: S) -> Self::Service { + Middleware { service, rate_limit: self.rate_limit.clone(), metrics: self.metrics.clone() } + } +} + +/// JSON-RPC middleware that handles metrics +/// and rate-limiting. +/// +/// These are part of the same middleware +/// because the metrics needs to know whether +/// a call was rate-limited or not because +/// it will impact the roundtrip for a call. +pub struct Middleware { + service: S, + rate_limit: Option, + metrics: Option, +} + +impl<'a, S> RpcServiceT<'a> for Middleware +where + S: Send + Sync + RpcServiceT<'a> + Clone + 'static, +{ + type Future = BoxFuture<'a, MethodResponse>; + + fn call(&self, req: Request<'a>) -> Self::Future { + let now = Instant::now(); + + self.metrics.as_ref().map(|m| m.on_call(&req)); + + let service = self.service.clone(); + let rate_limit = self.rate_limit.clone(); + let metrics = self.metrics.clone(); + + async move { + let mut is_rate_limited = false; + + if let Some(limit) = rate_limit.as_ref() { + let mut attempts = 0; + let jitter = Jitter::up_to(MAX_JITTER); + + loop { + if attempts >= MAX_RETRIES { + return reject_too_many_calls(req.id); + } + + if let Err(rejected) = limit.inner.check() { + tokio::time::sleep(jitter + rejected.wait_time_from(limit.clock.now())) + .await; + } else { + break; + } + + is_rate_limited = true; + attempts += 1; + } + } + + let rp = service.call(req.clone()).await; + metrics.as_ref().map(|m| m.on_response(&req, &rp, is_rate_limited, now)); + + rp + } + .boxed() + } +} + +fn reject_too_many_calls(id: Id) -> MethodResponse { + MethodResponse::error(id, ErrorObject::owned(-32999, "RPC rate limit exceeded", None::<()>)) +} diff --git a/substrate/client/rpc-servers/src/middleware/rate_limit.rs b/substrate/client/rpc-servers/src/middleware/rate_limit.rs index cdcc3ebf66f7..23335eb37686 100644 --- a/substrate/client/rpc-servers/src/middleware/rate_limit.rs +++ b/substrate/client/rpc-servers/src/middleware/rate_limit.rs @@ -16,92 +16,32 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! RPC rate limiting middleware. +//! RPC rate limit. -use std::{num::NonZeroU32, sync::Arc, time::Duration}; - -use futures::future::{BoxFuture, FutureExt}; use governor::{ - clock::{Clock, DefaultClock, QuantaClock}, + clock::{DefaultClock, QuantaClock}, middleware::NoOpMiddleware, state::{InMemoryState, NotKeyed}, - Jitter, -}; -use jsonrpsee::{ - server::middleware::rpc::RpcServiceT, - types::{ErrorObject, Id, Request}, - MethodResponse, + Quota, }; +use std::{num::NonZeroU32, sync::Arc}; type RateLimitInner = governor::RateLimiter; -const MAX_JITTER: Duration = Duration::from_millis(50); -const MAX_RETRIES: usize = 10; - -/// JSON-RPC rate limit middleware layer. +/// Rate limit. #[derive(Debug, Clone)] -pub struct RateLimitLayer(governor::Quota); - -impl RateLimitLayer { - /// Create new rate limit enforced per minute. - pub fn per_minute(n: NonZeroU32) -> Self { - Self(governor::Quota::per_minute(n)) - } +pub struct RateLimit { + pub(crate) inner: Arc, + pub(crate) clock: QuantaClock, } -/// JSON-RPC rate limit middleware -pub struct RateLimit { - service: S, - rate_limit: Arc, - clock: QuantaClock, -} - -impl tower::Layer for RateLimitLayer { - type Service = RateLimit; - - fn layer(&self, service: S) -> Self::Service { +impl RateLimit { + /// Create a new `RateLimit` per minute. + pub fn per_minute(n: NonZeroU32) -> Self { let clock = QuantaClock::default(); - RateLimit { - service, - rate_limit: Arc::new(RateLimitInner::direct_with_clock(self.0, &clock)), + Self { + inner: Arc::new(RateLimitInner::direct_with_clock(Quota::per_minute(n), &clock)), clock, } } } - -impl<'a, S> RpcServiceT<'a> for RateLimit -where - S: Send + Sync + RpcServiceT<'a> + Clone + 'static, -{ - type Future = BoxFuture<'a, MethodResponse>; - - fn call(&self, req: Request<'a>) -> Self::Future { - let service = self.service.clone(); - let rate_limit = self.rate_limit.clone(); - let clock = self.clock.clone(); - - async move { - let mut attempts = 0; - let jitter = Jitter::up_to(MAX_JITTER); - - loop { - if attempts >= MAX_RETRIES { - break reject_too_many_calls(req.id); - } - - if let Err(rejected) = rate_limit.check() { - tokio::time::sleep(jitter + rejected.wait_time_from(clock.now())).await; - } else { - break service.call(req).await; - } - - attempts += 1; - } - } - .boxed() - } -} - -fn reject_too_many_calls(id: Id) -> MethodResponse { - MethodResponse::error(id, ErrorObject::owned(-32999, "RPC rate limit exceeded", None::<()>)) -}