Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Rate limit improvements (#6315)
Browse files Browse the repository at this point in the history
* We actually don't need to rate limit redundant requests.

Those redundant requests should not actually happen, but still.

* Add some logging.

* Also log message when the receiving side hit the rate limit.

* Update node/network/dispute-distribution/src/sender/mod.rs

Co-authored-by: Alexandru Vasile <[email protected]>

Co-authored-by: eskimor <[email protected]>
Co-authored-by: Alexandru Vasile <[email protected]>
  • Loading branch information
3 people authored Nov 23, 2022
1 parent cfb10d4 commit b5e498c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
6 changes: 6 additions & 0 deletions node/network/dispute-distribution/src/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ where

// Queue request:
if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
gum::debug!(
target: LOG_TARGET,
?authority_id,
?peer,
"Peer hit the rate limit - dropping message."
);
req.send_outgoing_response(OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_APPARENT_FLOOD],
Expand Down
18 changes: 12 additions & 6 deletions node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ impl DisputeSender {
runtime: &mut RuntimeInfo,
msg: DisputeMessage,
) -> Result<()> {
self.rate_limit.limit().await;

let req: DisputeRequest = msg.into();
let candidate_hash = req.0.candidate_receipt.hash();
match self.disputes.entry(candidate_hash) {
Expand All @@ -118,6 +116,8 @@ impl DisputeSender {
return Ok(())
},
Entry::Vacant(vacant) => {
self.rate_limit.limit("in start_sender", candidate_hash).await;

let send_task = SendTask::new(
ctx,
runtime,
Expand Down Expand Up @@ -169,10 +169,12 @@ impl DisputeSender {

// Iterates in order of insertion:
let mut should_rate_limit = true;
for dispute in self.disputes.values_mut() {
for (candidate_hash, dispute) in self.disputes.iter_mut() {
if have_new_sessions || dispute.has_failed_sends() {
if should_rate_limit {
self.rate_limit.limit().await;
self.rate_limit
.limit("while going through new sessions/failed sends", *candidate_hash)
.await;
}
let sends_happened = dispute
.refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
Expand All @@ -193,7 +195,7 @@ impl DisputeSender {
// recovered at startup will be relatively "old" anyway and we assume that no more than a
// third of the validators will go offline at any point in time anyway.
for dispute in unknown_disputes {
self.rate_limit.limit().await;
self.rate_limit.limit("while going through unknown disputes", dispute.1).await;
self.start_send_for_dispute(ctx, runtime, dispute).await?;
}
Ok(())
Expand Down Expand Up @@ -383,14 +385,18 @@ impl RateLimit {
}

/// Wait until ready and prepare for next call.
async fn limit(&mut self) {
///
/// String given as occasion and candidate hash are logged in case the rate limit hit.
async fn limit(&mut self, occasion: &'static str, candidate_hash: CandidateHash) {
// Wait for rate limit and add some logging:
poll_fn(|cx| {
let old_limit = Pin::new(&mut self.limit);
match old_limit.poll(cx) {
Poll::Pending => {
gum::debug!(
target: LOG_TARGET,
?occasion,
?candidate_hash,
"Sending rate limit hit, slowing down requests"
);
Poll::Pending
Expand Down

0 comments on commit b5e498c

Please sign in to comment.