Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Improve dispute-coordinator message burstiness handling (#5471)
Browse files Browse the repository at this point in the history
* Increase message channel size to 2048

Signed-off-by: Andrei Sandu <[email protected]>

* Use unbounded channel for reading data

Signed-off-by: Andrei Sandu <[email protected]>
  • Loading branch information
sandreim authored May 9, 2022
1 parent 7166068 commit 80532b5
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
10 changes: 6 additions & 4 deletions node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,8 @@ async fn request_disputes(
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
};
sender.send_message(msg.into()).await;
// Bounded by block production - `ProvisionerMessage::RequestInherentData`.
sender.send_unbounded_message(msg.into());

let recent_disputes = match rx.await {
Ok(r) => r,
Expand All @@ -617,9 +618,10 @@ async fn request_votes(
disputes_to_query: Vec<(SessionIndex, CandidateHash)>,
) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> {
let (tx, rx) = oneshot::channel();
sender
.send_message(DisputeCoordinatorMessage::QueryCandidateVotes(disputes_to_query, tx).into())
.await;
// Bounded by block production - `ProvisionerMessage::RequestInherentData`.
sender.send_unbounded_message(
DisputeCoordinatorMessage::QueryCandidateVotes(disputes_to_query, tx).into(),
);

match rx.await {
Ok(v) => v,
Expand Down
14 changes: 7 additions & 7 deletions node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,10 @@ async fn get_active_disputes<Context: SubsystemContext>(
ctx: &mut Context,
) -> JfyiErrorResult<Vec<(SessionIndex, CandidateHash)>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes(
tx,
)))
.await;
// Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::ActiveDisputes(tx),
));
rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
}

Expand All @@ -354,10 +354,10 @@ async fn get_candidate_votes<Context: SubsystemContext>(
candidate_hash: CandidateHash,
) -> JfyiErrorResult<Option<CandidateVotes>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::DisputeCoordinator(
// Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(vec![(session_index, candidate_hash)], tx),
))
.await;
));
rx.await
.map(|v| v.get(0).map(|inner| inner.to_owned().2))
.map_err(|_| JfyiError::AskCandidateVotesCanceled)
Expand Down
1 change: 1 addition & 0 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
signal=OverseerSignal,
error=SubsystemError,
network=NetworkBridgeEvent<VersionedValidationProtocol>,
message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
#[subsystem(no_dispatch, CandidateValidationMessage)]
Expand Down

0 comments on commit 80532b5

Please sign in to comment.