From 9d41017c319f4468cb76623d6bcc373861b3130a Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 8 Nov 2018 07:06:32 -0800 Subject: [PATCH] Timeout batch downloads, not each download This commit switches the timeout logic implemented in #6130 to timeout an entire batch of downloads instead of each download individually. Previously if *any* pending download didn't receive data in 30s we would time out, or if *any* pending download didn't receive 10 bytes in 30s we would time out. On very slow network connections this is highly likely to happen as a trickle of incoming bytes may not be spread equally amongst all connections, and not all connections may actually be active at any one point in time. The fix is to instead apply timeout logic for an entire batch of downloads. Only if zero total data isn't received in the timeout window do we time out. Or in other words, if any data for any download is receive we consider it as not being timed out. Similarly any progress on any download counts as progress towards our speed limit. Closes #6284 --- src/cargo/core/package.rs | 81 +++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 41 deletions(-) diff --git a/src/cargo/core/package.rs b/src/cargo/core/package.rs index 171ac27e33e..89908ad5e53 100644 --- a/src/cargo/core/package.rs +++ b/src/cargo/core/package.rs @@ -267,6 +267,17 @@ pub struct Downloads<'a, 'cfg: 'a> { largest: (u64, String), start: Instant, success: bool, + + /// Timeout management, both of timeout thresholds as well as whether or not + /// our connection has timed out (and accompanying message if it has). + /// + /// Note that timeout management is done manually here instead of in libcurl + /// because we want to apply timeouts to an entire batch of operations, not + /// any one particular single operatino + timeout: ops::HttpTimeout, // timeout configuration + updated_at: Cell, // last time we received bytes + next_speed_check: Cell, // if threshold isn't 0 by this time, error + next_speed_check_bytes_threshold: Cell, // decremented when we receive bytes } struct Download<'cfg> { @@ -293,24 +304,7 @@ struct Download<'cfg> { /// The moment we started this transfer at start: Instant, - - /// Last time we noticed that we got some more data from libcurl - updated_at: Cell, - - /// Timeout management, both of timeout thresholds as well as whether or not - /// our connection has timed out (and accompanying message if it has). - /// - /// Note that timeout management is done manually here because we have a - /// `Multi` with a lot of active transfers but between transfers finishing - /// we perform some possibly slow synchronous work (like grabbing file - /// locks, extracting tarballs, etc). The default timers on our `Multi` keep - /// running during this work, but we don't want them to count towards timing - /// everythig out. As a result, we manage this manually and take the time - /// for synchronous work into account manually. - timeout: ops::HttpTimeout, timed_out: Cell>, - next_speed_check: Cell, - next_speed_check_bytes_threshold: Cell, /// Logic used to track retrying this download if it's a spurious failure. retry: Retry<'cfg>, @@ -359,6 +353,7 @@ impl<'cfg> PackageSet<'cfg> { pub fn enable_download<'a>(&'a self) -> CargoResult> { assert!(!self.downloading.replace(true)); + let timeout = ops::HttpTimeout::new(self.config)?; Ok(Downloads { start: Instant::now(), set: self, @@ -375,6 +370,10 @@ impl<'cfg> PackageSet<'cfg> { downloaded_bytes: 0, largest: (0, String::new()), success: false, + updated_at: Cell::new(Instant::now()), + timeout, + next_speed_check: Cell::new(Instant::now()), + next_speed_check_bytes_threshold: Cell::new(0), }) } @@ -446,7 +445,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { debug!("downloading {} as {}", id, token); assert!(self.pending_ids.insert(id.clone())); - let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?; + let (mut handle, _timeout) = ops::http_handle_and_timeout(self.set.config)?; handle.get(true)?; handle.url(&url)?; handle.follow_location(true)?; // follow redirects @@ -501,7 +500,6 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { self.set.config.shell().status("Downloading", "crates ...")?; } - let now = Instant::now(); let dl = Download { token, data: RefCell::new(Vec::new()), @@ -511,11 +509,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { total: Cell::new(0), current: Cell::new(0), start: Instant::now(), - updated_at: Cell::new(now), - timeout, timed_out: Cell::new(None), - next_speed_check: Cell::new(now), - next_speed_check_bytes_threshold: Cell::new(0), retry: Retry::new(self.set.config)?, }; self.enqueue(dl, handle)?; @@ -638,10 +632,8 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { // active downloads to make sure they don't fire because of a slowly // extracted tarball. let finish_dur = start.elapsed(); - for (dl, _) in self.pending.values_mut() { - dl.updated_at.set(dl.updated_at.get() + finish_dur); - dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur); - } + self.updated_at.set(self.updated_at.get() + finish_dur); + self.next_speed_check.set(self.next_speed_check.get() + finish_dur); let slot = &self.set.packages[&dl.id]; assert!(slot.fill(pkg).is_ok()); @@ -652,12 +644,12 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { let mut handle = self.set.multi.add(handle)?; let now = Instant::now(); handle.set_token(dl.token)?; + self.updated_at.set(now); + self.next_speed_check.set(now + self.timeout.dur); + self.next_speed_check_bytes_threshold.set(self.timeout.low_speed_limit as u64); dl.timed_out.set(None); - dl.updated_at.set(now); dl.current.set(0); dl.total.set(0); - dl.next_speed_check.set(now + dl.timeout.dur); - dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64); self.pending.insert(dl.token, (dl, handle)); Ok(()) } @@ -712,14 +704,19 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { dl.total.set(total); let now = Instant::now(); if cur != dl.current.get() { + let delta = cur - dl.current.get(); + let threshold = self.next_speed_check_bytes_threshold.get(); + dl.current.set(cur); - dl.updated_at.set(now); + self.updated_at.set(now); - if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() { - dl.next_speed_check.set(now + dl.timeout.dur); - dl.next_speed_check_bytes_threshold.set( - dl.current.get() + dl.timeout.low_speed_limit as u64, + if delta >= threshold { + self.next_speed_check.set(now + self.timeout.dur); + self.next_speed_check_bytes_threshold.set( + self.timeout.low_speed_limit as u64, ); + } else { + self.next_speed_check_bytes_threshold.set(threshold - delta); } } if !self.tick(WhyTick::DownloadUpdate).is_ok() { @@ -727,10 +724,11 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { } // If we've spent too long not actually receiving any data we time out. - if now - dl.updated_at.get() > dl.timeout.dur { + if now - self.updated_at.get() > self.timeout.dur { + self.updated_at.set(now); let msg = format!("failed to download any data for `{}` within {}s", dl.id, - dl.timeout.dur.as_secs()); + self.timeout.dur.as_secs()); dl.timed_out.set(Some(msg)); return false } @@ -739,13 +737,14 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { // limit, see if we've transferred enough data during this threshold. If // it fails this check then we fail because the download is going too // slowly. - if now >= dl.next_speed_check.get() { - assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get()); + if now >= self.next_speed_check.get() { + self.next_speed_check.set(now + self.timeout.dur); + assert!(self.next_speed_check_bytes_threshold.get() > 0); let msg = format!("download of `{}` failed to transfer more \ than {} bytes in {}s", dl.id, - dl.timeout.low_speed_limit, - dl.timeout.dur.as_secs()); + self.timeout.low_speed_limit, + self.timeout.dur.as_secs()); dl.timed_out.set(Some(msg)); return false }