Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
add backwards compability
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Nov 10, 2022
1 parent 0ef7b68 commit a527da9
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 25 deletions.
4 changes: 2 additions & 2 deletions bin/node/cli/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,14 @@ pub fn find_ws_url_from_output(read: impl Read + Send) -> (String, String) {
data.push_str(&line);

// does the line contain our port (we expect this specific output from substrate).
let sock_addr = match line.split_once("Running JSON-RPC server: addr=") {
let sock_addr = match line.split_once("Running JSON-RPC WS server: addr=") {
None => return None,
Some((_, after)) => after.split_once(",").unwrap().0,
};

Some(format!("ws://{}", sock_addr))
})
.expect("We should get an address");
.expect("We should get a WebSocket address");

(ws_url, data)
}
39 changes: 24 additions & 15 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Config {
}
}

/// Start WS server listening on given address.
/// Start JSON-RPC server listening on given address.
pub async fn start_server<M: Send + Sync + 'static>(
addrs: [SocketAddr; 2],
cors: Option<&Vec<String>>,
Expand All @@ -89,20 +89,23 @@ pub async fn start_server<M: Send + Sync + 'static>(
rpc_api: RpcModule<M>,
rt: tokio::runtime::Handle,
id_provider: Option<Box<dyn IdProvider>>,
transport_label: &str,
) -> Result<ServerHandle, Box<dyn StdError + Send + Sync>> {
let (max_payload_in, max_payload_out, max_connections, max_subs_per_conn) =
config.deconstruct();

let (allow_hosts, cors) = {
let host_filter = hosts_filter(cors.is_some(), &addrs);

let cors = {
if let Some(cors) = cors {
let mut list = Vec::new();
for origin in cors {
list.push(HeaderValue::from_str(origin.as_str())?);
}
(allowed_hosts(&addrs), CorsLayer::new().allow_origin(AllowOrigin::list(list)))
CorsLayer::new().allow_origin(AllowOrigin::list(list))
} else {
// allow all cors
(AllowHosts::Any, CorsLayer::permissive())
CorsLayer::permissive()
}
};

Expand All @@ -117,7 +120,7 @@ pub async fn start_server<M: Send + Sync + 'static>(
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subs_per_conn)
.ping_interval(std::time::Duration::from_secs(30))
.set_host_filtering(allow_hosts)
.set_host_filtering(host_filter)
.set_middleware(middleware)
.custom_tokio_runtime(rt);

Expand All @@ -140,23 +143,15 @@ pub async fn start_server<M: Send + Sync + 'static>(
};

log::info!(
"Running JSON-RPC server: addr={}, cors={:?}",
"Running JSON-RPC {} server: addr={}, cors={:?}",
transport_label,
addr.map_or_else(|_| "unknown".to_string(), |a| a.to_string()),
cors
);

Ok(handle)
}

fn allowed_hosts(addrs: &[SocketAddr]) -> AllowHosts {
let mut hosts = Vec::with_capacity(addrs.len() * 2);
for addr in addrs {
hosts.push(format!("localhost:{}", addr.port()).into());
hosts.push(format!("127.0.0.1:{}", addr.port()).into());
}
AllowHosts::Only(hosts)
}

fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModule<M> {
let mut available_methods = rpc_api.method_names().collect::<Vec<_>>();
available_methods.sort();
Expand All @@ -175,3 +170,17 @@ fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModu
fn payload_size_or_default(size_mb: Option<usize>) -> usize {
size_mb.map_or(RPC_MAX_PAYLOAD_DEFAULT, |mb| mb.saturating_mul(MEGABYTE))
}

fn hosts_filter(enabled: bool, addrs: &[SocketAddr]) -> AllowHosts {
if enabled {
// NOTE The listening addresses are whitelisted by default.
let mut hosts = Vec::with_capacity(addrs.len() * 2);
for addr in addrs {
hosts.push(format!("localhost:{}", addr.port()).into());
hosts.push(format!("127.0.0.1:{}", addr.port()).into());
}
AllowHosts::Only(hosts)
} else {
AllowHosts::Any
}
}
40 changes: 32 additions & 8 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,15 @@ where
addr
};

let addr = config
let ws_addr = config
.rpc_ws
.unwrap_or_else(|| "127.0.0.1:9944".parse().expect("valid sockaddr; qed"));
let addr2 = random_port(addr);
let ws_addr2 = random_port(ws_addr);

let http_addr = config
.rpc_http
.unwrap_or_else(|| "127.0.0.1:9933".parse().expect("valid sockaddr; qed"));
let http_addr2 = random_port(http_addr);

let metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;

Expand All @@ -343,18 +348,37 @@ where
max_subs_per_conn: config.rpc_max_subs_per_conn,
};

let server_fut = sc_rpc_server::start_server(
[addr, addr2],
let ws_fut = sc_rpc_server::start_server(
[ws_addr, ws_addr2],
config.rpc_cors.as_ref(),
server_config.clone(),
metrics.clone(),
gen_rpc_module(deny_unsafe(ws_addr, &config.rpc_methods))?,
config.tokio_handle.clone(),
rpc_id_provider,
"WS",
);

let http_fut = sc_rpc_server::start_server(
[http_addr, http_addr2],
config.rpc_cors.as_ref(),
server_config,
metrics,
gen_rpc_module(deny_unsafe(addr, &config.rpc_methods))?,
gen_rpc_module(deny_unsafe(http_addr, &config.rpc_methods))?,
config.tokio_handle.clone(),
rpc_id_provider,
// NOTE: this is hack to be backwards compatible because the `RpcSubscriptionIdProvider`
// isn't cloneable and we don't expect ws connections to the old http server.
//
// The only downside to that is if one configures a custom subscription ID generator
// then this will use the default one.
None,
"HTTP",
);

match tokio::task::block_in_place(|| config.tokio_handle.block_on(server_fut)) {
Ok(server) => Ok(Box::new(waiting::Server(Some(server)))),
match tokio::task::block_in_place(|| {
config.tokio_handle.block_on(futures::future::try_join(http_fut, ws_fut))
}) {
Ok((http, ws)) => Ok(Box::new((waiting::Server(Some(http)), waiting::Server(Some(ws))))),
Err(e) => Err(Error::Application(e)),
}
}
Expand Down

0 comments on commit a527da9

Please sign in to comment.