Skip to content

Commit

Permalink
Add an option to disable inbound rate limiter (#4327)
Browse files Browse the repository at this point in the history
## Issue Addressed

On deneb devnetv5, lighthouse keeps rate limiting peers which makes it harder to bootstrap new nodes as there are very few peers in the network. This PR adds an option to disable the inbound rate limiter for testnets.

Added an option to configure inbound rate limits as well.

Co-authored-by: Diva M <[email protected]>
  • Loading branch information
pawanjay176 and divagant-martian committed Jun 2, 2023
1 parent 7b1a0d4 commit 1bb0ddc
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 117 deletions.
6 changes: 5 additions & 1 deletion beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::listen_addr::{ListenAddr, ListenAddress};
use crate::rpc::config::OutboundRateLimiterConfig;
use crate::rpc::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use crate::types::GossipKind;
use crate::{Enr, PeerIdSerialized};
use directory::{
Expand Down Expand Up @@ -148,6 +148,9 @@ pub struct Config {

/// Configures if/where invalid blocks should be stored.
pub invalid_block_storage: Option<PathBuf>,

/// Configuration for the inbound rate limiter (requests received by this node).
pub inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
}

impl Config {
Expand Down Expand Up @@ -333,6 +336,7 @@ impl Default for Config {
enable_light_client_server: false,
outbound_rate_limiter_config: None,
invalid_block_storage: None,
inbound_rate_limiter_config: None,
}
}
}
Expand Down
51 changes: 41 additions & 10 deletions beacon_node/lighthouse_network/src/rpc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,41 +58,66 @@ impl FromStr for ProtocolQuota {
}
}

/// Configurations for the rate limiter applied to outbound requests (made by the node itself).
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct OutboundRateLimiterConfig(pub RateLimiterConfig);

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)]
pub struct InboundRateLimiterConfig(pub RateLimiterConfig);

impl FromStr for OutboundRateLimiterConfig {
type Err = &'static str;

fn from_str(s: &str) -> Result<Self, Self::Err> {
RateLimiterConfig::from_str(s).map(Self)
}
}

impl FromStr for InboundRateLimiterConfig {
type Err = &'static str;

fn from_str(s: &str) -> Result<Self, Self::Err> {
RateLimiterConfig::from_str(s).map(Self)
}
}

/// Configurations for the rate limiter.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OutboundRateLimiterConfig {
pub struct RateLimiterConfig {
pub(super) ping_quota: Quota,
pub(super) meta_data_quota: Quota,
pub(super) status_quota: Quota,
pub(super) goodbye_quota: Quota,
pub(super) blocks_by_range_quota: Quota,
pub(super) blocks_by_root_quota: Quota,
pub(super) light_client_bootstrap_quota: Quota,
}

impl OutboundRateLimiterConfig {
impl RateLimiterConfig {
pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(2, 10);
pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5);
pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15);
pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10);
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota =
Quota::n_every(methods::MAX_REQUEST_BLOCKS, 10);
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
}

