Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: add prometheus metrics for dropped subscriptions #4835

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
12c57c9
rpc: upgrade jsonrpsee v0.23
niklasad1 May 27, 2024
0aadd3d
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 7, 2024
15691d6
cleanup
niklasad1 Jun 7, 2024
f681f0e
make it compile
niklasad1 Jun 7, 2024
bd74645
fix test build
niklasad1 Jun 7, 2024
88a7c00
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 7, 2024
369db7e
jsonrpsee v0.23.1
niklasad1 Jun 10, 2024
fa8cda3
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23
niklasad1 Jun 10, 2024
a2418a2
remove needless deps
niklasad1 Jun 12, 2024
b198759
log conn_data when subscriptions lagged
niklasad1 Jun 18, 2024
ff100b8
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23-…
niklasad1 Jun 18, 2024
c059759
rpc: add prometheous metrics dropped subscriptions
niklasad1 Jun 17, 2024
c463349
cleanup
niklasad1 Jun 19, 2024
c7be842
fix nit2
niklasad1 Jun 19, 2024
c0bd2ed
bridges: add `serde_json` dependency
niklasad1 Jun 19, 2024
d0ae05c
more nits
niklasad1 Jun 19, 2024
06fd1ba
fix tests
niklasad1 Jun 20, 2024
1e1b506
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23-…
niklasad1 Jun 20, 2024
877fe59
fix more tests
niklasad1 Jun 20, 2024
a74655e
Update substrate/client/rpc/src/utils.rs
niklasad1 Jun 21, 2024
8e1678f
Update substrate/client/rpc/src/utils.rs
niklasad1 Jun 21, 2024
7889147
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23-…
niklasad1 Jun 26, 2024
de221dc
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-v0.23-…
niklasad1 Jun 26, 2024
81d555d
cargo fmt
niklasad1 Jun 26, 2024
7208c17
Merge branch 'master' into na-jsonrpsee-v0.23-with-ip-addr
niklasad1 Jul 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
361 changes: 284 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion bridges/relays/client-substrate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ async-std = { version = "1.9.0", features = ["attributes"] }
async-trait = "0.1.79"
codec = { package = "parity-scale-codec", version = "3.6.12" }
futures = "0.3.30"
jsonrpsee = { version = "0.22", features = ["macros", "ws-client"] }
jsonrpsee = { version = "0.23.1", features = ["macros", "ws-client"] }
log = { workspace = true }
num-traits = "0.2"
rand = "0.8.5"
serde_json = { workspace = true }
scale-info = { version = "2.11.1", features = ["derive"] }
tokio = { version = "1.37", features = ["rt-multi-thread"] }
thiserror = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions bridges/relays/client-substrate/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use async_std::{
stream::StreamExt,
};
use futures::{FutureExt, Stream};
use jsonrpsee::core::ClientError;
use sp_runtime::DeserializeOwned;
use std::{
fmt::Debug,
Expand Down Expand Up @@ -143,7 +142,7 @@ impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
/// Create new forwarded subscription.
pub fn new_forwarded(
desc: StreamDescription,
subscription: impl Stream<Item = StdResult<T, ClientError>> + Unpin + Send + 'static,
subscription: impl Stream<Item = StdResult<T, serde_json::Error>> + Unpin + Send + 'static,
) -> Self {
Self {
desc: desc.clone(),
Expand Down
2 changes: 1 addition & 1 deletion cumulus/client/relay-chain-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ sp-version = { path = "../../../substrate/primitives/version", default-features
futures = "0.3.28"
async-trait = "0.1.79"
thiserror = { workspace = true }
jsonrpsee-core = "0.22"
jsonrpsee-core = "0.23.1"
codec = { package = "parity-scale-codec", version = "3.6.12" }
2 changes: 1 addition & 1 deletion cumulus/client/relay-chain-rpc-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ tokio-util = { version = "0.7.8", features = ["compat"] }
futures = "0.3.28"
futures-timer = "3.0.2"
codec = { package = "parity-scale-codec", version = "3.6.12" }
jsonrpsee = { version = "0.22", features = ["ws-client"] }
jsonrpsee = { version = "0.23.1", features = ["ws-client"] }
tracing = "0.1.37"
async-trait = "0.1.79"
url = "2.4.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use futures::{channel::mpsc::Sender, prelude::*, stream::FuturesUnordered};
use jsonrpsee::core::client::{
Client as JsonRpseeClient, ClientBuilder, ClientT, Error, ReceivedMessage, TransportReceiverT,
Client as JsonRpseeClient, ClientBuilder, ClientT, ReceivedMessage, TransportReceiverT,
TransportSenderT,
};
use smoldot_light::{ChainId, Client as SmoldotClient, JsonRpcResponses};
Expand Down Expand Up @@ -124,7 +124,7 @@ pub struct LightClientRpcWorker {
}

fn handle_notification(
maybe_header: Option<Result<RelayHeader, Error>>,
maybe_header: Option<Result<RelayHeader, serde_json::Error>>,
senders: &mut Vec<Sender<RelayHeader>>,
) -> Result<(), ()> {
match maybe_header {
Expand Down
2 changes: 1 addition & 1 deletion cumulus/polkadot-parachain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ coretime-rococo-runtime = { path = "../parachains/runtimes/coretime/coretime-roc
coretime-westend-runtime = { path = "../parachains/runtimes/coretime/coretime-westend" }
bridge-hub-westend-runtime = { path = "../parachains/runtimes/bridge-hubs/bridge-hub-westend" }
penpal-runtime = { path = "../parachains/runtimes/testing/penpal" }
jsonrpsee = { version = "0.22", features = ["server"] }
jsonrpsee = { version = "0.23.1", features = ["server"] }
people-rococo-runtime = { path = "../parachains/runtimes/people/people-rococo" }
people-westend-runtime = { path = "../parachains/runtimes/people/people-westend" }
parachains-common = { path = "../parachains/common" }
Expand Down
2 changes: 1 addition & 1 deletion cumulus/test/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async-trait = "0.1.79"
clap = { version = "4.5.3", features = ["derive"] }
codec = { package = "parity-scale-codec", version = "3.6.12" }
criterion = { version = "0.5.1", features = ["async_tokio"] }
jsonrpsee = { version = "0.22", features = ["server"] }
jsonrpsee = { version = "0.23.1", features = ["server"] }
rand = "0.8.5"
serde = { features = ["derive"], workspace = true, default-features = true }
serde_json = { workspace = true, default-features = true }
Expand Down
2 changes: 1 addition & 1 deletion polkadot/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description = "Polkadot specific RPC functionality."
workspace = true

[dependencies]
jsonrpsee = { version = "0.22", features = ["server"] }
jsonrpsee = { version = "0.23.1", features = ["server"] }
polkadot-primitives = { path = "../primitives" }
sc-client-api = { path = "../../substrate/client/api" }
sp-blockchain = { path = "../../substrate/primitives/blockchain" }
Expand Down
4 changes: 4 additions & 0 deletions polkadot/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ where
let chain_name = chain_spec.name().to_string();
let genesis_hash = client.hash(0).ok().flatten().expect("Genesis block exists; qed");
let properties = chain_spec.properties();
// TODO: This should be a shared metrics instance.
let metrics = sc_rpc::SubscriptionMetrics::disabled();

io.merge(ChainSpec::new(chain_name, genesis_hash, properties).into_rpc())?;
io.merge(StateMigration::new(client.clone(), backend.clone(), deny_unsafe).into_rpc())?;
Expand All @@ -167,6 +169,7 @@ where
shared_voter_state,
justification_stream,
finality_provider,
metrics.clone(),
)
.into_rpc(),
)?;
Expand All @@ -179,6 +182,7 @@ where
beefy.beefy_finality_proof_stream,
beefy.beefy_best_block_stream,
beefy.subscription_executor,
metrics,
)?
.into_rpc(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion substrate/bin/node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ array-bytes = "6.1"
clap = { version = "4.5.3", features = ["derive"], optional = true }
codec = { package = "parity-scale-codec", version = "3.6.12" }
serde = { features = ["derive"], workspace = true, default-features = true }
jsonrpsee = { version = "0.22", features = ["server"] }
jsonrpsee = { version = "0.23.1", features = ["server"] }
futures = "0.3.30"
log = { workspace = true, default-features = true }
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion substrate/bin/node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.22", features = ["server"] }
jsonrpsee = { version = "0.23.1", features = ["server"] }
node-primitives = { path = "../primitives" }
pallet-transaction-payment-rpc = { path = "../../../frame/transaction-payment/rpc" }
mmr-rpc = { path = "../../../client/merkle-mountain-range/rpc" }
Expand Down
4 changes: 4 additions & 0 deletions substrate/bin/node/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ where
let chain_name = chain_spec.name().to_string();
let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
let properties = chain_spec.properties();
let metrics = sc_rpc::SubscriptionMetrics::disabled();

io.merge(ChainSpec::new(chain_name, genesis_hash, properties).into_rpc())?;

io.merge(System::new(client.clone(), pool, deny_unsafe).into_rpc())?;
Expand Down Expand Up @@ -206,6 +208,7 @@ where
shared_voter_state,
justification_stream,
finality_provider,
metrics.clone(),
)
.into_rpc(),
)?;
Expand All @@ -231,6 +234,7 @@ where
beefy.beefy_finality_proof_stream,
beefy.beefy_best_block_stream,
beefy.subscription_executor,
metrics,
)?
.into_rpc(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/babe/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.22.5", features = ["client-core", "macros", "server-core"] }
jsonrpsee = { version = "0.23.1", features = ["client-core", "macros", "server-core"] }
futures = "0.3.30"
serde = { features = ["derive"], workspace = true, default-features = true }
thiserror = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/beefy/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ workspace = true
[dependencies]
codec = { package = "parity-scale-codec", version = "3.6.12", features = ["derive"] }
futures = "0.3.30"
jsonrpsee = { version = "0.22.5", features = ["client-core", "macros", "server-core"] }
jsonrpsee = { version = "0.23.1", features = ["client-core", "macros", "server-core"] }
log = { workspace = true, default-features = true }
parking_lot = "0.12.1"
serde = { features = ["derive"], workspace = true, default-features = true }
Expand Down
36 changes: 28 additions & 8 deletions substrate/client/consensus/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use parking_lot::RwLock;
use sp_consensus_beefy::AuthorityIdBound;
use std::sync::Arc;

use sc_rpc::{utils::pipe_from_stream, SubscriptionTaskExecutor};
use sc_rpc::{
utils::pipe_from_stream, SubscriptionMetrics, SubscriptionParams, SubscriptionTaskExecutor,
};
use sp_application_crypto::RuntimeAppPublic;
use sp_runtime::traits::Block as BlockT;

Expand All @@ -33,7 +35,7 @@ use jsonrpsee::{
core::async_trait,
proc_macros::rpc,
types::{ErrorObject, ErrorObjectOwned},
PendingSubscriptionSink,
ConnectionId, Extensions, PendingSubscriptionSink,
};
use log::warn;

Expand Down Expand Up @@ -87,6 +89,7 @@ pub trait BeefyApi<Notification, Hash> {
name = "beefy_subscribeJustifications" => "beefy_justifications",
unsubscribe = "beefy_unsubscribeJustifications",
item = Notification,
with_extensions
)]
fn subscribe_justifications(&self);

Expand All @@ -104,6 +107,7 @@ pub struct Beefy<Block: BlockT, AuthorityId: AuthorityIdBound> {
finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
beefy_best_block: Arc<RwLock<Option<Block::Hash>>>,
executor: SubscriptionTaskExecutor,
metrics: SubscriptionMetrics,
}

impl<Block, AuthorityId> Beefy<Block, AuthorityId>
Expand All @@ -116,6 +120,7 @@ where
finality_proof_stream: BeefyVersionedFinalityProofStream<Block, AuthorityId>,
best_block_stream: BeefyBestBlockStream<Block>,
executor: SubscriptionTaskExecutor,
metrics: SubscriptionMetrics,
) -> Result<Self, Error> {
let beefy_best_block = Arc::new(RwLock::new(None));

Expand All @@ -127,7 +132,7 @@ where
});

