Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new JSON-RPC API #1687

Merged
merged 32 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
64736c6
Implement new JSON-RPC API
tomaka Nov 26, 2021
768ba6c
Merge branch 'main' into new-json-rpc-api-impl
tomaka Nov 29, 2021
41b0293
Update to changes
tomaka Nov 29, 2021
732e1d9
Merge branch 'main' into new-json-rpc-api-impl
tomaka Nov 30, 2021
2d8f497
WIP
tomaka Nov 30, 2021
ac37235
WIP
tomaka Nov 30, 2021
51e3b8e
WIP
tomaka Nov 30, 2021
d285eac
WIP
tomaka Dec 1, 2021
ef26bb2
Merge branch 'main' into new-json-rpc-api-impl
tomaka Dec 1, 2021
70050b0
Merge branch 'main' into new-json-rpc-api-impl
tomaka Dec 15, 2021
9b60fd2
Fix compilation
tomaka Dec 15, 2021
e3f3564
Update link
tomaka Dec 15, 2021
c1fdbfc
Add the chainSpec methods
tomaka Dec 15, 2021
ae88a0c
Whoops, wrong variable
tomaka Dec 15, 2021
78a7c35
Add tests for chainSpec functions
tomaka Dec 15, 2021
b3253f4
Add toLowerCase() just in case, huhu
tomaka Dec 15, 2021
ccb2ea9
Add test for chainHead
tomaka Dec 15, 2021
32b3300
Duplicate westend.json for the tests
tomaka Dec 15, 2021
66a7992
Fix the camelCase issue
tomaka Dec 15, 2021
3347e07
More camelCase fixing
tomaka Dec 15, 2021
0a99ae8
Better sudo_unstable_version
tomaka Dec 16, 2021
4540c3c
Add sudo_unstable_p2pDiscover and tests
tomaka Dec 16, 2021
b4ceaf2
Formatting of methods
tomaka Dec 16, 2021
91580fe
Merge branch 'main' into new-json-rpc-api-impl
tomaka Dec 16, 2021
842dba0
WIP
tomaka Dec 16, 2021
02486ee
Merge branch 'main' into new-json-rpc-api-impl
tomaka Dec 17, 2021
063e876
Way better follow implementation
tomaka Dec 17, 2021
4c0f11d
Remove bootnodes from test Westend
tomaka Dec 17, 2021
7a36b4a
Merge branch 'main' into new-json-rpc-api-impl
tomaka Dec 17, 2021
2536930
Finish follow implementation
tomaka Dec 17, 2021
2a2646b
Add test for chainHead_unstable_follow
tomaka Dec 17, 2021
9afd917
Rustfmt
tomaka Dec 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions bin/wasm-node/rust/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ enum SubscriptionTy {
Storage,
Transaction,
RuntimeSpec,
Follow,
}

struct Blocks {
Expand Down Expand Up @@ -1276,6 +1277,24 @@ impl Background {
)
.await;
}

methods::MethodCall::chainHead_follow_unstable { runtimeUpdates } => {
self.chain_head_follow(request_id, runtimeUpdates).await;
}
methods::MethodCall::chainHead_genesisHash_unstable {} => {
let _ = self
.responses_sender
.lock()
.await
.send(
methods::Response::chainHead_genesisHash_unstable(methods::HashHexString(
self.genesis_block,
))
.to_json_response(request_id),
)
.await;
}

_method => {
log::error!(target: &self.log_target, "JSON-RPC call not supported yet: {:?}", _method);
let _ = self
Expand Down Expand Up @@ -1874,6 +1893,102 @@ impl Background {
}
}

