Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: re-use server builder per rpc interface #6652

Merged
merged 6 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions prdoc/pr_6652.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: "rpc server: re-use server builder per rpc interface"

doc:
- audience: Node Dev
description: |
This changes that the RPC server builder is re-used for each RPC interface which is more efficient than to build it for every connection.

crates:
- name: sc-rpc-server
bump: patch
101 changes: 50 additions & 51 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,56 @@ where
local_addrs.push(local_addr);
let cfg = cfg.clone();

let id_provider2 = id_provider.clone();
let RpcSettings {
batch_config,
max_connections,
max_payload_in_mb,
max_payload_out_mb,
max_buffer_capacity_per_connection,
max_subscriptions_per_connection,
rpc_methods,
rate_limit_trust_proxy_headers,
rate_limit_whitelisted_ips,
host_filter,
cors,
rate_limit,
} = listener.rpc_settings();

let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
// Proxy `GET /health, /health/readiness` requests to the internal
// `system_health` method.
.layer(NodeHealthProxyLayer::default())
.layer(cors);

let mut builder = jsonrpsee::server::Server::builder()
.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subscriptions_per_connection)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(max_buffer_capacity_per_connection)
.set_batch_request_config(batch_config)
.custom_tokio_runtime(cfg.tokio_handle.clone());

if let Some(provider) = id_provider.clone() {
builder = builder.set_id_provider(provider);
} else {
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};

let service_builder = builder.to_service_builder();
let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);

tokio_handle.spawn(async move {
loop {
let (sock, remote_addr, rpc_cfg) = tokio::select! {
let (sock, remote_addr) = tokio::select! {
res = listener.accept() => {
match res {
Ok(s) => s,
Expand All @@ -161,56 +206,10 @@ where
_ = cfg.stop_handle.clone().shutdown() => break,
};

let RpcSettings {
batch_config,
max_connections,
max_payload_in_mb,
max_payload_out_mb,
max_buffer_capacity_per_connection,
max_subscriptions_per_connection,
rpc_methods,
rate_limit_trust_proxy_headers,
rate_limit_whitelisted_ips,
host_filter,
cors,
rate_limit,
} = rpc_cfg;

let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
// Proxy `GET /health, /health/readiness` requests to the internal
// `system_health` method.
.layer(NodeHealthProxyLayer::default())
.layer(cors);

let mut builder = jsonrpsee::server::Server::builder()
.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subscriptions_per_connection)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(max_buffer_capacity_per_connection)
.set_batch_request_config(batch_config)
.custom_tokio_runtime(cfg.tokio_handle.clone());

if let Some(provider) = id_provider2.clone() {
builder = builder.set_id_provider(provider);
} else {
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};

let service_builder = builder.to_service_builder();
let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);

let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let service_builder2 = service_builder.clone();
let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();

let svc =
tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
Expand All @@ -223,14 +222,14 @@ where
let proxy_ip =
if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };

let rate_limit_cfg = if rate_limit_whitelisted_ips
let rate_limit_cfg = if rate_limit_whitelisted_ips2
.iter()
.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
{
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
None
} else {
if !rate_limit_whitelisted_ips.is_empty() {
if !rate_limit_whitelisted_ips2.is_empty() {
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
}
rate_limit
Expand Down
10 changes: 6 additions & 4 deletions substrate/client/rpc-servers/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,19 @@ pub(crate) struct Listener {

impl Listener {
/// Accepts a new connection.
pub(crate) async fn accept(
&mut self,
) -> std::io::Result<(tokio::net::TcpStream, SocketAddr, RpcSettings)> {
pub(crate) async fn accept(&mut self) -> std::io::Result<(tokio::net::TcpStream, SocketAddr)> {
let (sock, remote_addr) = self.listener.accept().await?;
Ok((sock, remote_addr, self.cfg.clone()))
Ok((sock, remote_addr))
}

/// Returns the local address the listener is bound to.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

pub fn rpc_settings(&self) -> RpcSettings {
self.cfg.clone()
}
}

pub(crate) fn host_filtering(enabled: bool, addr: SocketAddr) -> Option<HostFilterLayer> {
Expand Down
Loading