executor.spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed());
Ok(Self { finality_proof_stream, beefy_best_block, executor })
Ok(Self { finality_proof_stream, beefy_best_block, executor, metrics })
}
}

Expand All @@ -139,13 +144,23 @@ where
AuthorityId: AuthorityIdBound,
<AuthorityId as RuntimeAppPublic>::Signature: Send + Sync,
{
fn subscribe_justifications(&self, pending: PendingSubscriptionSink) {
fn subscribe_justifications(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
let params = SubscriptionParams {
conn_id: *ext.get::<ConnectionId>().expect("ConnectionId is set"),
ip_addr: *ext.get::<std::net::IpAddr>().expect("IpAddr is set"),
method: "beefy_subscribeJustifications",
metrics: self.metrics.clone(),
};

let stream = self
.finality_proof_stream
.subscribe(100_000)
.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block, AuthorityId>(vfp));

sc_rpc::utils::spawn_subscription_task(&self.executor, pipe_from_stream(pending, stream));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
pipe_from_stream(pending, stream, params),
);
}

async fn latest_finalized(&self) -> Result<Block::Hash, Error> {
Expand All @@ -163,6 +178,7 @@ mod tests {
communication::notification::BeefyVersionedFinalityProofSender,
justification::BeefyVersionedFinalityProof,
};
use sc_rpc::SubscriptionMetrics;
use sp_consensus_beefy::{ecdsa_crypto, known_payloads, Payload, SignedCommitment};
use sp_runtime::traits::{BlakeTwo256, Hash};
use substrate_test_runtime_client::runtime::Block;
Expand All @@ -184,9 +200,13 @@ mod tests {
let (finality_proof_sender, finality_proof_stream) =
BeefyVersionedFinalityProofStream::<Block, ecdsa_crypto::AuthorityId>::channel();

let handler =
Beefy::new(finality_proof_stream, best_block_stream, sc_rpc::testing::test_executor())
.expect("Setting up the BEEFY RPC handler works");
let handler = Beefy::new(
finality_proof_stream,
best_block_stream,
sc_rpc::testing::test_executor(),
SubscriptionMetrics::disabled()
)
.expect("Setting up the BEEFY RPC handler works");

(handler.into_rpc(), finality_proof_sender)
}
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/grandpa/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ workspace = true
[dependencies]
finality-grandpa = { version = "0.16.2", features = ["derive-codec"] }
futures = "0.3.30"
jsonrpsee = { version = "0.22.5", features = ["client-core", "macros", "server-core"] }
jsonrpsee = { version = "0.23.1", features = ["client-core", "macros", "server-core"] }
log = { workspace = true, default-features = true }
codec = { package = "parity-scale-codec", version = "3.6.12", features = ["derive"] }
serde = { features = ["derive"], workspace = true, default-features = true }
Expand Down
34 changes: 29 additions & 5 deletions substrate/client/consensus/grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use jsonrpsee::{
core::{async_trait, server::PendingSubscriptionSink},
proc_macros::rpc,
ConnectionId, Extensions,
};

mod error;
Expand All @@ -38,7 +39,9 @@ use finality::{EncodedFinalityProof, RpcFinalityProofProvider};
use notification::JustificationNotification;
use report::{ReportAuthoritySet, ReportVoterState, ReportedRoundStates};
use sc_consensus_grandpa::GrandpaJustificationStream;
use sc_rpc::{utils::pipe_from_stream, SubscriptionTaskExecutor};
use sc_rpc::{
utils::pipe_from_stream, SubscriptionMetrics, SubscriptionParams, SubscriptionTaskExecutor,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};

/// Provides RPC methods for interacting with GRANDPA.
Expand All @@ -54,7 +57,8 @@ pub trait GrandpaApi<Notification, Hash, Number> {
#[subscription(
name = "grandpa_subscribeJustifications" => "grandpa_justifications",
unsubscribe = "grandpa_unsubscribeJustifications",
item = Notification
item = Notification,
with_extensions
)]
fn subscribe_justifications(&self);

