Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: upgrade jsonrpsee v0.23 #4730

Merged
merged 32 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
12c57c9
rpc: upgrade jsonrpsee v0.23
niklasad1 May 27, 2024
0aadd3d
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 7, 2024
15691d6
cleanup
niklasad1 Jun 7, 2024
f681f0e
make it compile
niklasad1 Jun 7, 2024
bd74645
fix test build
niklasad1 Jun 7, 2024
88a7c00
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 7, 2024
369db7e
jsonrpsee v0.23.1
niklasad1 Jun 10, 2024
fa8cda3
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 10, 2024
a2418a2
remove needless deps
niklasad1 Jun 12, 2024
489a598
Merge branch 'master' into na-jsonrpsee-v0.23
niklasad1 Jun 18, 2024
f3df0da
fix bad merge
niklasad1 Jun 19, 2024
0b0cabe
add prdoc
niklasad1 Jun 19, 2024
0237951
bridges: add `serde_json` dependency
niklasad1 Jun 19, 2024
0f90c4e
fix prdoc
niklasad1 Jun 19, 2024
3115029
fix prdoc again
niklasad1 Jun 19, 2024
d969257
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 20, 2024
f61b3ce
Update substrate/client/rpc-spec-v2/src/transaction/transaction_broad…
niklasad1 Jun 21, 2024
e3f02ce
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
niklasad1 Jun 21, 2024
45667bc
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
niklasad1 Jun 21, 2024
919065c
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
niklasad1 Jun 21, 2024
30dc44a
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
niklasad1 Jun 21, 2024
b83f1f9
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
niklasad1 Jun 21, 2024
fc80004
Update substrate/client/rpc-spec-v2/src/transaction/transaction_broad…
niklasad1 Jun 21, 2024
526ff76
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
niklasad1 Jun 21, 2024
cc99e40
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
niklasad1 Jun 21, 2024
2c8349a
Merge branch 'master' into na-jsonrpsee-v0.23
niklasad1 Jun 24, 2024
3002cd1
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 25, 2024
1ec6466
fix bad merge
niklasad1 Jun 25, 2024
2d310ff
fix taplo fmt
niklasad1 Jun 26, 2024
b63698c
move more deps to workspace
niklasad1 Jun 26, 2024
48d3d70
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 26, 2024
f9081de
jsonrpsee v0.23.2
niklasad1 Jun 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 303 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ hkdf = { version = "0.12.0" }
hmac = { version = "0.12.1" }
honggfuzz = { version = "0.5.55" }
http = { version = "0.2.8" }
http-body-util = { version = "0.1.2" }
hyper = { version = "0.14.27", default-features = false }
hyper-rustls = { version = "0.24.0" }
impl-serde = { version = "0.4.0", default-features = false }
Expand All @@ -793,8 +794,8 @@ is_executable = { version = "1.0.1" }
isahc = { version = "1.2" }
itertools = { version = "0.11" }
jsonpath_lib = { version = "0.3" }
jsonrpsee = { version = "0.22.5" }
jsonrpsee-core = { version = "0.22" }
jsonrpsee = { version = "0.23.2" }
jsonrpsee-core = { version = "0.23.2" }
k256 = { version = "0.13.3", default-features = false }
kitchensink-runtime = { path = "substrate/bin/node/runtime" }
kvdb = { version = "0.13.0" }
Expand Down Expand Up @@ -1298,7 +1299,7 @@ tokio-util = { version = "0.7.8" }
toml = { version = "0.8.8" }
toml_edit = { version = "0.19" }
tower = { version = "0.4.13" }
tower-http = { version = "0.4.0" }
tower-http = { version = "0.5.2" }
tracing = { version = "0.1.37", default-features = false }
tracing-core = { version = "0.1.32", default-features = false }
tracing-futures = { version = "0.2.4" }
Expand Down
1 change: 1 addition & 0 deletions bridges/relays/client-substrate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jsonrpsee = { features = ["macros", "ws-client"], workspace = true }
log = { workspace = true }
num-traits = { workspace = true, default-features = true }
rand = { workspace = true, default-features = true }
serde_json = { workspace = true }
scale-info = { features = ["derive"], workspace = true, default-features = true }
tokio = { features = ["rt-multi-thread"], workspace = true, default-features = true }
thiserror = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions bridges/relays/client-substrate/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use async_std::{
stream::StreamExt,
};
use futures::{FutureExt, Stream};
use jsonrpsee::core::ClientError;
use sp_runtime::DeserializeOwned;
use std::{
fmt::Debug,
Expand Down Expand Up @@ -143,7 +142,7 @@ impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
/// Create new forwarded subscription.
pub fn new_forwarded(
desc: StreamDescription,
subscription: impl Stream<Item = StdResult<T, ClientError>> + Unpin + Send + 'static,
subscription: impl Stream<Item = StdResult<T, serde_json::Error>> + Unpin + Send + 'static,
) -> Self {
Self {
desc: desc.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use futures::{channel::mpsc::Sender, prelude::*, stream::FuturesUnordered};
use jsonrpsee::core::client::{
Client as JsonRpseeClient, ClientBuilder, ClientT, Error, ReceivedMessage, TransportReceiverT,
Client as JsonRpseeClient, ClientBuilder, ClientT, ReceivedMessage, TransportReceiverT,
TransportSenderT,
};
use smoldot_light::{ChainId, Client as SmoldotClient, JsonRpcResponses};
Expand Down Expand Up @@ -124,7 +124,7 @@ pub struct LightClientRpcWorker {
}

fn handle_notification(
maybe_header: Option<Result<RelayHeader, Error>>,
maybe_header: Option<Result<RelayHeader, serde_json::Error>>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//cc @skunert

This is slightly modified and only decoding can cause this error now.
Since this API was confusing before i.e, the subscription could be closed because it lagged (i.e, couldn't keep with server) or just closed.

We have added another API https://docs.rs/jsonrpsee-core/0.23.1/jsonrpsee_core/client/struct.Subscription.html#method.close_reason but not sure whether you care here....

senders: &mut Vec<Sender<RelayHeader>>,
) -> Result<(), ()> {
match maybe_header {
Expand Down
25 changes: 25 additions & 0 deletions prdoc/pr_4730.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: rpc upgrade jsonrpsee to v0.23.1

doc:
- audience: Node Dev
description: |
Upgrade the rpc library jsonrpsee to v0.23.1 to utilize:

- Add Extensions which we now is using to get the connection id (used by the rpc spec v2)
- Update hyper to v1.0, http v1.0, soketto and related crates (hyper::service::make_service_fn is removed)
- The subscription API for the client is modified to know why a subscription was closed.

crates:
- name: sc-rpc-spec-v2
bump: patch
- name: sc-rpc
bump: patch
- name: sc-rpc-server
bump: patch
- name: cumulus-relay-chain-rpc-interface
bump: patch
- name: frame-remote-externalities
bump: patch
12 changes: 8 additions & 4 deletions substrate/client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ targets = ["x86_64-unknown-linux-gnu"]
forwarded-header-value = { workspace = true }
futures = { workspace = true }
governor = { workspace = true }
http = { workspace = true }
hyper = { workspace = true, default-features = true }
http-body-util = { workspace = true }
ip_network = { workspace = true }
jsonrpsee = { features = ["server"], workspace = true }
log = { workspace = true, default-features = true }
prometheus-endpoint = { workspace = true, default-features = true }
serde = { workspace = true }
serde_json = { workspace = true, default-features = true }
tokio = { features = ["parking_lot"], workspace = true, default-features = true }
tower = { features = ["util"], workspace = true }
tower-http = { features = ["cors"], workspace = true }
tower = { workspace = true, features = ["util"] }
tower-http = { workspace = true, features = ["cors"] }

# Dependencies outside of the polkadot-sdk workspace
# which requires hyper v1 and http v1
http = "1.1"
hyper = "1.3"
82 changes: 48 additions & 34 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
pub mod middleware;
pub mod utils;

use std::{
convert::Infallible, error::Error as StdError, net::SocketAddr, num::NonZeroU32, time::Duration,
};
use std::{error::Error as StdError, net::SocketAddr, num::NonZeroU32, sync::Arc, time::Duration};

use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
};
use jsonrpsee::{
server::{stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder},
core::BoxError,
server::{
serve_with_graceful_shutdown, stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder,
},
Methods, RpcModule,
};
use middleware::NodeHealthProxyLayer;
Expand Down Expand Up @@ -97,6 +94,7 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {
metrics: Option<RpcMetrics>,
tokio_handle: tokio::runtime::Handle,
service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
rate_limit_whitelisted_ips: Arc<Vec<IpNetwork>>,
}

/// Start RPC server listening on given address.
Expand Down Expand Up @@ -124,8 +122,8 @@ where
rate_limit_trust_proxy_headers,
} = config;

let std_listener = TcpListener::bind(addrs.as_slice()).await?.into_std()?;
let local_addr = std_listener.local_addr().ok();
let listener = TcpListener::bind(addrs.as_slice()).await?;
let local_addr = listener.local_addr().ok();
let host_filter = host_filtering(cors.is_some(), local_addr);

let http_middleware = tower::ServiceBuilder::new()
Expand Down Expand Up @@ -161,20 +159,38 @@ where
methods: build_rpc_api(rpc_api).into(),
service_builder: builder.to_service_builder(),
metrics,
tokio_handle,
stop_handle: stop_handle.clone(),
tokio_handle: tokio_handle.clone(),
stop_handle,
rate_limit_whitelisted_ips: Arc::new(rate_limit_whitelisted_ips),
};

let make_service = make_service_fn(move |addr: &AddrStream| {
Copy link
Member Author

@niklasad1 niklasad1 Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hyper::service::make_service_fn is removed and we need to manage to socket ourselves.

Using to low-level API should easy to add support for ipv6 sockets but I'll fix it another PR.

let cfg = cfg.clone();
let rate_limit_whitelisted_ips = rate_limit_whitelisted_ips.clone();
let ip = addr.remote_addr().ip();

async move {
let cfg = cfg.clone();
let rate_limit_whitelisted_ips = rate_limit_whitelisted_ips.clone();
tokio_handle.spawn(async move {
loop {
let (sock, remote_addr) = tokio::select! {
res = listener.accept() => {
match res {
Ok(s) => s,
Err(e) => {
log::debug!(target: "rpc", "Failed to accept ipv4 connection: {:?}", e);
continue;
}
}
}
_ = cfg.stop_handle.clone().shutdown() => break,
};

let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let svc = tower::service_fn(move |req: http::Request<hyper::body::Incoming>| {
let PerConnection {
methods,
service_builder,
metrics,
tokio_handle,
stop_handle,
rate_limit_whitelisted_ips,
} = cfg2.clone();

Ok::<_, Infallible>(service_fn(move |req| {
let proxy_ip =
if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };

Expand All @@ -191,9 +207,6 @@ where
rate_limit
};

let PerConnection { service_builder, metrics, tokio_handle, stop_handle, methods } =
cfg.clone();

let is_websocket = ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };

Expand All @@ -213,7 +226,6 @@ where

let rpc_middleware =
RpcServiceBuilder::new().option_layer(middleware_layer.clone());

let mut svc =
service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle);

Expand All @@ -230,17 +242,19 @@ where
});
}

svc.call(req).await
// https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
// to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to
// a concrete type as workaround.
svc.call(req).await.map_err(|e| BoxError::from(e))
}
}))
}
});