impl Default for OutboundRateLimiterConfig {
impl Default for RateLimiterConfig {
fn default() -> Self {
OutboundRateLimiterConfig {
RateLimiterConfig {
ping_quota: Self::DEFAULT_PING_QUOTA,
meta_data_quota: Self::DEFAULT_META_DATA_QUOTA,
status_quota: Self::DEFAULT_STATUS_QUOTA,
goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA,
blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA,
blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA,
light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA,
}
}
}

impl Debug for OutboundRateLimiterConfig {
impl Debug for RateLimiterConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
macro_rules! fmt_q {
($quota:expr) => {
Expand All @@ -104,7 +129,7 @@ impl Debug for OutboundRateLimiterConfig {
};
}

f.debug_struct("OutboundRateLimiterConfig")
f.debug_struct("RateLimiterConfig")
.field("ping", fmt_q!(&self.ping_quota))
.field("metadata", fmt_q!(&self.meta_data_quota))
.field("status", fmt_q!(&self.status_quota))
Expand All @@ -119,7 +144,7 @@ impl Debug for OutboundRateLimiterConfig {
/// the default values. Protocol specified more than once use only the first given Quota.
///
/// The expected format is a ';' separated list of [`ProtocolQuota`].
impl FromStr for OutboundRateLimiterConfig {
impl FromStr for RateLimiterConfig {
type Err = &'static str;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Expand All @@ -129,6 +154,8 @@ impl FromStr for OutboundRateLimiterConfig {
let mut goodbye_quota = None;
let mut blocks_by_range_quota = None;
let mut blocks_by_root_quota = None;
let mut light_client_bootstrap_quota = None;

for proto_def in s.split(';') {
let ProtocolQuota { protocol, quota } = proto_def.parse()?;
let quota = Some(quota);
Expand All @@ -139,10 +166,12 @@ impl FromStr for OutboundRateLimiterConfig {
Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota),
Protocol::Ping => ping_quota = ping_quota.or(quota),
Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota),
Protocol::LightClientBootstrap => return Err("Lighthouse does not send LightClientBootstrap requests. Quota should not be set."),
Protocol::LightClientBootstrap => {
light_client_bootstrap_quota = light_client_bootstrap_quota.or(quota)
}
}
}
Ok(OutboundRateLimiterConfig {
Ok(RateLimiterConfig {
ping_quota: ping_quota.unwrap_or(Self::DEFAULT_PING_QUOTA),
meta_data_quota: meta_data_quota.unwrap_or(Self::DEFAULT_META_DATA_QUOTA),
status_quota: status_quota.unwrap_or(Self::DEFAULT_STATUS_QUOTA),
Expand All @@ -151,6 +180,8 @@ impl FromStr for OutboundRateLimiterConfig {
.unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA),
blocks_by_root_quota: blocks_by_root_quota
.unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA),
light_client_bootstrap_quota: light_client_bootstrap_quota
.unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA),
})
}
}
Expand Down
121 changes: 62 additions & 59 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use slog::{crit, debug, o};
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use types::{EthSpec, ForkContext};

pub(crate) use handler::HandlerErr;
Expand All @@ -32,7 +31,7 @@ pub use methods::{
pub(crate) use outbound::OutboundRequest;
pub use protocol::{max_rpc_size, Protocol, RPCError};

use self::config::OutboundRateLimiterConfig;
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::self_limiter::SelfRateLimiter;

pub(crate) mod codec;
Expand Down Expand Up @@ -112,7 +111,7 @@ type BehaviourAction<Id, TSpec> =
/// logic.
pub struct RPC<Id: ReqId, TSpec: EthSpec> {
/// Rate limiter
limiter: RateLimiter,
limiter: Option<RateLimiter>,
/// Rate limiter for our own requests.
self_limiter: Option<SelfRateLimiter<Id, TSpec>>,
/// Queue of events to be processed.
Expand All @@ -127,32 +126,24 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
pub fn new(
fork_context: Arc<ForkContext>,
enable_light_client_server: bool,
inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
log: slog::Logger,
) -> Self {
let log = log.new(o!("service" => "libp2p_rpc"));

let limiter = RateLimiter::builder()
.n_every(Protocol::MetaData, 2, Duration::from_secs(5))
.n_every(Protocol::Ping, 2, Duration::from_secs(10))
.n_every(Protocol::Status, 5, Duration::from_secs(15))
.one_every(Protocol::Goodbye, Duration::from_secs(10))
.one_every(Protocol::LightClientBootstrap, Duration::from_secs(10))
.n_every(
Protocol::BlocksByRange,
methods::MAX_REQUEST_BLOCKS,
Duration::from_secs(10),
)
.n_every(Protocol::BlocksByRoot, 128, Duration::from_secs(10))
.build()
.expect("Configuration parameters are valid");
let inbound_limiter = inbound_rate_limiter_config.map(|config| {
debug!(log, "Using inbound rate limiting params"; "config" => ?config);
RateLimiter::new_with_config(config.0)
.expect("Inbound limiter configuration parameters are valid")
});

let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
});

RPC {
limiter,
limiter: inbound_limiter,
self_limiter,
events: Vec::new(),
fork_context,
Expand Down Expand Up @@ -242,50 +233,60 @@ where
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
if let Ok(RPCReceived::Request(ref id, ref req)) = event {
// check if the request is conformant to the quota
match self.limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.protocol();
if matches!(protocol, Protocol::BlocksByRange) {
debug!(self.log, "Blocks by range request will never be processed"; "request" => %req);
} else {
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
conn_id,
event,
}))
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.protocol();
if matches!(protocol, Protocol::BlocksByRange) {
debug!(self.log, "Blocks by range request will never be processed"; "request" => %req);
} else {
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
"request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis());
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
}
}
} else {
// No rate limiting, send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
} else {
self.events
Expand All @@ -303,7 +304,9 @@ where
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// let the rate limiter prune.
let _ = self.limiter.poll_unpin(cx);
if let Some(limiter) = self.limiter.as_mut() {
let _ = limiter.poll_unpin(cx);
}

if let Some(self_limiter) = self.self_limiter.as_mut() {
if let Poll::Ready(event) = self_limiter.poll_ready(cx) {
Expand Down
47 changes: 24 additions & 23 deletions beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::config::RateLimiterConfig;
use crate::rpc::Protocol;
use fnv::FnvHashMap;
use libp2p::PeerId;
Expand Down Expand Up @@ -141,29 +142,6 @@ impl RPCRateLimiterBuilder {
self
}

/// Allow one token every `time_period` to be used for this `protocol`.
/// This produces a hard limit.
pub fn one_every(self, protocol: Protocol, time_period: Duration) -> Self {
self.set_quota(
protocol,
Quota {
replenish_all_every: time_period,
max_tokens: 1,
},
)
}

/// Allow `n` tokens to be use used every `time_period` for this `protocol`.
pub fn n_every(self, protocol: Protocol, n: u64, time_period: Duration) -> Self {
self.set_quota(
protocol,
Quota {
max_tokens: n,
replenish_all_every: time_period,
},
)
}

pub fn build(self) -> Result<RPCRateLimiter, &'static str> {
// get our quotas
let ping_quota = self.ping_quota.ok_or("Ping quota not specified")?;
Expand Down Expand Up @@ -232,6 +210,29 @@ impl<T: EthSpec> RateLimiterItem for super::OutboundRequest<T> {
}
}
impl RPCRateLimiter {
pub fn new_with_config(config: RateLimiterConfig) -> Result<Self, &'static str> {
// Destructure to make sure every configuration value is used.
let RateLimiterConfig {
ping_quota,
meta_data_quota,
status_quota,
goodbye_quota,
blocks_by_range_quota,
blocks_by_root_quota,
light_client_bootstrap_quota,
} = config;

Self::builder()
.set_quota(Protocol::Ping, ping_quota)
.set_quota(Protocol::MetaData, meta_data_quota)
.set_quota(Protocol::Status, status_quota)
.set_quota(Protocol::Goodbye, goodbye_quota)
.set_quota(Protocol::BlocksByRange, blocks_by_range_quota)
.set_quota(Protocol::BlocksByRoot, blocks_by_root_quota)
.set_quota(Protocol::LightClientBootstrap, light_client_bootstrap_quota)
.build()
}

/// Get a builder instance.
pub fn builder() -> RPCRateLimiterBuilder {
RPCRateLimiterBuilder::default()
Expand Down
Loading

0 comments on commit 1bb0ddc

Please sign in to comment.