Expand All @@ -71,6 +75,7 @@ pub struct Grandpa<AuthoritySet, VoterState, Block: BlockT, ProofProvider> {
voter_state: VoterState,
justification_stream: GrandpaJustificationStream<Block>,
finality_proof_provider: Arc<ProofProvider>,
metrics: SubscriptionMetrics,
}
impl<AuthoritySet, VoterState, Block: BlockT, ProofProvider>
Grandpa<AuthoritySet, VoterState, Block, ProofProvider>
Expand All @@ -82,8 +87,16 @@ impl<AuthoritySet, VoterState, Block: BlockT, ProofProvider>
voter_state: VoterState,
justification_stream: GrandpaJustificationStream<Block>,
finality_proof_provider: Arc<ProofProvider>,
metrics: SubscriptionMetrics,
) -> Self {
Self { executor, authority_set, voter_state, justification_stream, finality_proof_provider }
Self {
executor,
authority_set,
voter_state,
justification_stream,
finality_proof_provider,
metrics,
}
}
}

Expand All @@ -101,14 +114,24 @@ where
ReportedRoundStates::from(&self.authority_set, &self.voter_state)
}

fn subscribe_justifications(&self, pending: PendingSubscriptionSink) {
fn subscribe_justifications(&self, pending: PendingSubscriptionSink, ext: &Extensions) {
let params = SubscriptionParams {
conn_id: *ext.get::<ConnectionId>().expect("ConnectionId is set"),
ip_addr: *ext.get::<std::net::IpAddr>().expect("IpAddr is set"),
method: "grandpa_subscribeJustifications",
metrics: self.metrics.clone(),
};

let stream = self.justification_stream.subscribe(100_000).map(
|x: sc_consensus_grandpa::GrandpaJustification<Block>| {
JustificationNotification::from(x)
},
);

sc_rpc::utils::spawn_subscription_task(&self.executor, pipe_from_stream(pending, stream));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
pipe_from_stream(pending, stream, params),
);
}

async fn prove_finality(
Expand Down Expand Up @@ -260,6 +283,7 @@ mod tests {
voter_state,
justification_stream,
finality_proof_provider,
SubscriptionMetrics::disabled(),
)
.into_rpc();

Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/manual-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.22.5", features = ["client-core", "macros", "server-core"] }
jsonrpsee = { version = "0.23.1", features = ["client-core", "macros", "server-core"] }
assert_matches = "1.3.0"
async-trait = "0.1.79"
codec = { package = "parity-scale-codec", version = "3.6.12" }
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/merkle-mountain-range/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
codec = { package = "parity-scale-codec", version = "3.6.12" }
jsonrpsee = { version = "0.22.5", features = ["client-core", "macros", "server-core"] }
jsonrpsee = { version = "0.23.1", features = ["client-core", "macros", "server-core"] }
serde = { features = ["derive"], workspace = true, default-features = true }
sp-api = { path = "../../../primitives/api" }
sp-blockchain = { path = "../../../primitives/blockchain" }
Expand Down
Loading
Loading