/// Handles a call to [`methods::MethodCall::chainHead_follow_unstable`].
async fn chain_head_follow(&self, request_id: &str, runtime_updates: bool) {
assert!(!runtime_updates); // TODO: not supported yet

let (subscription, mut unsubscribe_rx) =
match self.alloc_subscription(SubscriptionTy::Follow).await {
Ok(v) => v,
Err(()) => {
let _ = self
.responses_sender
.lock()
.await
.send(json_rpc::parse::build_error_response(
request_id,
json_rpc::parse::ErrorResponse::ServerError(
-32000,
"Too many active subscriptions",
),
None,
))
.await;
return;
}
};

let mut subscribe_all = self.sync_service.subscribe_all(32).await;

let confirmation = methods::Response::chainHead_follow_unstable(methods::FollowResult {
subscription_id: subscription.clone(),
finalized_block_hash: methods::HashHexString(header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header[..],
)),
finalized_block_runtime: None,
})
.to_json_response(request_id);

let mut responses_sender = self.responses_sender.lock().await.clone();

// Spawn a separate task for the subscription.
self.new_child_tasks_tx
.lock()
.await
.unbounded_send(Box::pin(async move {
// Send back to the user the confirmation of the registration.
let _ = responses_sender.send(confirmation).await;

loop {
// Wait for either a new block, or for the subscription to be canceled.
let next_block = subscribe_all.new_blocks.next();
futures::pin_mut!(next_block);
match future::select(next_block, &mut unsubscribe_rx).await {
future::Either::Left((None, _)) => {
let _ = responses_sender
.send(json_rpc::parse::build_subscription_event(
"chainHead_followEvent_unstable",
&subscription,
&"{\"event\": \"stop\"}",
))
.await;
}
future::Either::Left((
Some(sync_service::Notification::Finalized {
hash,
best_block_hash,
}),
_,
)) => {}
future::Either::Left((
Some(sync_service::Notification::Block(block)),
_,
)) => {
let hash =
header::hash_from_scale_encoded_header(&block.scale_encoded_header);
let _ = responses_sender
.send(json_rpc::parse::build_subscription_event(
"chainHead_followEvent_unstable",
&subscription,
&"", // TODO:
))
.await;

// TODO: handle is_new_best
}
future::Either::Right((Ok(unsub_request_id), _)) => {
let response = methods::Response::chainHead_unfollow_unstable(())
.to_json_response(&unsub_request_id);
let _ = responses_sender.send(response).await;
break;
}
future::Either::Right((Err(_), _)) => break,
}
}
}))
.unwrap();
}

