From 848601ac72ad4a77f2e0b150568857b9b4d634af Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Thu, 11 Apr 2024 11:32:46 +0300 Subject: [PATCH 01/10] Sketch extracting JSON-RPC params --- .../web3/backend_jsonrpsee/metadata.rs | 16 ++++-- .../web3/backend_jsonrpsee/middleware.rs | 30 ++++++++--- .../api_server/web3/backend_jsonrpsee/mod.rs | 50 +++++++++++++++++++ 3 files changed, 84 insertions(+), 12 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs index ab898fcac2e0..6e07dbfc3a69 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs @@ -11,6 +11,7 @@ use zksync_web3_decl::{ #[cfg(test)] use super::testonly::RecordedMethodCalls; +use super::RawParamsWithBorrow; use crate::api_server::web3::metrics::API_METRICS; /// Metadata assigned to a JSON-RPC method call. 
@@ -88,9 +89,14 @@ impl MethodTracer { } } - pub(super) fn new_call(self: &Arc, name: &'static str) -> MethodCall { + pub(super) fn new_call<'a>( + self: &Arc, + name: &'static str, + raw_params: RawParamsWithBorrow<'a>, + ) -> MethodCall<'a> { MethodCall { tracer: self.clone(), + raw_params, meta: MethodMetadata::new(name), is_completed: false, } @@ -118,21 +124,23 @@ impl MethodTracer { } #[derive(Debug)] -pub(super) struct MethodCall { +pub(super) struct MethodCall<'a> { tracer: Arc, meta: MethodMetadata, + raw_params: RawParamsWithBorrow<'a>, is_completed: bool, } -impl Drop for MethodCall { +impl Drop for MethodCall<'_> { fn drop(&mut self) { + dbg!(&self.meta, self.raw_params.get()); if !self.is_completed { API_METRICS.observe_dropped_call(&self.meta); } } } -impl MethodCall { +impl MethodCall<'_> { pub(super) fn set_as_current(&mut self) -> CurrentMethodGuard<'_> { let meta = &mut self.meta; let cell = self.tracer.inner.get_or_default(); diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs index a2d1a1069742..ad6616e37bfb 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs @@ -26,7 +26,10 @@ use zksync_web3_decl::jsonrpsee::{ MethodResponse, }; -use super::metadata::{MethodCall, MethodTracer}; +use super::{ + metadata::{MethodCall, MethodTracer}, + RawParamsWithBorrow, +}; use crate::api_server::web3::metrics::API_METRICS; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] @@ -134,9 +137,9 @@ impl<'a, S> RpcServiceT<'a> for MetadataMiddleware where S: Send + Sync + RpcServiceT<'a>, { - type Future = WithMethodCall; + type Future = WithMethodCall<'a, S::Future>; - fn call(&self, request: Request<'a>) -> Self::Future { + fn call(&self, mut request: Request<'a>) -> Self::Future { // "Normalize" the method name 
by searching it in the set of all registered methods. This extends the lifetime // of the name to `'static` and maps unknown methods to "", so that method name metric labels don't have unlimited cardinality. let method_name = self @@ -145,23 +148,34 @@ where .copied() .unwrap_or(""); + let original_params = unsafe { + // SAFETY: as per `BorrowedRawParams` contract, `original_params` outlive `request.params`: + // + // - `request` is sent to `self.inner.call(_)` and lives at most as long as the returned `Future` + // (i.e., `WithMethodCall.inner`) + // - `original_params` is a part of `call` and is thus dropped after `WithMethodCall.inner` + // (fields in structs are dropped in the declaration order) + RawParamsWithBorrow::new(&mut request.params) + }; + let call = self.method_tracer.new_call(method_name, original_params); + WithMethodCall { - call: self.method_tracer.new_call(method_name), inner: self.inner.call(request), + call, } } } pin_project! { #[derive(Debug)] - pub(crate) struct WithMethodCall { - call: MethodCall, + pub(crate) struct WithMethodCall<'a, F> { #[pin] inner: F, + call: MethodCall<'a>, } } -impl> Future for WithMethodCall { +impl> Future for WithMethodCall<'_, F> { type Output = MethodResponse; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -294,7 +308,7 @@ mod tests { }; WithMethodCall { - call: method_tracer.new_call("test"), + call: method_tracer.new_call("test", RawParamsWithBorrow(None)), inner, } }); diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs index ec9228b15210..01b9d28edc5f 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs @@ -2,6 +2,8 @@ //! Consists mostly of boilerplate code implementing the `jsonrpsee` server traits for the corresponding //! namespace structures defined in `zksync_core`. 
+use std::{borrow::Cow, fmt, mem}; + use zksync_web3_decl::{ error::Web3Error, jsonrpsee::types::{error::ErrorCode, ErrorObjectOwned}, @@ -19,6 +21,54 @@ pub mod namespaces; #[cfg(test)] pub(crate) mod testonly; +type RawParams<'a> = Option>; + +/// Version of `RawParams` (i.e., params of a JSON-RPC request) that is statically known to provide a borrow +/// to another `RawParams` with the same lifetime. +/// +/// # Why? +/// +/// We need all this complexity because we'd like to access request params in a generic way without an overhead +/// (i.e., cloning strings for each request – which would hit performance and fragment the heap). +/// One could think that `jsonrpsee` should borrow params by default; +/// if that were the case, we could opportunistically expect `RawParams` to be `None` or `Some(Cow::Borrowed(_))` +/// and just copy `&RawValue` in the latter case. In practice, `RawParams` are *never* `Some(Cow::Borrowed(_))` +/// because of a bug (?) with deserializing `Cow<'_, RawValue>`: https://github.com/serde-rs/json/issues/1076 +struct RawParamsWithBorrow<'a>(RawParams<'a>); + +impl fmt::Debug for RawParamsWithBorrow<'_> { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_tuple("RawParamsWithBorrow") + .field(&self.get()) + .finish() + } +} + +impl<'a> RawParamsWithBorrow<'a> { + /// SAFETY: the returned params must outlive `raw_params`. + unsafe fn new(raw_params: &mut RawParams<'a>) -> Self { + Self(match &*raw_params { + None => None, + Some(Cow::Borrowed(raw_value)) => Some(Cow::Borrowed(*raw_value)), + Some(Cow::Owned(raw_value)) => { + let raw_value_ref: &serde_json::value::RawValue = raw_value; + // SAFETY: We extend the lifetime to 'a. 
This is only safe under the following conditions: + // + // - The reference points to a stable memory location (it is; `raw_value` is `&Box`, i.e., heap-allocated) + // - `raw_value` outlives the reference (guaranteed by the method contract) + // - `raw_value` is never mutated or provides a mutable reference (guaranteed by the `BorrowingRawParams` API) + let raw_value_ref: &'a serde_json::value::RawValue = mem::transmute(raw_value_ref); + mem::replace(raw_params, Some(Cow::Borrowed(raw_value_ref))) + } + }) + } + + fn get(&self) -> Option<&str> { + self.0.as_deref().map(serde_json::value::RawValue::get) + } +} + impl MethodTracer { pub(crate) fn map_err(&self, err: Web3Error) -> ErrorObjectOwned { self.observe_error(&err); From 880ad5ac5631fec7f3003e0a294e072c931553be Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 12 Apr 2024 14:12:59 +0300 Subject: [PATCH 02/10] Refactor `Web3Error` observation --- .../web3/backend_jsonrpsee/metadata.rs | 27 ++-- .../src/api_server/web3/metrics.rs | 128 ++++++++++++++++-- .../src/api_server/web3/tests/mod.rs | 6 +- 3 files changed, 132 insertions(+), 29 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs index 6e07dbfc3a69..b069c644794d 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs @@ -12,7 +12,7 @@ use zksync_web3_decl::{ #[cfg(test)] use super::testonly::RecordedMethodCalls; use super::RawParamsWithBorrow; -use crate::api_server::web3::metrics::API_METRICS; +use crate::api_server::web3::metrics::{ObservedWeb3Error, API_METRICS}; /// Metadata assigned to a JSON-RPC method call. #[derive(Debug, Clone)] @@ -23,8 +23,8 @@ pub(crate) struct MethodMetadata { pub block_id: Option, /// Difference between the latest block number and the requested block ID. 
pub block_diff: Option, - /// Did this call return an app-level error? - pub has_app_error: bool, + /// Information about an app-level error, if any. + pub app_error: Option, } impl MethodMetadata { @@ -34,7 +34,7 @@ impl MethodMetadata { started_at: Instant::now(), block_id: None, block_diff: None, - has_app_error: false, + app_error: None, } } } @@ -105,8 +105,7 @@ impl MethodTracer { pub(super) fn observe_error(&self, err: &Web3Error) { let cell = self.inner.get_or_default(); if let Some(metadata) = &mut *cell.borrow_mut() { - API_METRICS.observe_web3_error(metadata.name, err); - metadata.has_app_error = true; + metadata.app_error = Some(ObservedWeb3Error::new(err)); } } } @@ -133,9 +132,9 @@ pub(super) struct MethodCall<'a> { impl Drop for MethodCall<'_> { fn drop(&mut self) { - dbg!(&self.meta, self.raw_params.get()); if !self.is_completed { - API_METRICS.observe_dropped_call(&self.meta); + let raw_params = self.raw_params.get().unwrap_or("[]"); + API_METRICS.observe_dropped_call(&self.meta, raw_params); } } } @@ -155,15 +154,21 @@ impl MethodCall<'_> { pub(super) fn observe_response(&mut self, response: &MethodResponse) { self.is_completed = true; let meta = &self.meta; + let raw_params = self.raw_params.get().unwrap_or("[]"); match response.success_or_error { MethodResponseResult::Success => { - API_METRICS.observe_response_size(meta.name, response.result.len()); + API_METRICS.observe_response_size(meta.name, raw_params, response.result.len()); } MethodResponseResult::Failed(error_code) => { - API_METRICS.observe_protocol_error(meta.name, error_code, meta.has_app_error); + API_METRICS.observe_protocol_error( + meta.name, + raw_params, + error_code, + meta.app_error.as_ref(), + ); } } - API_METRICS.observe_latency(meta); + API_METRICS.observe_latency(meta, raw_params); #[cfg(test)] self.tracer.recorder.observe_response(meta, response); } diff --git a/core/lib/zksync_core/src/api_server/web3/metrics.rs b/core/lib/zksync_core/src/api_server/web3/metrics.rs 
index 02c42f4589d2..75dd5820e028 100644 --- a/core/lib/zksync_core/src/api_server/web3/metrics.rs +++ b/core/lib/zksync_core/src/api_server/web3/metrics.rs @@ -1,6 +1,10 @@ //! Metrics for the JSON-RPC server. -use std::{fmt, time::Duration}; +use std::{ + fmt, + sync::{Mutex, PoisonError}, + time::{Duration, Instant}, +}; use vise::{ Buckets, Counter, DurationAsSecs, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, @@ -14,6 +18,37 @@ use super::{ TypedFilter, }; +/// Allows filtering events (e.g., for logging) so that they are reported no more frequently than with a configurable interval. +#[derive(Debug)] +struct ReportFilter { + interval: Duration, + last_timestamp: Mutex>, +} + +impl ReportFilter { + const fn new(interval: Duration) -> Self { + Self { + interval, + last_timestamp: Mutex::new(None), + } + } + + /// Should be called sparingly, since it involves moderately heavy operations (locking a mutex and getting current time). + fn should_report(&self) -> bool { + let now = Instant::now(); + let mut timestamp = self + .last_timestamp + .lock() + .unwrap_or_else(PoisonError::into_inner); + if timestamp.map_or(true, |ts| now - ts > self.interval) { + *timestamp = Some(now); + true + } else { + false + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "scheme", rename_all = "UPPERCASE")] pub(in crate::api_server) enum ApiTransportLabel { @@ -125,6 +160,30 @@ impl Web3ErrorKind { } } +/// Information about a [`Web3Error`] observed via metrics / logs. +// Necessary to persist `Web3Error` information before the end of the method, in order to be able +// to have call params available. +#[derive(Debug, Clone)] +pub(crate) struct ObservedWeb3Error { + kind: Web3ErrorKind, + // Error message produced using `ToString`. Only set for error kinds that we log (for now: internal errors + // and main node proxy errors). 
+ message: Option, +} + +impl ObservedWeb3Error { + pub fn new(err: &Web3Error) -> Self { + Self { + kind: Web3ErrorKind::new(err), + message: match err { + Web3Error::InternalError(err) => Some(err.to_string()), + Web3Error::ProxyError(err) => Some(err.to_string()), + _ => None, + }, + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] #[metrics(rename_all = "snake_case")] enum ProtocolErrorOrigin { @@ -231,30 +290,61 @@ impl ApiMetrics { } /// Observes latency of a finished RPC call. - pub fn observe_latency(&self, meta: &MethodMetadata) { + pub fn observe_latency(&self, meta: &MethodMetadata, raw_params: &str) { + static FILTER: ReportFilter = ReportFilter::new(Duration::from_secs(10)); + const MIN_REPORTED_LATENCY: Duration = Duration::from_secs(5); + let latency = meta.started_at.elapsed(); self.web3_call[&MethodLabels::from(meta)].observe(latency); if let Some(block_diff) = meta.block_diff { self.web3_call_block_diff[&meta.name].observe(block_diff.into()); } + if latency >= MIN_REPORTED_LATENCY && FILTER.should_report() { + tracing::info!( + "Long call to `{}` with params {raw_params}: {latency:?}", + meta.name + ); + } } /// Observes latency of a dropped RPC call. - pub fn observe_dropped_call(&self, meta: &MethodMetadata) { + pub fn observe_dropped_call(&self, meta: &MethodMetadata, raw_params: &str) { + static FILTER: ReportFilter = ReportFilter::new(Duration::from_secs(10)); + let latency = meta.started_at.elapsed(); self.web3_dropped_call_latency[&MethodLabels::from(meta)].observe(latency); + if FILTER.should_report() { + tracing::info!( + "Call to `{}` with params {raw_params} was dropped by client after {latency:?}", + meta.name + ); + } } /// Observes serialized size of a response. 
- pub fn observe_response_size(&self, method: &'static str, size: usize) { + pub fn observe_response_size(&self, method: &'static str, raw_params: &str, size: usize) { + static FILTER: ReportFilter = ReportFilter::new(Duration::from_secs(10)); + const MIN_REPORTED_SIZE: usize = 10 * 1_024 * 1_024; // 10 MiB + self.web3_call_response_size[&method].observe(size); + if size >= MIN_REPORTED_SIZE && FILTER.should_report() { + tracing::info!( + "Call to `{method}` with params {raw_params} has resulted in large response: {size}B" + ); + } } - pub fn observe_protocol_error(&self, method: &'static str, error_code: i32, app_error: bool) { + pub fn observe_protocol_error( + &self, + method: &'static str, + raw_params: &str, + error_code: i32, + app_error: Option<&ObservedWeb3Error>, + ) { let labels = ProtocolErrorLabels { method, error_code, - origin: if app_error { + origin: if app_error.is_some() { ProtocolErrorOrigin::App } else { ProtocolErrorOrigin::Framework @@ -267,31 +357,39 @@ impl ApiMetrics { origin, } = &labels; tracing::info!( - "Observed new error code for method `{method}`: {error_code}, origin: {origin:?}" + "Observed new error code for method `{method}`, with params {raw_params}: {error_code}, origin: {origin:?}" ); } + + if let Some(err) = app_error { + self.observe_web3_error(method, raw_params, err); + } } - pub fn observe_web3_error(&self, method: &'static str, err: &Web3Error) { + fn observe_web3_error(&self, method: &'static str, raw_params: &str, err: &ObservedWeb3Error) { // Log internal error details. 
- match err { - Web3Error::InternalError(err) => { - tracing::error!("Internal error in method `{method}`: {err}"); + match err.kind { + Web3ErrorKind::Internal => { + let message = err.message.as_deref().unwrap_or(""); + tracing::error!( + "Internal error in method `{method}` with params {raw_params}: {message}" + ); } - Web3Error::ProxyError(err) => { - tracing::warn!("Error proxying call to main node in method `{method}`: {err}"); + Web3ErrorKind::Proxy => { + let message = err.message.as_deref().unwrap_or(""); + tracing::warn!("Error proxying call to main node in method `{method}` with params {raw_params}: {message}"); } _ => { /* do nothing */ } } let labels = Web3ErrorLabels { method, - kind: Web3ErrorKind::new(err), + kind: err.kind, }; if self.web3_errors[&labels].inc() == 0 { // Only log the first error with the label to not spam logs. tracing::info!( - "Observed new error type for method `{}`: {:?}", + "Observed new error type for method `{}` with params {raw_params}: {:?}", labels.method, labels.kind ); diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index dea40651717f..bd01f6eb25a7 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -1024,7 +1024,7 @@ impl HttpTest for RpcCallsTracingTest { calls[0].response.as_error_code(), Some(ErrorCode::MethodNotFound.code()) ); - assert!(!calls[0].metadata.has_app_error); + assert!(calls[0].metadata.app_error.is_none()); client .request::("eth_getBlockByNumber", jsonrpsee::rpc_params![0]) @@ -1037,7 +1037,7 @@ impl HttpTest for RpcCallsTracingTest { calls[0].response.as_error_code(), Some(ErrorCode::InvalidParams.code()) ); - assert!(!calls[0].metadata.has_app_error); + assert!(calls[0].metadata.app_error.is_none()); // Check app-level error. 
client @@ -1054,7 +1054,7 @@ impl HttpTest for RpcCallsTracingTest { calls[0].response.as_error_code(), Some(ErrorCode::InvalidParams.code()) ); - assert!(calls[0].metadata.has_app_error); + assert!(calls[0].metadata.app_error.is_some()); // Check batch RPC request. let mut batch = BatchRequestBuilder::new(); From 1fcc2428afef4ef2249f69e87609b15605c5ddcf Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 15 Apr 2024 15:12:28 +0300 Subject: [PATCH 03/10] Revert Web3 error handling ...and refactor filtering to use thread-local storage --- .../web3/backend_jsonrpsee/metadata.rs | 13 ++- .../web3/backend_jsonrpsee/middleware.rs | 34 ++++-- .../src/api_server/web3/metrics.rs | 103 +++++++----------- .../src/api_server/web3/tests/mod.rs | 6 +- 4 files changed, 76 insertions(+), 80 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs index b069c644794d..0299b481bf25 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs @@ -12,7 +12,7 @@ use zksync_web3_decl::{ #[cfg(test)] use super::testonly::RecordedMethodCalls; use super::RawParamsWithBorrow; -use crate::api_server::web3::metrics::{ObservedWeb3Error, API_METRICS}; +use crate::api_server::web3::metrics::API_METRICS; /// Metadata assigned to a JSON-RPC method call. #[derive(Debug, Clone)] @@ -23,8 +23,8 @@ pub(crate) struct MethodMetadata { pub block_id: Option, /// Difference between the latest block number and the requested block ID. pub block_diff: Option, - /// Information about an app-level error, if any. - pub app_error: Option, + /// Did this call return an app-level error? 
+ pub has_app_error: bool, } impl MethodMetadata { @@ -34,7 +34,7 @@ impl MethodMetadata { started_at: Instant::now(), block_id: None, block_diff: None, - app_error: None, + has_app_error: false, } } } @@ -105,7 +105,8 @@ impl MethodTracer { pub(super) fn observe_error(&self, err: &Web3Error) { let cell = self.inner.get_or_default(); if let Some(metadata) = &mut *cell.borrow_mut() { - metadata.app_error = Some(ObservedWeb3Error::new(err)); + API_METRICS.observe_web3_error(metadata.name, err); + metadata.has_app_error = true; } } } @@ -164,7 +165,7 @@ impl MethodCall<'_> { meta.name, raw_params, error_code, - meta.app_error.as_ref(), + meta.has_app_error, ); } } diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs index ad6616e37bfb..36f793007055 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs @@ -1,4 +1,5 @@ use std::{ + cell::RefCell, collections::HashSet, future::Future, num::NonZeroU32, @@ -16,6 +17,7 @@ use governor::{ }; use once_cell::sync::OnceCell; use pin_project_lite::pin_project; +use rand::{rngs::SmallRng, RngCore, SeedableRng}; use tokio::sync::watch; use vise::{ Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, GaugeGuard, Histogram, Metrics, @@ -158,11 +160,7 @@ where RawParamsWithBorrow::new(&mut request.params) }; let call = self.method_tracer.new_call(method_name, original_params); - - WithMethodCall { - inner: self.inner.call(request), - call, - } + WithMethodCall::new(self.inner.call(request), call) } } @@ -172,6 +170,18 @@ pin_project! 
{ #[pin] inner: F, call: MethodCall<'a>, + call_span: tracing::Span, + } +} + +impl<'a, F> WithMethodCall<'a, F> { + fn new(inner: F, call: MethodCall<'a>) -> Self { + let call_span = tracing::debug_span!("rpc_call", trace_id = generate_trace_id()); + Self { + inner, + call, + call_span, + } } } @@ -180,6 +190,7 @@ impl> Future for WithMethodCall<'_, F> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let projection = self.project(); + let _span_guard = projection.call_span.enter(); let guard = projection.call.set_as_current(); match projection.inner.poll(cx) { Poll::Pending => Poll::Pending, @@ -264,6 +275,13 @@ where } } +fn generate_trace_id() -> u64 { + thread_local! { + static TRACE_ID_RNG: RefCell = RefCell::new(SmallRng::from_entropy()); + } + TRACE_ID_RNG.with(|rng| rng.borrow_mut().next_u64()) +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -307,10 +325,10 @@ mod tests { } }; - WithMethodCall { - call: method_tracer.new_call("test", RawParamsWithBorrow(None)), + WithMethodCall::new( inner, - } + method_tracer.new_call("test", RawParamsWithBorrow(None)), + ) }); if spawn_tasks { diff --git a/core/lib/zksync_core/src/api_server/web3/metrics.rs b/core/lib/zksync_core/src/api_server/web3/metrics.rs index 75dd5820e028..3f864a30c526 100644 --- a/core/lib/zksync_core/src/api_server/web3/metrics.rs +++ b/core/lib/zksync_core/src/api_server/web3/metrics.rs @@ -1,8 +1,8 @@ //! Metrics for the JSON-RPC server. use std::{ - fmt, - sync::{Mutex, PoisonError}, + cell::Cell, + fmt, thread, time::{Duration, Instant}, }; @@ -19,29 +19,23 @@ use super::{ }; /// Allows filtering events (e.g., for logging) so that they are reported no more frequently than with a configurable interval. +/// +/// Current implementation uses thread-local vars in order to not rely on mutexes or other cross-thread primitives. 
+/// I.e., it only really works if the number of threads accessing it is limited (which is the case for the API server; +/// the number of worker threads is congruent to the number of CPUs). #[derive(Debug)] struct ReportFilter { interval: Duration, - last_timestamp: Mutex>, + last_timestamp: &'static thread::LocalKey>>, } impl ReportFilter { - const fn new(interval: Duration) -> Self { - Self { - interval, - last_timestamp: Mutex::new(None), - } - } - - /// Should be called sparingly, since it involves moderately heavy operations (locking a mutex and getting current time). + /// Should be called sparingly, since it involves moderately heavy operations (getting current time). fn should_report(&self) -> bool { + let timestamp = self.last_timestamp.get(); let now = Instant::now(); - let mut timestamp = self - .last_timestamp - .lock() - .unwrap_or_else(PoisonError::into_inner); if timestamp.map_or(true, |ts| now - ts > self.interval) { - *timestamp = Some(now); + self.last_timestamp.set(Some(now)); true } else { false @@ -49,6 +43,19 @@ impl ReportFilter { } } +/// Creates a new filter with the specified reporting interval *per thread*. +macro_rules! report_filter { + ($interval:expr) => {{ + thread_local! { + static LAST_TIMESTAMP: Cell> = Cell::new(None); + } + ReportFilter { + interval: $interval, + last_timestamp: &LAST_TIMESTAMP, + } + }}; +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "scheme", rename_all = "UPPERCASE")] pub(in crate::api_server) enum ApiTransportLabel { @@ -160,30 +167,6 @@ impl Web3ErrorKind { } } -/// Information about a [`Web3Error`] observed via metrics / logs. -// Necessary to persist `Web3Error` information before the end of the method, in order to be able -// to have call params available. -#[derive(Debug, Clone)] -pub(crate) struct ObservedWeb3Error { - kind: Web3ErrorKind, - // Error message produced using `ToString`. 
Only set for error kinds that we log (for now: internal errors - // and main node proxy errors). - message: Option, -} - -impl ObservedWeb3Error { - pub fn new(err: &Web3Error) -> Self { - Self { - kind: Web3ErrorKind::new(err), - message: match err { - Web3Error::InternalError(err) => Some(err.to_string()), - Web3Error::ProxyError(err) => Some(err.to_string()), - _ => None, - }, - } - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue)] #[metrics(rename_all = "snake_case")] enum ProtocolErrorOrigin { @@ -291,7 +274,7 @@ impl ApiMetrics { /// Observes latency of a finished RPC call. pub fn observe_latency(&self, meta: &MethodMetadata, raw_params: &str) { - static FILTER: ReportFilter = ReportFilter::new(Duration::from_secs(10)); + static FILTER: ReportFilter = report_filter!(Duration::from_secs(1)); const MIN_REPORTED_LATENCY: Duration = Duration::from_secs(5); let latency = meta.started_at.elapsed(); @@ -309,7 +292,7 @@ impl ApiMetrics { /// Observes latency of a dropped RPC call. pub fn observe_dropped_call(&self, meta: &MethodMetadata, raw_params: &str) { - static FILTER: ReportFilter = ReportFilter::new(Duration::from_secs(10)); + static FILTER: ReportFilter = report_filter!(Duration::from_secs(1)); let latency = meta.started_at.elapsed(); self.web3_dropped_call_latency[&MethodLabels::from(meta)].observe(latency); @@ -323,7 +306,7 @@ impl ApiMetrics { /// Observes serialized size of a response. 
pub fn observe_response_size(&self, method: &'static str, raw_params: &str, size: usize) { - static FILTER: ReportFilter = ReportFilter::new(Duration::from_secs(10)); + static FILTER: ReportFilter = report_filter!(Duration::from_secs(1)); const MIN_REPORTED_SIZE: usize = 10 * 1_024 * 1_024; // 10 MiB self.web3_call_response_size[&method].observe(size); @@ -339,57 +322,51 @@ impl ApiMetrics { method: &'static str, raw_params: &str, error_code: i32, - app_error: Option<&ObservedWeb3Error>, + has_app_error: bool, ) { + static FILTER: ReportFilter = report_filter!(Duration::from_millis(100)); + let labels = ProtocolErrorLabels { method, error_code, - origin: if app_error.is_some() { + origin: if has_app_error { ProtocolErrorOrigin::App } else { ProtocolErrorOrigin::Framework }, }; - if self.web3_rpc_errors[&labels].inc() == 0 { + if self.web3_rpc_errors[&labels].inc() == 0 || FILTER.should_report() { let ProtocolErrorLabels { method, error_code, origin, } = &labels; tracing::info!( - "Observed new error code for method `{method}`, with params {raw_params}: {error_code}, origin: {origin:?}" + "Observed error code {error_code} (origin: {origin:?}) for method `{method}`, with params {raw_params}" ); } - - if let Some(err) = app_error { - self.observe_web3_error(method, raw_params, err); - } } - fn observe_web3_error(&self, method: &'static str, raw_params: &str, err: &ObservedWeb3Error) { + pub fn observe_web3_error(&self, method: &'static str, err: &Web3Error) { // Log internal error details. 
- match err.kind { - Web3ErrorKind::Internal => { - let message = err.message.as_deref().unwrap_or(""); - tracing::error!( - "Internal error in method `{method}` with params {raw_params}: {message}" - ); + match err { + Web3Error::InternalError(err) => { + tracing::error!("Internal error in method `{method}`: {err}"); } - Web3ErrorKind::Proxy => { - let message = err.message.as_deref().unwrap_or(""); - tracing::warn!("Error proxying call to main node in method `{method}` with params {raw_params}: {message}"); + Web3Error::ProxyError(err) => { + tracing::warn!("Error proxying call to main node in method `{method}`: {err}"); } _ => { /* do nothing */ } } let labels = Web3ErrorLabels { method, - kind: err.kind, + kind: Web3ErrorKind::new(err), }; if self.web3_errors[&labels].inc() == 0 { // Only log the first error with the label to not spam logs. tracing::info!( - "Observed new error type for method `{}` with params {raw_params}: {:?}", + "Observed new error type for method `{}`: {:?}", labels.method, labels.kind ); diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index bd01f6eb25a7..dea40651717f 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -1024,7 +1024,7 @@ impl HttpTest for RpcCallsTracingTest { calls[0].response.as_error_code(), Some(ErrorCode::MethodNotFound.code()) ); - assert!(calls[0].metadata.app_error.is_none()); + assert!(!calls[0].metadata.has_app_error); client .request::("eth_getBlockByNumber", jsonrpsee::rpc_params![0]) @@ -1037,7 +1037,7 @@ impl HttpTest for RpcCallsTracingTest { calls[0].response.as_error_code(), Some(ErrorCode::InvalidParams.code()) ); - assert!(calls[0].metadata.app_error.is_none()); + assert!(!calls[0].metadata.has_app_error); // Check app-level error. 
client @@ -1054,7 +1054,7 @@ impl HttpTest for RpcCallsTracingTest { calls[0].response.as_error_code(), Some(ErrorCode::InvalidParams.code()) ); - assert!(calls[0].metadata.app_error.is_some()); + assert!(calls[0].metadata.has_app_error); // Check batch RPC request. let mut batch = BatchRequestBuilder::new(); From 61f1ebc1875373336ad88bab812c48677a7c3062 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 16 Apr 2024 10:48:14 +0300 Subject: [PATCH 04/10] Rework `ObservedRpcParams` --- .../web3/backend_jsonrpsee/metadata.rs | 20 ++--- .../web3/backend_jsonrpsee/middleware.rs | 23 ++--- .../api_server/web3/backend_jsonrpsee/mod.rs | 50 ----------- .../src/api_server/web3/metrics.rs | 87 +++++++++++++++++-- 4 files changed, 95 insertions(+), 85 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs index 0299b481bf25..6fb1975bb242 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/metadata.rs @@ -11,8 +11,7 @@ use zksync_web3_decl::{ #[cfg(test)] use super::testonly::RecordedMethodCalls; -use super::RawParamsWithBorrow; -use crate::api_server::web3::metrics::API_METRICS; +use crate::api_server::web3::metrics::{ObservedRpcParams, API_METRICS}; /// Metadata assigned to a JSON-RPC method call. 
#[derive(Debug, Clone)] @@ -92,11 +91,11 @@ impl MethodTracer { pub(super) fn new_call<'a>( self: &Arc, name: &'static str, - raw_params: RawParamsWithBorrow<'a>, + raw_params: ObservedRpcParams<'a>, ) -> MethodCall<'a> { MethodCall { tracer: self.clone(), - raw_params, + params: raw_params, meta: MethodMetadata::new(name), is_completed: false, } @@ -127,15 +126,14 @@ impl MethodTracer { pub(super) struct MethodCall<'a> { tracer: Arc, meta: MethodMetadata, - raw_params: RawParamsWithBorrow<'a>, + params: ObservedRpcParams<'a>, is_completed: bool, } impl Drop for MethodCall<'_> { fn drop(&mut self) { if !self.is_completed { - let raw_params = self.raw_params.get().unwrap_or("[]"); - API_METRICS.observe_dropped_call(&self.meta, raw_params); + API_METRICS.observe_dropped_call(&self.meta, &self.params); } } } @@ -155,21 +153,21 @@ impl MethodCall<'_> { pub(super) fn observe_response(&mut self, response: &MethodResponse) { self.is_completed = true; let meta = &self.meta; - let raw_params = self.raw_params.get().unwrap_or("[]"); + let params = &self.params; match response.success_or_error { MethodResponseResult::Success => { - API_METRICS.observe_response_size(meta.name, raw_params, response.result.len()); + API_METRICS.observe_response_size(meta.name, params, response.result.len()); } MethodResponseResult::Failed(error_code) => { API_METRICS.observe_protocol_error( meta.name, - raw_params, + params, error_code, meta.has_app_error, ); } } - API_METRICS.observe_latency(meta, raw_params); + API_METRICS.observe_latency(meta, params); #[cfg(test)] self.tracer.recorder.observe_response(meta, response); } diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs index 36f793007055..7cd9aa826237 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs @@ -28,11 +28,8 @@ 
use zksync_web3_decl::jsonrpsee::{ MethodResponse, }; -use super::{ - metadata::{MethodCall, MethodTracer}, - RawParamsWithBorrow, -}; -use crate::api_server::web3::metrics::API_METRICS; +use super::metadata::{MethodCall, MethodTracer}; +use crate::api_server::web3::metrics::{ObservedRpcParams, API_METRICS}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "transport", rename_all = "snake_case")] @@ -141,7 +138,7 @@ where { type Future = WithMethodCall<'a, S::Future>; - fn call(&self, mut request: Request<'a>) -> Self::Future { + fn call(&self, request: Request<'a>) -> Self::Future { // "Normalize" the method name by searching it in the set of all registered methods. This extends the lifetime // of the name to `'static` and maps unknown methods to "", so that method name metric labels don't have unlimited cardinality. let method_name = self @@ -150,16 +147,8 @@ where .copied() .unwrap_or(""); - let original_params = unsafe { - // SAFETY: as per `BorrowedRawParams` contract, `original_params` outlive `request.params`: - // - // - `request` is sent to `self.inner.call(_)` and lives at most as long as the returned `Future` - // (i.e., `WithMethodCall.inner`) - // - `original_params` is a part of `call` and is thus dropped after `WithMethodCall.inner` - // (fields in structs are dropped in the declaration order) - RawParamsWithBorrow::new(&mut request.params) - }; - let call = self.method_tracer.new_call(method_name, original_params); + let observed_params = ObservedRpcParams::new(request.params.as_ref()); + let call = self.method_tracer.new_call(method_name, observed_params); WithMethodCall::new(self.inner.call(request), call) } } @@ -327,7 +316,7 @@ mod tests { WithMethodCall::new( inner, - method_tracer.new_call("test", RawParamsWithBorrow(None)), + method_tracer.new_call("test", ObservedRpcParams::None), ) }); diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs 
b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs index 01b9d28edc5f..ec9228b15210 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs @@ -2,8 +2,6 @@ //! Consists mostly of boilerplate code implementing the `jsonrpsee` server traits for the corresponding //! namespace structures defined in `zksync_core`. -use std::{borrow::Cow, fmt, mem}; - use zksync_web3_decl::{ error::Web3Error, jsonrpsee::types::{error::ErrorCode, ErrorObjectOwned}, @@ -21,54 +19,6 @@ pub mod namespaces; #[cfg(test)] pub(crate) mod testonly; -type RawParams<'a> = Option>; - -/// Version of `RawParams` (i.e., params of a JSON-RPC request) that is statically known to provide a borrow -/// to another `RawParams` with the same lifetime. -/// -/// # Why? -/// -/// We need all this complexity because we'd like to access request params in a generic way without an overhead -/// (i.e., cloning strings for each request – which would hit performance and fragment the heap). -/// One could think that `jsonrpsee` should borrow params by default; -/// if that were the case, we could opportunistically expect `RawParams` to be `None` or `Some(Cow::Borrowed(_))` -/// and just copy `&RawValue` in the latter case. In practice, `RawParams` are *never* `Some(Cow::Borrowed(_))` -/// because of a bug (?) with deserializing `Cow<'_, RawValue>`: https://github.com/serde-rs/json/issues/1076 -struct RawParamsWithBorrow<'a>(RawParams<'a>); - -impl fmt::Debug for RawParamsWithBorrow<'_> { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - formatter - .debug_tuple("RawParamsWithBorrow") - .field(&self.get()) - .finish() - } -} - -impl<'a> RawParamsWithBorrow<'a> { - /// SAFETY: the returned params must outlive `raw_params`. 
- unsafe fn new(raw_params: &mut RawParams<'a>) -> Self { - Self(match &*raw_params { - None => None, - Some(Cow::Borrowed(raw_value)) => Some(Cow::Borrowed(*raw_value)), - Some(Cow::Owned(raw_value)) => { - let raw_value_ref: &serde_json::value::RawValue = raw_value; - // SAFETY: We extend the lifetime to 'a. This is only safe under the following conditions: - // - // - The reference points to a stable memory location (it is; `raw_value` is `&Box`, i.e., heap-allocated) - // - `raw_value` outlives the reference (guaranteed by the method contract) - // - `raw_value` is never mutated or provides a mutable reference (guaranteed by the `BorrowingRawParams` API) - let raw_value_ref: &'a serde_json::value::RawValue = mem::transmute(raw_value_ref); - mem::replace(raw_params, Some(Cow::Borrowed(raw_value_ref))) - } - }) - } - - fn get(&self) -> Option<&str> { - self.0.as_deref().map(serde_json::value::RawValue::get) - } -} - impl MethodTracer { pub(crate) fn map_err(&self, err: Web3Error) -> ErrorObjectOwned { self.observe_error(&err); diff --git a/core/lib/zksync_core/src/api_server/web3/metrics.rs b/core/lib/zksync_core/src/api_server/web3/metrics.rs index 3f864a30c526..8a694b632cdf 100644 --- a/core/lib/zksync_core/src/api_server/web3/metrics.rs +++ b/core/lib/zksync_core/src/api_server/web3/metrics.rs @@ -1,6 +1,7 @@ //! Metrics for the JSON-RPC server. use std::{ + borrow::Cow, cell::Cell, fmt, thread, time::{Duration, Instant}, @@ -56,6 +57,65 @@ macro_rules! report_filter { }}; } +/// Observed version of RPC parameters. Has a bounded upper-limit size (256 bytes), so that we don't over-allocate.
+#[derive(Debug)] +pub(super) enum ObservedRpcParams<'a> { + None, + Borrowed(&'a serde_json::value::RawValue), + Owned { start: Box, total_len: usize }, +} + +// FIXME: test +impl fmt::Display for ObservedRpcParams<'_> { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + let (start, total_len) = match self { + Self::None => return formatter.write_str("[]"), + Self::Borrowed(params) => (Self::maybe_shorten(params), params.get().len()), + Self::Owned { start, total_len } => (start.as_ref(), *total_len), + }; + + if total_len == start.len() { + formatter.write_str(start) + } else { + let skipped_bytes = total_len - start.len(); + // Since params is a JSON array, we add a closing ']' at the end + write!(formatter, "{start} ...({skipped_bytes} bytes skipped)]") + } + } +} + +impl<'a> ObservedRpcParams<'a> { + const MAX_LEN: usize = 256; + + fn maybe_shorten(raw_value: &serde_json::value::RawValue) -> &str { + let raw_str = raw_value.get(); + if raw_str.len() <= Self::MAX_LEN { + raw_str + } else { + // Truncate `raw_str` to be no longer than `MAX_LEN`. + let mut pos = Self::MAX_LEN; + while !raw_str.is_char_boundary(pos) { + pos -= 1; // Shouldn't underflow; the char boundary is at most 3 bytes away + } + &raw_str[..pos] + } + } + + pub fn new(raw: Option<&Cow<'a, serde_json::value::RawValue>>) -> Self { + match raw { + None => Self::None, + // In practice, `jsonrpsee` never returns `Some(Cow::Borrowed(_))` because of a `serde` / `serde_json` flaw (?) + // when deserializing `Cow<'_, RawValue>`: https://github.com/serde-rs/json/issues/1076. Thus, each `new()` call + // in which params are actually specified will allocate, but this allocation is quite small.
+ Some(Cow::Borrowed(params)) => Self::Borrowed(params), + Some(Cow::Owned(params)) => Self::Owned { + start: Self::maybe_shorten(params).into(), + total_len: params.get().len(), + }, + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "scheme", rename_all = "UPPERCASE")] pub(in crate::api_server) enum ApiTransportLabel { @@ -273,7 +333,11 @@ impl ApiMetrics { } /// Observes latency of a finished RPC call. - pub fn observe_latency(&self, meta: &MethodMetadata, raw_params: &str) { + pub(super) fn observe_latency( + &self, + meta: &MethodMetadata, + raw_params: &ObservedRpcParams<'_>, + ) { static FILTER: ReportFilter = report_filter!(Duration::from_secs(1)); const MIN_REPORTED_LATENCY: Duration = Duration::from_secs(5); @@ -291,7 +355,11 @@ impl ApiMetrics { } /// Observes latency of a dropped RPC call. - pub fn observe_dropped_call(&self, meta: &MethodMetadata, raw_params: &str) { + pub(super) fn observe_dropped_call( + &self, + meta: &MethodMetadata, + raw_params: &ObservedRpcParams<'_>, + ) { static FILTER: ReportFilter = report_filter!(Duration::from_secs(1)); let latency = meta.started_at.elapsed(); @@ -305,9 +373,14 @@ impl ApiMetrics { } /// Observes serialized size of a response. 
- pub fn observe_response_size(&self, method: &'static str, raw_params: &str, size: usize) { + pub(super) fn observe_response_size( + &self, + method: &'static str, + raw_params: &ObservedRpcParams<'_>, + size: usize, + ) { static FILTER: ReportFilter = report_filter!(Duration::from_secs(1)); - const MIN_REPORTED_SIZE: usize = 10 * 1_024 * 1_024; // 10 MiB + const MIN_REPORTED_SIZE: usize = 4 * 1_024 * 1_024; // 4 MiB self.web3_call_response_size[&method].observe(size); if size >= MIN_REPORTED_SIZE && FILTER.should_report() { @@ -317,10 +390,10 @@ impl ApiMetrics { } } - pub fn observe_protocol_error( + pub(super) fn observe_protocol_error( &self, method: &'static str, - raw_params: &str, + raw_params: &ObservedRpcParams<'_>, error_code: i32, has_app_error: bool, ) { @@ -347,7 +420,7 @@ impl ApiMetrics { } } - pub fn observe_web3_error(&self, method: &'static str, err: &Web3Error) { + pub(super) fn observe_web3_error(&self, method: &'static str, err: &Web3Error) { // Log internal error details. 
match err { Web3Error::InternalError(err) => { From ffacb81f9c3ea75f36dcaf35bc6a243dc094c5e9 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 16 Apr 2024 11:15:49 +0300 Subject: [PATCH 05/10] Test `ObservedRpcParams` --- .../src/api_server/web3/metrics.rs | 55 ++++++++++++++++++- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/metrics.rs b/core/lib/zksync_core/src/api_server/web3/metrics.rs index 8a694b632cdf..9f9960131e07 100644 --- a/core/lib/zksync_core/src/api_server/web3/metrics.rs +++ b/core/lib/zksync_core/src/api_server/web3/metrics.rs @@ -65,7 +65,6 @@ pub(super) enum ObservedRpcParams<'a> { Owned { start: Box, total_len: usize }, } -// FIXME: test impl fmt::Display for ObservedRpcParams<'_> { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { let (start, total_len) = match self { @@ -77,9 +76,8 @@ impl fmt::Display for ObservedRpcParams<'_> { if total_len == start.len() { formatter.write_str(start) } else { - let skipped_bytes = total_len - start.len(); // Since params is a JSON array, we add a closing ']' at the end - write!(formatter, "{start} ...({skipped_bytes} bytes skipped)]") + write!(formatter, "{start} ...({total_len} bytes)]") } } } @@ -538,3 +536,54 @@ pub(super) struct MempoolCacheMetrics { #[vise::register] pub(super) static MEMPOOL_CACHE_METRICS: vise::Global = vise::Global::new(); + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use serde_json::value::RawValue; + + use super::*; + + #[test] + fn observing_rpc_params() { + let rpc_params = ObservedRpcParams::new(None); + assert_matches!(rpc_params, ObservedRpcParams::None); + assert_eq!(rpc_params.to_string(), "[]"); + + let raw_params = RawValue::from_string(r#"["0x1"]"#.into()).unwrap(); + let rpc_params = ObservedRpcParams::new(Some(&Cow::Borrowed(&raw_params))); + assert_matches!(rpc_params, ObservedRpcParams::Borrowed(_)); + assert_eq!(rpc_params.to_string(), r#"["0x1"]"#); + + let 
rpc_params = ObservedRpcParams::new(Some(&Cow::Owned(raw_params))); + assert_matches!(rpc_params, ObservedRpcParams::Owned { .. }); + assert_eq!(rpc_params.to_string(), r#"["0x1"]"#); + + let raw_params = [zksync_types::Bytes(vec![0xff; 512])]; + let raw_params = serde_json::value::to_raw_value(&raw_params).unwrap(); + assert_eq!(raw_params.get().len(), 1_030); // 1_024 'f' chars + '0x' + '[]' + '""' + let rpc_params = ObservedRpcParams::new(Some(&Cow::Borrowed(&raw_params))); + assert_matches!(rpc_params, ObservedRpcParams::Borrowed(_)); + let rpc_params_str = rpc_params.to_string(); + assert!( + rpc_params_str.starts_with(r#"["0xffff"#), + "{rpc_params_str}" + ); + assert!( + rpc_params_str.ends_with("ff ...(1030 bytes)]"), + "{rpc_params_str}" + ); + + let rpc_params = ObservedRpcParams::new(Some(&Cow::Owned(raw_params))); + assert_matches!(rpc_params, ObservedRpcParams::Owned { .. }); + let rpc_params_str = rpc_params.to_string(); + assert!( + rpc_params_str.starts_with(r#"["0xffff"#), + "{rpc_params_str}" + ); + assert!( + rpc_params_str.ends_with("ff ...(1030 bytes)]"), + "{rpc_params_str}" + ); + } +} From a9288ef6ffbaf7491ee061e2b67189c3cfc77ae9 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 16 Apr 2024 11:20:27 +0300 Subject: [PATCH 06/10] Rename "trace ID" to "correlation ID" for clarity --- .../api_server/web3/backend_jsonrpsee/middleware.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs index 7cd9aa826237..3cdf84862ecd 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs @@ -165,7 +165,10 @@ pin_project! 
{ impl<'a, F> WithMethodCall<'a, F> { fn new(inner: F, call: MethodCall<'a>) -> Self { - let call_span = tracing::debug_span!("rpc_call", trace_id = generate_trace_id()); + // Wrap a call into a span with unique correlation ID, so that events occurring in the span can be easily filtered. + // This works as a cheap alternative to OpenTelemetry tracing with its trace / span IDs. + let call_span = + tracing::debug_span!("rpc_call", correlation_id = generate_correlation_id()); Self { inner, call, @@ -264,11 +267,11 @@ where } } -fn generate_trace_id() -> u64 { +fn generate_correlation_id() -> u64 { thread_local! { - static TRACE_ID_RNG: RefCell = RefCell::new(SmallRng::from_entropy()); + static CORRELATION_ID_RNG: RefCell = RefCell::new(SmallRng::from_entropy()); } - TRACE_ID_RNG.with(|rng| rng.borrow_mut().next_u64()) + CORRELATION_ID_RNG.with(|rng| rng.borrow_mut().next_u64()) } #[cfg(test)] From 5752afc4f1d973a6f2e5ce3d05d6b92f876653e5 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 16 Apr 2024 11:21:48 +0300 Subject: [PATCH 07/10] Fix spelling --- .../src/api_server/web3/backend_jsonrpsee/middleware.rs | 2 +- core/lib/zksync_core/src/api_server/web3/metrics.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs index 3cdf84862ecd..e91601fc8018 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs @@ -166,7 +166,7 @@ pin_project! { impl<'a, F> WithMethodCall<'a, F> { fn new(inner: F, call: MethodCall<'a>) -> Self { // Wrap a call into a span with unique correlation ID, so that events occurring in the span can be easily filtered. - // This works as a cheap alternative to OpenTelemetry tracing with its trace / span IDs. 
+ // This works as a cheap alternative to Open Telemetry tracing with its trace / span IDs. let call_span = tracing::debug_span!("rpc_call", correlation_id = generate_correlation_id()); Self { diff --git a/core/lib/zksync_core/src/api_server/web3/metrics.rs b/core/lib/zksync_core/src/api_server/web3/metrics.rs index 9f9960131e07..6711f869e1a4 100644 --- a/core/lib/zksync_core/src/api_server/web3/metrics.rs +++ b/core/lib/zksync_core/src/api_server/web3/metrics.rs @@ -23,7 +23,7 @@ use super::{ /// /// Current implementation uses thread-local vars in order to not rely on mutexes or other cross-thread primitives. /// I.e., it only really works if the number of threads accessing it is limited (which is the case for the API server; -/// the number of worker threads is congruent to the number of CPUs). +/// the number of worker threads is congruent to the CPU count). #[derive(Debug)] struct ReportFilter { interval: Duration, @@ -561,7 +561,7 @@ mod tests { let raw_params = [zksync_types::Bytes(vec![0xff; 512])]; let raw_params = serde_json::value::to_raw_value(&raw_params).unwrap(); - assert_eq!(raw_params.get().len(), 1_030); // 1_024 'f' chars + '0x' + '[]' + '""' + assert_eq!(raw_params.get().len(), 1_030); // 1024 'f' chars + '0x' + '[]' + '""' let rpc_params = ObservedRpcParams::new(Some(&Cow::Borrowed(&raw_params))); assert_matches!(rpc_params, ObservedRpcParams::Borrowed(_)); let rpc_params_str = rpc_params.to_string(); From c486463a349a62b42c34bd110f9ca39f9e7537d6 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 19 Apr 2024 13:13:48 +0300 Subject: [PATCH 08/10] Make extended tracing optional --- .../web3/backend_jsonrpsee/middleware.rs | 113 ++++++++++++------ .../api_server/web3/backend_jsonrpsee/mod.rs | 4 +- .../src/api_server/web3/metrics.rs | 28 +++-- .../zksync_core/src/api_server/web3/mod.rs | 28 ++++- 4 files changed, 120 insertions(+), 53 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs 
b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs index e91601fc8018..3fa0efa84624 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs @@ -19,6 +19,7 @@ use once_cell::sync::OnceCell; use pin_project_lite::pin_project; use rand::{rngs::SmallRng, RngCore, SeedableRng}; use tokio::sync::watch; +use tracing::instrument::{Instrument, Instrumented}; use vise::{ Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, GaugeGuard, Histogram, Metrics, }; @@ -112,27 +113,13 @@ where /// /// As an example, a method handler can set the requested block ID, which would then be used in relevant metric labels. #[derive(Debug)] -pub(crate) struct MetadataMiddleware { +pub(crate) struct MetadataMiddleware { inner: S, registered_method_names: Arc>, method_tracer: Arc, } -impl MetadataMiddleware { - pub fn new( - inner: S, - registered_method_names: Arc>, - method_tracer: Arc, - ) -> Self { - Self { - inner, - registered_method_names, - method_tracer, - } - } -} - -impl<'a, S> RpcServiceT<'a> for MetadataMiddleware +impl<'a, S, const TRACE_PARAMS: bool> RpcServiceT<'a> for MetadataMiddleware where S: Send + Sync + RpcServiceT<'a>, { @@ -147,7 +134,11 @@ where .copied() .unwrap_or(""); - let observed_params = ObservedRpcParams::new(request.params.as_ref()); + let observed_params = if TRACE_PARAMS { + ObservedRpcParams::new(request.params.as_ref()) + } else { + ObservedRpcParams::Unknown + }; let call = self.method_tracer.new_call(method_name, observed_params); WithMethodCall::new(self.inner.call(request), call) } @@ -159,21 +150,12 @@ pin_project! { #[pin] inner: F, call: MethodCall<'a>, - call_span: tracing::Span, } } impl<'a, F> WithMethodCall<'a, F> { fn new(inner: F, call: MethodCall<'a>) -> Self { - // Wrap a call into a span with unique correlation ID, so that events occurring in the span can be easily filtered. 
- // This works as a cheap alternative to Open Telemetry tracing with its trace / span IDs. - let call_span = - tracing::debug_span!("rpc_call", correlation_id = generate_correlation_id()); - Self { - inner, - call, - call_span, - } + Self { inner, call } } } @@ -182,7 +164,6 @@ impl> Future for WithMethodCall<'_, F> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let projection = self.project(); - let _span_guard = projection.call_span.enter(); let guard = projection.call.set_as_current(); match projection.inner.poll(cx) { Poll::Pending => Poll::Pending, @@ -195,6 +176,75 @@ impl> Future for WithMethodCall<'_, F> { } } +#[derive(Debug, Clone)] +pub(crate) struct MetadataLayer { + registered_method_names: Arc>, + method_tracer: Arc, +} + +impl MetadataLayer { + pub fn new( + registered_method_names: Arc>, + method_tracer: Arc, + ) -> Self { + Self { + registered_method_names, + method_tracer, + } + } + + pub fn with_param_tracing(self) -> MetadataLayer { + MetadataLayer { + registered_method_names: self.registered_method_names, + method_tracer: self.method_tracer, + } + } +} + +impl tower::Layer for MetadataLayer { + type Service = MetadataMiddleware; + + fn layer(&self, inner: Svc) -> Self::Service { + MetadataMiddleware { + inner, + registered_method_names: self.registered_method_names.clone(), + method_tracer: self.method_tracer.clone(), + } + } +} + +/// Middleware that adds tracing spans to each RPC call, so that logs belonging to the same call +/// can be easily filtered. +#[derive(Debug)] +pub(crate) struct CorrelationMiddleware { + inner: S, +} + +impl CorrelationMiddleware { + pub fn new(inner: S) -> Self { + Self { inner } + } +} + +impl<'a, S> RpcServiceT<'a> for CorrelationMiddleware +where + S: RpcServiceT<'a>, +{ + type Future = Instrumented; + + fn call(&self, request: Request<'a>) -> Self::Future { + thread_local! 
{ + static CORRELATION_ID_RNG: RefCell = RefCell::new(SmallRng::from_entropy()); + } + + // Wrap a call into a span with unique correlation ID, so that events occurring in the span can be easily filtered. + // This works as a cheap alternative to Open Telemetry tracing with its trace / span IDs. + let correlation_id = CORRELATION_ID_RNG.with(|rng| rng.borrow_mut().next_u64()); + let call_span = tracing::debug_span!("rpc_call", correlation_id); + self.inner.call(request).instrument(call_span) + } +} + /// Tracks the timestamp of the last call to the RPC. Used during server shutdown to start dropping new traffic /// only after this is coordinated by the external load balancer. #[derive(Debug, Clone, Default)] @@ -267,13 +317,6 @@ where } } -fn generate_correlation_id() -> u64 { - thread_local! { - static CORRELATION_ID_RNG: RefCell = RefCell::new(SmallRng::from_entropy()); - } - CORRELATION_ID_RNG.with(|rng| rng.borrow_mut().next_u64()) -} - #[cfg(test)] mod tests { use std::time::Duration; diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs index ec9228b15210..060ee68d3bb7 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs @@ -9,7 +9,9 @@ use zksync_web3_decl::{ pub(crate) use self::{ metadata::{MethodMetadata, MethodTracer}, - middleware::{LimitMiddleware, MetadataMiddleware, ShutdownMiddleware, TrafficTracker}, + middleware::{ + CorrelationMiddleware, LimitMiddleware, MetadataLayer, ShutdownMiddleware, TrafficTracker, + }, }; use crate::api_server::tx_sender::SubmitTxError; diff --git a/core/lib/zksync_core/src/api_server/web3/metrics.rs b/core/lib/zksync_core/src/api_server/web3/metrics.rs index 6711f869e1a4..2b26d8018e0f 100644 --- a/core/lib/zksync_core/src/api_server/web3/metrics.rs +++ b/core/lib/zksync_core/src/api_server/web3/metrics.rs @@ -61,14 +61,21 @@ 
macro_rules! report_filter { #[derive(Debug)] pub(super) enum ObservedRpcParams<'a> { None, + Unknown, Borrowed(&'a serde_json::value::RawValue), Owned { start: Box, total_len: usize }, } impl fmt::Display for ObservedRpcParams<'_> { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + if matches!(self, Self::Unknown) { + return Ok(()); + } + formatter.write_str(" with params ")?; + let (start, total_len) = match self { Self::None => return formatter.write_str("[]"), + Self::Unknown => unreachable!(), Self::Borrowed(params) => (Self::maybe_shorten(params), params.get().len()), Self::Owned { start, total_len } => (start.as_ref(), *total_len), }; @@ -345,10 +352,7 @@ impl ApiMetrics { self.web3_call_block_diff[&meta.name].observe(block_diff.into()); } if latency >= MIN_REPORTED_LATENCY && FILTER.should_report() { - tracing::info!( - "Long call to `{}` with params {raw_params}: {latency:?}", - meta.name - ); + tracing::info!("Long call to `{}`{raw_params}: {latency:?}", meta.name); } } @@ -364,7 +368,7 @@ impl ApiMetrics { self.web3_dropped_call_latency[&MethodLabels::from(meta)].observe(latency); if FILTER.should_report() { tracing::info!( - "Call to `{}` with params {raw_params} was dropped by client after {latency:?}", + "Call to `{}`{raw_params} was dropped by client after {latency:?}", meta.name ); } @@ -383,7 +387,7 @@ impl ApiMetrics { self.web3_call_response_size[&method].observe(size); if size >= MIN_REPORTED_SIZE && FILTER.should_report() { tracing::info!( - "Call to `{method}` with params {raw_params} has resulted in large response: {size}B" + "Call to `{method}`{raw_params} has resulted in large response: {size}B" ); } } @@ -413,7 +417,7 @@ impl ApiMetrics { origin, } = &labels; tracing::info!( - "Observed error code {error_code} (origin: {origin:?}) for method `{method}`, with params {raw_params}" + "Observed error code {error_code} (origin: {origin:?}) for method `{method}`{raw_params}" ); } } @@ -548,16 +552,16 @@ mod tests { fn 
observing_rpc_params() { let rpc_params = ObservedRpcParams::new(None); assert_matches!(rpc_params, ObservedRpcParams::None); - assert_eq!(rpc_params.to_string(), "[]"); + assert_eq!(rpc_params.to_string(), " with params []"); let raw_params = RawValue::from_string(r#"["0x1"]"#.into()).unwrap(); let rpc_params = ObservedRpcParams::new(Some(&Cow::Borrowed(&raw_params))); assert_matches!(rpc_params, ObservedRpcParams::Borrowed(_)); - assert_eq!(rpc_params.to_string(), r#"["0x1"]"#); + assert_eq!(rpc_params.to_string(), r#" with params ["0x1"]"#); let rpc_params = ObservedRpcParams::new(Some(&Cow::Owned(raw_params))); assert_matches!(rpc_params, ObservedRpcParams::Owned { .. }); - assert_eq!(rpc_params.to_string(), r#"["0x1"]"#); + assert_eq!(rpc_params.to_string(), r#" with params ["0x1"]"#); let raw_params = [zksync_types::Bytes(vec![0xff; 512])]; let raw_params = serde_json::value::to_raw_value(&raw_params).unwrap(); @@ -566,7 +570,7 @@ mod tests { assert_matches!(rpc_params, ObservedRpcParams::Borrowed(_)); let rpc_params_str = rpc_params.to_string(); assert!( - rpc_params_str.starts_with(r#"["0xffff"#), + rpc_params_str.starts_with(r#" with params ["0xffff"#), "{rpc_params_str}" ); assert!( @@ -578,7 +582,7 @@ mod tests { assert_matches!(rpc_params, ObservedRpcParams::Owned { .. 
}); let rpc_params_str = rpc_params.to_string(); assert!( - rpc_params_str.starts_with(r#"["0xffff"#), + rpc_params_str.starts_with(r#" with params ["0xffff"#), "{rpc_params_str}" ); assert!( diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index b02e23658d45..39de996d6e92 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -14,7 +14,9 @@ use zksync_health_check::{HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_types::L2BlockNumber; use zksync_web3_decl::{ jsonrpsee::{ - server::{BatchRequestConfig, RpcServiceBuilder, ServerBuilder}, + server::{ + middleware::rpc::either::Either, BatchRequestConfig, RpcServiceBuilder, ServerBuilder, + }, RpcModule, }, namespaces::{ @@ -26,7 +28,8 @@ use zksync_web3_decl::{ use self::{ backend_jsonrpsee::{ - LimitMiddleware, MetadataMiddleware, MethodTracer, ShutdownMiddleware, TrafficTracker, + CorrelationMiddleware, LimitMiddleware, MetadataLayer, MethodTracer, ShutdownMiddleware, + TrafficTracker, }, mempool_cache::MempoolCache, metrics::API_METRICS, @@ -129,6 +132,7 @@ struct OptionalApiParams { websocket_requests_per_minute_limit: Option, tree_api: Option>, mempool_cache: Option, + extended_tracing: bool, pub_sub_events_sender: Option>, } @@ -265,6 +269,11 @@ impl ApiBuilder { self } + pub fn with_extended_tracing(mut self, extended_tracing: bool) -> Self { + self.optional.extended_tracing = extended_tracing; + self + } + #[cfg(test)] fn with_pub_sub_events(mut self, sender: mpsc::UnboundedSender) -> Self { self.optional.pub_sub_events_sender = Some(sender); @@ -549,6 +558,7 @@ impl ApiServer { let vm_barrier = self.optional.vm_barrier.clone(); let health_updater = self.health_updater.clone(); let method_tracer = self.method_tracer.clone(); + let extended_tracing = self.optional.extended_tracing; let rpc = self .build_rpc_module(pub_sub, last_sealed_miniblock) @@ -587,15 +597,23 @@ impl 
ApiServer { .flatten() .unwrap_or(5_000); + let metadata_layer = MetadataLayer::new(registered_method_names, method_tracer); + let metadata_layer = if extended_tracing { + Either::Left(metadata_layer.with_param_tracing()) + } else { + Either::Right(metadata_layer) + }; let traffic_tracker = TrafficTracker::default(); let traffic_tracker_for_middleware = traffic_tracker.clone(); + let rpc_middleware = RpcServiceBuilder::new() .layer_fn(move |svc| { ShutdownMiddleware::new(svc, traffic_tracker_for_middleware.clone()) }) - .layer_fn(move |svc| { - MetadataMiddleware::new(svc, registered_method_names.clone(), method_tracer.clone()) - }) + .option_layer( + extended_tracing.then(|| tower::layer::layer_fn(CorrelationMiddleware::new)), + ) + .layer(metadata_layer) .option_layer((!is_http).then(|| { tower::layer::layer_fn(move |svc| { LimitMiddleware::new(svc, websocket_requests_per_minute_limit) From c1b03c657375652eed75c2dfe961296bd81d609a Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 19 Apr 2024 13:29:34 +0300 Subject: [PATCH 09/10] Switch on extended tracing on ENs --- core/bin/external_node/src/config/mod.rs | 8 ++++++++ core/bin/external_node/src/main.rs | 2 ++ core/lib/zksync_core/src/api_server/web3/mod.rs | 4 ++++ 3 files changed, 14 insertions(+) diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index 006047b511b3..b4157895063c 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -226,6 +226,10 @@ pub(crate) struct OptionalENConfig { /// Maximum number of transactions to be stored in the mempool cache. Default is 10000. #[serde(default = "OptionalENConfig::default_mempool_cache_size")] pub mempool_cache_size: usize, + /// Enables extended tracing of RPC calls. This may negatively impact performance for nodes under high load + /// (hundreds or thousands RPS). 
+ #[serde(default = "OptionalENConfig::default_extended_api_tracing")] + pub extended_rpc_tracing: bool, // Health checks /// Time limit in milliseconds to mark a health check as slow and log the corresponding warning. @@ -459,6 +463,10 @@ impl OptionalENConfig { 10_000 } + const fn default_extended_api_tracing() -> bool { + true + } + fn default_main_node_rate_limit_rps() -> NonZeroUsize { NonZeroUsize::new(100).unwrap() } diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 90151489d14c..1c3bafe4834f 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -521,6 +521,7 @@ async fn run_api( .with_vm_barrier(vm_barrier.clone()) .with_sync_state(sync_state.clone()) .with_mempool_cache(mempool_cache.clone()) + .with_extended_tracing(config.optional.extended_rpc_tracing) .enable_api_namespaces(config.optional.api_namespaces()); if let Some(tree_reader) = &tree_reader { builder = builder.with_tree_api(tree_reader.clone()); @@ -549,6 +550,7 @@ async fn run_api( .with_vm_barrier(vm_barrier) .with_sync_state(sync_state) .with_mempool_cache(mempool_cache) + .with_extended_tracing(config.optional.extended_rpc_tracing) .enable_api_namespaces(config.optional.api_namespaces()); if let Some(tree_reader) = tree_reader { builder = builder.with_tree_api(tree_reader); diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index 39de996d6e92..48bb60d88ac1 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -558,7 +558,11 @@ impl ApiServer { let vm_barrier = self.optional.vm_barrier.clone(); let health_updater = self.health_updater.clone(); let method_tracer = self.method_tracer.clone(); + let extended_tracing = self.optional.extended_tracing; + if extended_tracing { + tracing::info!("Enabled extended call tracing for {transport_str} API server; this might negatively affect performance"); + 
} let rpc = self .build_rpc_module(pub_sub, last_sealed_miniblock) From 28ffd10ec7ef8b9e91e3ad8844f902606528bcd8 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 22 Apr 2024 19:40:05 +0300 Subject: [PATCH 10/10] Document new middleware --- .../api_server/web3/backend_jsonrpsee/middleware.rs | 12 ++++++++++++ core/lib/zksync_core/src/api_server/web3/mod.rs | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs index 3fa0efa84624..6597c8eb9c25 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs @@ -112,6 +112,11 @@ where /// as metrics. /// /// As an example, a method handler can set the requested block ID, which would then be used in relevant metric labels. +/// +/// # Implementation notes +/// +/// We express `TRACE_PARAMS` as a const param rather than a field so that the Rust compiler has more room for optimizations in case tracing +/// is switched off. #[derive(Debug)] pub(crate) struct MetadataMiddleware { inner: S, @@ -176,6 +181,13 @@ impl> Future for WithMethodCall<'_, F> { } } +/// [`tower`] middleware layer that wraps services into [`MetadataMiddleware`]. Implemented as a named type +/// to simplify call sites. +/// +/// # Implementation notes +/// +/// We express `TRACE_PARAMS` as a const param rather than a field so that the Rust compiler has more room for optimizations in case tracing +/// is switched off. 
#[derive(Debug, Clone)] pub(crate) struct MetadataLayer { registered_method_names: Arc>, diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index f3094ec36d51..7a3a46f6d2f4 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -669,14 +669,18 @@ impl ApiServer { let traffic_tracker = TrafficTracker::default(); let traffic_tracker_for_middleware = traffic_tracker.clone(); + // **Important.** The ordering of layers matters! Layers added first will receive the request earlier + // (i.e., are outermost in the call chain). let rpc_middleware = RpcServiceBuilder::new() .layer_fn(move |svc| { ShutdownMiddleware::new(svc, traffic_tracker_for_middleware.clone()) }) + // We want to output method logs with a correlation ID; hence, `CorrelationMiddleware` must precede `metadata_layer`. .option_layer( extended_tracing.then(|| tower::layer::layer_fn(CorrelationMiddleware::new)), ) .layer(metadata_layer) + // We want to capture limit middleware errors with `metadata_layer`; hence, `LimitMiddleware` is placed after it. .option_layer((!is_http).then(|| { tower::layer::layer_fn(move |svc| { LimitMiddleware::new(svc, websocket_requests_per_minute_limit)