Skip to content

Commit

Permalink
Don't kill SSE stream if channel fills up (#4500)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes #4245

## Proposed Changes

- If an SSE channel fills up, send a comment instead of terminating the stream.
- Add a CLI flag for scaling up the SSE buffer: `--http-sse-capacity-multiplier N`.

## Additional Info

~~Blocked on #4462. I haven't rebased on that PR yet for initial testing, because it still needs some more work to handle long-running HTTP threads.~~

- [x] Add CLI flag tests.
  • Loading branch information
michaelsproul committed Aug 17, 2023
1 parent 59c24bc commit ca1dc4f
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 21 deletions.
7 changes: 5 additions & 2 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ pub struct ServerSentEventHandler<T: EthSpec> {
}

impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn new(log: Logger) -> Self {
Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY)
pub fn new(log: Logger, capacity_multiplier: usize) -> Self {
Self::new_with_capacity(
log,
capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY),
)
}

pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
Expand Down
5 changes: 4 additions & 1 deletion beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ where
let context = runtime_context.service_context("beacon".into());
let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?;
let event_handler = if self.http_api_config.enabled {
Some(ServerSentEventHandler::new(context.log().clone()))
Some(ServerSentEventHandler::new(
context.log().clone(),
self.http_api_config.sse_capacity_multiplier,
))
} else {
None
};
Expand Down
48 changes: 30 additions & 18 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
use types::{
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
Expand Down Expand Up @@ -132,6 +135,7 @@ pub struct Config {
pub allow_sync_stalled: bool,
pub spec_fork_name: Option<ForkName>,
pub data_dir: PathBuf,
pub sse_capacity_multiplier: usize,
pub enable_beacon_processor: bool,
}

Expand All @@ -146,6 +150,7 @@ impl Default for Config {
allow_sync_stalled: false,
spec_fork_name: None,
data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
}
}
Expand Down Expand Up @@ -4348,22 +4353,29 @@ pub fn serve<T: BeaconChainTypes>(
}
};

receivers.push(BroadcastStream::new(receiver).map(|msg| {
match msg {
Ok(data) => Event::default()
.event(data.topic_name())
.json_data(data)
.map_err(|e| {
warp_utils::reject::server_sent_event_error(format!(
"{:?}",
e
))
}),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("{:?}", e),
)),
}
}));
receivers.push(
BroadcastStream::new(receiver)
.map(|msg| {
match msg {
Ok(data) => Event::default()
.event(data.topic_name())
.json_data(data)
.unwrap_or_else(|e| {
Event::default()
.comment(format!("error - bad json: {e:?}"))
}),
// Do not terminate the stream if the channel fills
// up. Just drop some messages and send a comment to
// the client.
Err(BroadcastStreamRecvError::Lagged(n)) => {
Event::default().comment(format!(
"error - dropped {n} messages"
))
}
}
})
.map(Ok::<_, std::convert::Infallible>),
);
}
} else {
return Err(warp_utils::reject::custom_server_error(
Expand All @@ -4373,7 +4385,7 @@ pub fn serve<T: BeaconChainTypes>(

let s = futures::stream::select_all(receivers);

Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
Ok(warp::sse::reply(warp::sse::keep_alive().stream(s)))
})
},
);
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
allow_sync_stalled: false,
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
spec_fork_name: None,
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
},
chain: Some(chain),
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \
MAINNET.")
)
.arg(
Arg::with_name("http-sse-capacity-multiplier")
.long("http-sse-capacity-multiplier")
.takes_value(true)
.default_value("1")
.value_name("N")
.help("Multiplier to apply to the length of HTTP server-sent-event (SSE) channels. \
Increasing this value can prevent messages from being dropped.")
)
.arg(
Arg::with_name("http-enable-beacon-processor")
.long("http-enable-beacon-processor")
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ pub fn get_config<E: EthSpec>(
client_config.http_api.allow_sync_stalled = true;
}

client_config.http_api.sse_capacity_multiplier =
parse_required(cli_args, "http-sse-capacity-multiplier")?;

client_config.http_api.enable_beacon_processor =
parse_required(cli_args, "http-enable-beacon-processor")?;

Expand Down
15 changes: 15 additions & 0 deletions lighthouse/tests/beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2349,3 +2349,18 @@ fn beacon_processor_zero_workers() {
.flag("beacon-processor-max-workers", Some("0"))
.run_with_zero_port();
}

#[test]
fn http_sse_capacity_multiplier_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 1));
}

#[test]
fn http_sse_capacity_multiplier_override() {
CommandLineTest::new()
.flag("http-sse-capacity-multiplier", Some("10"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 10));
}

0 comments on commit ca1dc4f

Please sign in to comment.