/// Allocates a new subscription ID. Also checks the maximum number of subscriptions.
async fn alloc_subscription(
&self,
Expand Down
111 changes: 109 additions & 2 deletions src/json_rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use alloc::{
string::{String, ToString as _},
vec::Vec,
};
use hashbrown::HashMap;

/// Parses a JSON call (usually received from a JSON-RPC server).
///
Expand Down Expand Up @@ -150,7 +151,7 @@ macro_rules! define_methods {
$([$($alias:ident),*])*
,
)*) => {
#[allow(non_camel_case_types)]
#[allow(non_camel_case_types, non_snake_case)]
#[derive(Debug, Clone)]
pub enum $rq_name<'a> {
$(
Expand Down Expand Up @@ -341,9 +342,22 @@ define_methods! {
system_removeReservedPeer() -> (), // TODO:
/// Returns, as an opaque string, the version of the client serving these JSON-RPC requests.
system_version() -> &'a str,

// The functions below are experimental and are defined in the document https://hackmd.io/@JF-CdHdTQdSl-2XgOR4SnA/rJ8SaI5Pt
chainHead_body_unstable(followSubscriptionId: &'a str, hash: HashHexString, networkConfig: Option<NetworkConfig>) -> &'a str,
chainHead_bodyEnd_unstable(subscriptionId: &'a str) -> (),
chainHead_call_unstable(followSubscriptionId: &'a str, hash: HashHexString, function: &'a str, callParameters: Vec<HexString>, networkConfig: Option<NetworkConfig>) -> &'a str,
chainHead_callEnd_unstable(subscriptionId: &'a str) -> (),
chainHead_follow_unstable(runtimeUpdates: bool) -> FollowResult,
chainHead_genesisHash_unstable() -> HashHexString,
chainHead_header_unstable(followSubscriptionId: &'a str, hash: HashHexString) -> Option<&'a str>,
chainHead_storage_unstable(followSubscriptionId: &'a str, hash: HashHexString, key: HexString, childKey: Option<HexString>, r#type: StorageQueryType, networkConfig: Option<NetworkConfig>) -> &'a str,
chainHead_storageEnd_unstable(subscriptionId: &'a str) -> (),
chainHead_unfollow_unstable(followSubscriptionId: &'a str) -> (),
chainHead_unpin_unstable(followSubscriptionId: &'a str, hash: HashHexString) -> (),
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HexString(pub Vec<u8>);

// TODO: not great for type in public API
Expand Down Expand Up @@ -433,6 +447,19 @@ pub struct Block {
pub justification: Option<HexString>,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct FollowResult {
#[serde(rename = "subscriptionId")]
pub subscription_id: String,
#[serde(rename = "finalizedBlockHash")]
pub finalized_block_hash: HashHexString,
#[serde(
rename = "finalizedBlockRuntime",
skip_serializing_if = "Option::is_none"
)]
pub finalized_block_runtime: Option<MaybeRuntimeSpec>,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct Header {
#[serde(rename = "parentHash")]
Expand Down Expand Up @@ -478,12 +505,41 @@ pub struct HeaderDigest {
pub logs: Vec<HexString>,
}

#[derive(Debug, Clone, serde::Deserialize)]
pub struct NetworkConfig {
#[serde(rename = "totalAttempts")]
pub total_attempts: u32,
#[serde(rename = "maxParallel")]
pub max_parallel: u32,
#[serde(rename = "timeoutMs")]
pub timeout_ms: u32,
}

#[derive(Debug, Clone)]
pub struct RpcMethods {
pub version: u64,
pub methods: Vec<String>,
}

// TODO: more strongly typed
#[derive(Debug, Clone, serde::Serialize)]
pub struct MaybeRuntimeSpec {
pub r#type: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub spec: Option<RuntimeSpec>,
}

#[derive(Debug, Clone)]
pub struct RuntimeSpec {
pub spec_name: String,
pub impl_name: String,
pub authoring_version: u64,
pub spec_version: u64,
pub impl_version: u64,
pub transaction_version: Option<u64>,
pub apis: Vec<([u8; 8], u32)>,
}

#[derive(Debug, Clone)]
pub struct RuntimeVersion {
pub spec_name: String,
Expand Down Expand Up @@ -515,6 +571,16 @@ pub struct StorageChangeSet {
pub changes: Vec<(HexString, Option<HexString>)>,
}

#[derive(Debug, Clone, serde::Deserialize)]
pub enum StorageQueryType {
#[serde(rename = "value")]
Value,
#[serde(rename = "hash")]
Hash,
#[serde(rename = "size")]
Size,
}

#[derive(Debug, Clone)]
pub struct SystemHealth {
pub is_syncing: bool,
Expand Down Expand Up @@ -622,6 +688,47 @@ impl serde::Serialize for Block {
}
}

impl serde::Serialize for RuntimeSpec {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[derive(serde::Serialize)]
struct SerdeRuntimeVersion<'a> {
#[serde(rename = "specName")]
spec_name: &'a str,
#[serde(rename = "implName")]
impl_name: &'a str,
#[serde(rename = "authoringVersion")]
authoring_version: u64,
#[serde(rename = "specVersion")]
spec_version: u64,
#[serde(rename = "implVersion")]
impl_version: u64,
#[serde(rename = "transactionVersion", skip_serializing_if = "Option::is_none")]
transaction_version: Option<u64>,
// TODO: optimize?
apis: HashMap<HexString, u32>,
}

SerdeRuntimeVersion {
spec_name: &self.spec_name,
impl_name: &self.impl_name,
authoring_version: self.authoring_version,
spec_version: self.spec_version,
impl_version: self.impl_version,
transaction_version: self.transaction_version,
// TODO: optimize?
apis: self
.apis
.iter()
.map(|(name_hash, version)| (HexString(name_hash.to_vec()), *version))
.collect(),
}
.serialize(serializer)
}
}

impl serde::Serialize for RuntimeVersion {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down