From 3b21f8d90f79e332ce15cfb8a31d71aafb1d8452 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 26 May 2023 14:01:22 +0100 Subject: [PATCH] fix(offchain): Avoid starvation in the offchain monitor This addresses both sides of the issue, by making sure the task holding the `CallAll` doesn't hang, and by removing the concurrency control done by `Buffer`, which may be the reason why PR #4570 didn't fully work. --- core/src/polling_monitor/ipfs_service.rs | 5 +++-- core/src/polling_monitor/mod.rs | 13 ++++++++----- core/src/subgraph/context.rs | 6 ++++-- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/core/src/polling_monitor/ipfs_service.rs b/core/src/polling_monitor/ipfs_service.rs index f9436b851ca..d4fdeb7964a 100644 --- a/core/src/polling_monitor/ipfs_service.rs +++ b/core/src/polling_monitor/ipfs_service.rs @@ -30,8 +30,9 @@ pub fn ipfs_service( .service_fn(move |req| ipfs.cheap_clone().call_inner(req)) .boxed(); - // The `Buffer` makes it so the rate and concurrency limit are shared among clones. - Buffer::new(svc, 1) + // The `Buffer` makes it so the rate limit is shared among clones. + // Make it unbounded to avoid any risk of starvation. + Buffer::new(svc, u32::MAX as usize) } #[derive(Clone)] diff --git a/core/src/polling_monitor/mod.rs b/core/src/polling_monitor/mod.rs index 19f30f28cda..7ead8e57ebc 100644 --- a/core/src/polling_monitor/mod.rs +++ b/core/src/polling_monitor/mod.rs @@ -98,7 +98,7 @@ impl Queue { /// `Option`, to represent the object not being found. pub fn spawn_monitor( service: S, - response_sender: mpsc::Sender<(ID, Res)>, + response_sender: mpsc::UnboundedSender<(ID, Res)>, logger: Logger, metrics: PollingMonitorMetrics, ) -> PollingMonitor @@ -149,10 +149,13 @@ where let mut backoffs = Backoffs::new(); let mut responses = service.call_all(queue_to_stream).unordered().boxed(); while let Some(response) = responses.next().await { + // Note: Be careful not to `await` within this loop, as that could block requests in + // the `CallAll` from being polled. This can cause starvation as those requests may + // be holding on to resources such as slots for concurrent calls. match response { Ok((id, Some(response))) => { backoffs.remove(&id); - let send_result = response_sender.send((id, response)).await; + let send_result = response_sender.send((id, response)); if send_result.is_err() { // The receiver has been dropped, cancel this task. break; @@ -250,10 +253,10 @@ mod tests { fn setup() -> ( mock::Handle<&'static str, Option<&'static str>>, PollingMonitor<&'static str>, - mpsc::Receiver<(&'static str, &'static str)>, + mpsc::UnboundedReceiver<(&'static str, &'static str)>, ) { let (svc, handle) = mock::pair(); - let (tx, rx) = mpsc::channel(10); + let (tx, rx) = mpsc::unbounded_channel(); let monitor = spawn_monitor(svc, tx, log::discard(), PollingMonitorMetrics::mock()); (handle, monitor, rx) } @@ -263,7 +266,7 @@ mod tests { let (svc, mut handle) = mock::pair(); let shared_svc = tower::buffer::Buffer::new(tower::limit::ConcurrencyLimit::new(svc, 1), 1); let make_monitor = |svc| { - let (tx, rx) = mpsc::channel(10); + let (tx, rx) = mpsc::unbounded_channel(); let metrics = PollingMonitorMetrics::mock(); let monitor = spawn_monitor(svc, tx, log::discard(), metrics); (monitor, rx) diff --git a/core/src/subgraph/context.rs b/core/src/subgraph/context.rs index 547a83f7611..c87f03d2faf 100644 --- a/core/src/subgraph/context.rs +++ b/core/src/subgraph/context.rs @@ -185,7 +185,7 @@ impl> IndexingContext { pub struct OffchainMonitor { ipfs_monitor: PollingMonitor, - ipfs_monitor_rx: mpsc::Receiver<(CidFile, Bytes)>, + ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>, } impl OffchainMonitor { @@ -195,7 +195,9 @@ impl OffchainMonitor { subgraph_hash: &DeploymentHash, ipfs_service: IpfsService, ) -> Self { - let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::channel(10); + // The channel is unbounded, as it is expected that `fn ready_offchain_events` is called + // frequently, or at least with the same frequency that requests are sent. + let (ipfs_monitor_tx, ipfs_monitor_rx) = mpsc::unbounded_channel(); let ipfs_monitor = spawn_monitor( ipfs_service, ipfs_monitor_tx,