Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Track params for RPC methods #1673

Merged
merged 13 commits into from
Apr 24, 2024
8 changes: 8 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ pub(crate) struct OptionalENConfig {
/// Maximum number of transactions to be stored in the mempool cache. Default is 10000.
#[serde(default = "OptionalENConfig::default_mempool_cache_size")]
pub mempool_cache_size: usize,
/// Enables extended tracing of RPC calls. This may negatively impact performance for nodes under high load
/// (hundreds or thousands RPS).
#[serde(default = "OptionalENConfig::default_extended_api_tracing")]
pub extended_rpc_tracing: bool,

// Health checks
/// Time limit in milliseconds to mark a health check as slow and log the corresponding warning.
Expand Down Expand Up @@ -456,6 +460,10 @@ impl OptionalENConfig {
10_000
}

const fn default_extended_api_tracing() -> bool {
true
}

fn default_main_node_rate_limit_rps() -> NonZeroUsize {
NonZeroUsize::new(100).unwrap()
}
Expand Down
2 changes: 2 additions & 0 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ async fn run_api(
.with_vm_barrier(vm_barrier.clone())
.with_sync_state(sync_state.clone())
.with_mempool_cache(mempool_cache.clone())
.with_extended_tracing(config.optional.extended_rpc_tracing)
.enable_api_namespaces(config.optional.api_namespaces());
if let Some(tree_reader) = &tree_reader {
builder = builder.with_tree_api(tree_reader.clone());
Expand Down Expand Up @@ -546,6 +547,7 @@ async fn run_api(
.with_vm_barrier(vm_barrier)
.with_sync_state(sync_state)
.with_mempool_cache(mempool_cache)
.with_extended_tracing(config.optional.extended_rpc_tracing)
.enable_api_namespaces(config.optional.api_namespaces());
if let Some(tree_reader) = tree_reader {
builder = builder.with_tree_api(tree_reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use zksync_web3_decl::{

#[cfg(test)]
use super::testonly::RecordedMethodCalls;
use crate::api_server::web3::metrics::API_METRICS;
use crate::api_server::web3::metrics::{ObservedRpcParams, API_METRICS};

/// Metadata assigned to a JSON-RPC method call.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -88,9 +88,14 @@ impl MethodTracer {
}
}

pub(super) fn new_call(self: &Arc<Self>, name: &'static str) -> MethodCall {
pub(super) fn new_call<'a>(
self: &Arc<Self>,
name: &'static str,
raw_params: ObservedRpcParams<'a>,
) -> MethodCall<'a> {
MethodCall {
tracer: self.clone(),
params: raw_params,
meta: MethodMetadata::new(name),
is_completed: false,
}
Expand Down Expand Up @@ -118,21 +123,22 @@ impl MethodTracer {
}

#[derive(Debug)]
pub(super) struct MethodCall {
pub(super) struct MethodCall<'a> {
tracer: Arc<MethodTracer>,
meta: MethodMetadata,
params: ObservedRpcParams<'a>,
is_completed: bool,
}

impl Drop for MethodCall {
impl Drop for MethodCall<'_> {
fn drop(&mut self) {
if !self.is_completed {
API_METRICS.observe_dropped_call(&self.meta);
API_METRICS.observe_dropped_call(&self.meta, &self.params);
}
}
}

impl MethodCall {
impl MethodCall<'_> {
pub(super) fn set_as_current(&mut self) -> CurrentMethodGuard<'_> {
let meta = &mut self.meta;
let cell = self.tracer.inner.get_or_default();
Expand All @@ -147,15 +153,21 @@ impl MethodCall {
pub(super) fn observe_response(&mut self, response: &MethodResponse) {
self.is_completed = true;
let meta = &self.meta;
let params = &self.params;
match response.success_or_error {
MethodResponseResult::Success => {
API_METRICS.observe_response_size(meta.name, response.result.len());
API_METRICS.observe_response_size(meta.name, params, response.result.len());
}
MethodResponseResult::Failed(error_code) => {
API_METRICS.observe_protocol_error(meta.name, error_code, meta.has_app_error);
API_METRICS.observe_protocol_error(
meta.name,
params,
error_code,
meta.has_app_error,
);
}
}
API_METRICS.observe_latency(meta);
API_METRICS.observe_latency(meta, params);
#[cfg(test)]
self.tracer.recorder.observe_response(meta, response);
}
Expand Down
135 changes: 107 additions & 28 deletions core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
cell::RefCell,
collections::HashSet,
future::Future,
num::NonZeroU32,
Expand All @@ -16,7 +17,9 @@ use governor::{
};
use once_cell::sync::OnceCell;
use pin_project_lite::pin_project;
use rand::{rngs::SmallRng, RngCore, SeedableRng};
use tokio::sync::watch;
use tracing::instrument::{Instrument, Instrumented};
use vise::{
Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, GaugeGuard, Histogram, Metrics,
};
Expand All @@ -27,7 +30,7 @@ use zksync_web3_decl::jsonrpsee::{
};

use super::metadata::{MethodCall, MethodTracer};
use crate::api_server::web3::metrics::API_METRICS;
use crate::api_server::web3::metrics::{ObservedRpcParams, API_METRICS};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "transport", rename_all = "snake_case")]
Expand Down Expand Up @@ -109,32 +112,23 @@ where
/// as metrics.
///
/// As an example, a method handler can set the requested block ID, which would then be used in relevant metric labels.
///
/// # Implementation notes
///
/// We express `TRACE_PARAMS` as a const param rather than a field so that the Rust compiler has more room for optimizations in case tracing
/// is switched off.
#[derive(Debug)]
pub(crate) struct MetadataMiddleware<S> {
pub(crate) struct MetadataMiddleware<S, const TRACE_PARAMS: bool> {
inner: S,
registered_method_names: Arc<HashSet<&'static str>>,
method_tracer: Arc<MethodTracer>,
}

impl<S> MetadataMiddleware<S> {
pub fn new(
inner: S,
registered_method_names: Arc<HashSet<&'static str>>,
method_tracer: Arc<MethodTracer>,
) -> Self {
Self {
inner,
registered_method_names,
method_tracer,
}
}
}

impl<'a, S> RpcServiceT<'a> for MetadataMiddleware<S>
impl<'a, S, const TRACE_PARAMS: bool> RpcServiceT<'a> for MetadataMiddleware<S, TRACE_PARAMS>
where
S: Send + Sync + RpcServiceT<'a>,
{
type Future = WithMethodCall<S::Future>;
type Future = WithMethodCall<'a, S::Future>;

fn call(&self, request: Request<'a>) -> Self::Future {
// "Normalize" the method name by searching it in the set of all registered methods. This extends the lifetime
Expand All @@ -145,23 +139,32 @@ where
.copied()
.unwrap_or("");

WithMethodCall {
call: self.method_tracer.new_call(method_name),
inner: self.inner.call(request),
}
let observed_params = if TRACE_PARAMS {
ObservedRpcParams::new(request.params.as_ref())
} else {
ObservedRpcParams::Unknown
};
let call = self.method_tracer.new_call(method_name, observed_params);
WithMethodCall::new(self.inner.call(request), call)
}
}

pin_project! {
#[derive(Debug)]
pub(crate) struct WithMethodCall<F> {
call: MethodCall,
pub(crate) struct WithMethodCall<'a, F> {
#[pin]
inner: F,
call: MethodCall<'a>,
}
}

impl<F: Future<Output = MethodResponse>> Future for WithMethodCall<F> {
impl<'a, F> WithMethodCall<'a, F> {
fn new(inner: F, call: MethodCall<'a>) -> Self {
Self { inner, call }
}
}

impl<F: Future<Output = MethodResponse>> Future for WithMethodCall<'_, F> {
type Output = MethodResponse;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -178,6 +181,82 @@ impl<F: Future<Output = MethodResponse>> Future for WithMethodCall<F> {
}
}

/// [`tower`] middleware layer that wraps services into [`MetadataMiddleware`]. Implemented as a named type
/// to simplify call sites.
///
/// # Implementation notes
///
/// We express `TRACE_PARAMS` as a const param rather than a field so that the Rust compiler has more room for optimizations in case tracing
/// is switched off.
#[derive(Debug, Clone)]
pub(crate) struct MetadataLayer<const TRACE_PARAMS: bool> {
registered_method_names: Arc<HashSet<&'static str>>,
method_tracer: Arc<MethodTracer>,
}

impl MetadataLayer<false> {
pub fn new(
registered_method_names: Arc<HashSet<&'static str>>,
method_tracer: Arc<MethodTracer>,
) -> Self {
Self {
registered_method_names,
method_tracer,
}
}

pub fn with_param_tracing(self) -> MetadataLayer<true> {
MetadataLayer {
registered_method_names: self.registered_method_names,
method_tracer: self.method_tracer,
}
}
}

impl<Svc, const TRACE_PARAMS: bool> tower::Layer<Svc> for MetadataLayer<TRACE_PARAMS> {
type Service = MetadataMiddleware<Svc, TRACE_PARAMS>;

fn layer(&self, inner: Svc) -> Self::Service {
MetadataMiddleware {
inner,
registered_method_names: self.registered_method_names.clone(),
method_tracer: self.method_tracer.clone(),
}
}
}

/// Middleware that adds tracing spans to each RPC call, so that logs belonging to the same call
/// can be easily filtered.
#[derive(Debug)]
pub(crate) struct CorrelationMiddleware<S> {
inner: S,
}

impl<S> CorrelationMiddleware<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}

impl<'a, S> RpcServiceT<'a> for CorrelationMiddleware<S>
where
S: RpcServiceT<'a>,
{
type Future = Instrumented<S::Future>;

fn call(&self, request: Request<'a>) -> Self::Future {
thread_local! {
static CORRELATION_ID_RNG: RefCell<SmallRng> = RefCell::new(SmallRng::from_entropy());
}

// Wrap a call into a span with unique correlation ID, so that events occurring in the span can be easily filtered.
// This works as a cheap alternative to Open Telemetry tracing with its trace / span IDs.
let correlation_id = CORRELATION_ID_RNG.with(|rng| rng.borrow_mut().next_u64());
let call_span = tracing::debug_span!("rpc_call", correlation_id);
self.inner.call(request).instrument(call_span)
}
}

/// Tracks the timestamp of the last call to the RPC. Used during server shutdown to start dropping new traffic
/// only after this is coordinated by the external load balancer.
#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -293,10 +372,10 @@ mod tests {
}
};

WithMethodCall {
call: method_tracer.new_call("test"),
WithMethodCall::new(
inner,
}
method_tracer.new_call("test", ObservedRpcParams::None),
)
});

if spawn_tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use zksync_web3_decl::{

pub(crate) use self::{
metadata::{MethodMetadata, MethodTracer},
middleware::{LimitMiddleware, MetadataMiddleware, ShutdownMiddleware, TrafficTracker},
middleware::{
CorrelationMiddleware, LimitMiddleware, MetadataLayer, ShutdownMiddleware, TrafficTracker,
},
};
use crate::api_server::tx_sender::SubmitTxError;

Expand Down
Loading
Loading