let server = hyper::Server::from_tcp(std_listener)?.serve(make_service);
});

tokio::spawn(async move {
let graceful = server.with_graceful_shutdown(async move { stop_handle.shutdown().await });
let _ = graceful.await;
cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
sock,
svc,
cfg.stop_handle.clone().shutdown(),
));
}
});

log::info!(
Expand Down
42 changes: 23 additions & 19 deletions substrate/client/rpc-servers/src/middleware/node_health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ use std::{

use futures::future::FutureExt;
use http::{HeaderValue, Method, StatusCode, Uri};
use hyper::Body;
use jsonrpsee::types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess};
use jsonrpsee::{
server::{HttpBody, HttpRequest, HttpResponse},
types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess},
};
use tower::Service;

const RPC_SYSTEM_HEALTH_CALL: &str = r#"{"jsonrpc":"2.0","method":"system_health","id":0}"#;
Expand Down Expand Up @@ -57,9 +59,9 @@ impl<S> NodeHealthProxy<S> {
}
}

impl<S> tower::Service<http::Request<Body>> for NodeHealthProxy<S>
impl<S> tower::Service<http::Request<hyper::body::Incoming>> for NodeHealthProxy<S>
where
S: Service<http::Request<Body>, Response = http::Response<Body>>,
S: Service<HttpRequest, Response = HttpResponse>,
S::Response: 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
S::Future: Send + 'static,
Expand All @@ -73,7 +75,8 @@ where
self.0.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: http::Request<Body>) -> Self::Future {
fn call(&mut self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
let mut req = req.map(|body| HttpBody::new(body));
let maybe_intercept = InterceptRequest::from_http(&req);

// Modify the request and proxy it to `system_health`
Expand All @@ -88,7 +91,7 @@ where
req.headers_mut().insert(http::header::ACCEPT, HEADER_VALUE_JSON);

// Adjust the body to reflect the method call.
req = req.map(|_| Body::from(RPC_SYSTEM_HEALTH_CALL));
req = req.map(|_| HttpBody::from(RPC_SYSTEM_HEALTH_CALL));
}

// Call the inner service and get a future that resolves to the response.
Expand All @@ -99,7 +102,7 @@ where

Ok(match maybe_intercept {
InterceptRequest::Deny =>
http_response(StatusCode::METHOD_NOT_ALLOWED, Body::empty()),
http_response(StatusCode::METHOD_NOT_ALLOWED, HttpBody::empty()),
InterceptRequest::No => res,
InterceptRequest::Health => {
let health = parse_rpc_response(res.into_body()).await?;
Expand All @@ -108,7 +111,7 @@ where
InterceptRequest::Readiness => {
let health = parse_rpc_response(res.into_body()).await?;
if (!health.is_syncing && health.peers > 0) || !health.should_have_peers {
http_ok_response(Body::empty())
http_ok_response(HttpBody::empty())
} else {
http_internal_error()
}
Expand All @@ -133,27 +136,28 @@ struct Health {
pub should_have_peers: bool,
}

fn http_ok_response<S: Into<hyper::Body>>(body: S) -> hyper::Response<hyper::Body> {
fn http_ok_response<S: Into<HttpBody>>(body: S) -> HttpResponse {
http_response(StatusCode::OK, body)
}

fn http_response<S: Into<hyper::Body>>(
status_code: StatusCode,
body: S,
) -> hyper::Response<hyper::Body> {
hyper::Response::builder()
fn http_response<S: Into<HttpBody>>(status_code: StatusCode, body: S) -> HttpResponse {
HttpResponse::builder()
.status(status_code)
.header(http::header::CONTENT_TYPE, HEADER_VALUE_JSON)
.body(body.into())
.expect("Header is valid; qed")
}

fn http_internal_error() -> hyper::Response<hyper::Body> {
http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, Body::empty())
fn http_internal_error() -> HttpResponse {
http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, HttpBody::empty())
}

async fn parse_rpc_response(body: Body) -> Result<Health, Box<dyn Error + Send + Sync + 'static>> {
let bytes = hyper::body::to_bytes(body).await?;
async fn parse_rpc_response(
body: HttpBody,
) -> Result<Health, Box<dyn Error + Send + Sync + 'static>> {
use http_body_util::BodyExt;

let bytes = body.collect().await?.to_bytes();

let raw_rp = serde_json::from_slice::<RpcResponse<Health>>(&bytes)?;
let rp = RpcResponseSuccess::<Health>::try_from(raw_rp)?;
Expand All @@ -178,7 +182,7 @@ enum InterceptRequest {
}

impl InterceptRequest {
fn from_http(req: &http::Request<Body>) -> InterceptRequest {
fn from_http(req: &HttpRequest) -> InterceptRequest {
match req.uri().path() {
"/health" =>
if req.method() == http::Method::GET {
Expand Down
Loading
Loading