Skip to content

Commit

Permalink
Auto merge of #10470 - arlosi:http, r=Eh2406
Browse files Browse the repository at this point in the history
HTTP registry implementation

Implement HTTP registry support described in [RFC 2789](rust-lang/rfcs#2789).

Adds a new unstable flag `-Z http-registry` which allows cargo to interact with remote registries served over http rather than git. These registries can be identified by urls starting with `sparse+http://` or `sparse+https://`.

When fetching index metadata over http, cargo only downloads the metadata for needed crates, which can save significant time and bandwidth over git.

The format of the http index is identical to a checkout of a git-based index.

This change is based on `@jonhoo's` PR #8890.

cc `@Eh2406`

Remaining items:
- [x] Performance measurements
- [x] Make unstable only
- [x] Investigate unification of download system. Probably best done in separate change.
- [x] Unify registry tests (code duplication in `http_registry.rs`)
- [x] Use existing on-disk cache, rather than adding a new one.
  • Loading branch information
bors committed Mar 24, 2022
2 parents c5509f8 + 412b633 commit 1366225
Show file tree
Hide file tree
Showing 18 changed files with 1,807 additions and 323 deletions.
163 changes: 162 additions & 1 deletion crates/cargo-test-support/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use std::collections::BTreeMap;
use std::fmt::Write as _;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Write};
use std::net::TcpListener;
use std::net::{SocketAddr, TcpListener};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use tar::{Builder, Header};
use url::Url;
Expand Down Expand Up @@ -368,6 +370,165 @@ pub fn alt_init() {
RegistryBuilder::new().alternative(true).build();
}

pub struct RegistryServer {
done: Arc<AtomicBool>,
server: Option<thread::JoinHandle<()>>,
addr: SocketAddr,
}

impl RegistryServer {
pub fn addr(&self) -> SocketAddr {
self.addr
}
}

impl Drop for RegistryServer {
fn drop(&mut self) {
self.done.store(true, Ordering::SeqCst);
// NOTE: we can't actually await the server since it's blocked in accept()
let _ = self.server.take();
}
}

#[must_use]
pub fn serve_registry(registry_path: PathBuf) -> RegistryServer {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let done = Arc::new(AtomicBool::new(false));
let done2 = done.clone();

let t = thread::spawn(move || {
let mut line = String::new();
'server: while !done2.load(Ordering::SeqCst) {
let (socket, _) = listener.accept().unwrap();
// Let's implement a very naive static file HTTP server.
let mut buf = BufReader::new(socket);

// First, the request line:
// GET /path HTTPVERSION
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
// Connection terminated.
continue;
}

assert!(line.starts_with("GET "), "got non-GET request: {}", line);
let path = PathBuf::from(
line.split_whitespace()
.skip(1)
.next()
.unwrap()
.trim_start_matches('/'),
);

let file = registry_path.join(path);
if file.exists() {
// Grab some other headers we may care about.
let mut if_modified_since = None;
let mut if_none_match = None;
loop {
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
continue 'server;
}

if line == "\r\n" {
// End of headers.
line.clear();
break;
}

let value = line
.splitn(2, ':')
.skip(1)
.next()
.map(|v| v.trim())
.unwrap();

if line.starts_with("If-Modified-Since:") {
if_modified_since = Some(value.to_owned());
} else if line.starts_with("If-None-Match:") {
if_none_match = Some(value.trim_matches('"').to_owned());
}
}

// Now grab info about the file.
let data = fs::read(&file).unwrap();
let etag = Sha256::new().update(&data).finish_hex();
let last_modified = format!("{:?}", file.metadata().unwrap().modified().unwrap());

// Start to construct our response:
let mut any_match = false;
let mut all_match = true;
if let Some(expected) = if_none_match {
if etag != expected {
all_match = false;
} else {
any_match = true;
}
}
if let Some(expected) = if_modified_since {
// NOTE: Equality comparison is good enough for tests.
if last_modified != expected {
all_match = false;
} else {
any_match = true;
}
}

// Write out the main response line.
if any_match && all_match {
buf.get_mut()
.write_all(b"HTTP/1.1 304 Not Modified\r\n")
.unwrap();
} else {
buf.get_mut().write_all(b"HTTP/1.1 200 OK\r\n").unwrap();
}
// TODO: Support 451 for crate index deletions.

// Write out other headers.
buf.get_mut()
.write_all(format!("Content-Length: {}\r\n", data.len()).as_bytes())
.unwrap();
buf.get_mut()
.write_all(format!("ETag: \"{}\"\r\n", etag).as_bytes())
.unwrap();
buf.get_mut()
.write_all(format!("Last-Modified: {}\r\n", last_modified).as_bytes())
.unwrap();

// And finally, write out the body.
buf.get_mut().write_all(b"\r\n").unwrap();
buf.get_mut().write_all(&data).unwrap();
} else {
loop {
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
// Connection terminated.
continue 'server;
}

if line == "\r\n" {
break;
}
}

buf.get_mut()
.write_all(b"HTTP/1.1 404 Not Found\r\n\r\n")
.unwrap();
buf.get_mut().write_all(b"\r\n").unwrap();
}
buf.get_mut().flush().unwrap();
}
});

RegistryServer {
addr,
server: Some(t),
done,
}
}

/// Creates a new on-disk registry.
pub fn init_registry(registry_path: PathBuf, dl_url: String, api_url: Url, api_path: PathBuf) {
// Initialize a new registry.
Expand Down
2 changes: 2 additions & 0 deletions src/cargo/core/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ unstable_cli_options!(
no_index_update: bool = ("Do not update the registry index even if the cache is outdated"),
panic_abort_tests: bool = ("Enable support to run tests with -Cpanic=abort"),
host_config: bool = ("Enable the [host] section in the .cargo/config.toml file"),
http_registry: bool = ("Support HTTP-based crate registries"),
target_applies_to_host: bool = ("Enable the `target-applies-to-host` key in the .cargo/config.toml file"),
rustdoc_map: bool = ("Allow passing external documentation mappings to rustdoc"),
separate_nightlies: bool = (HIDDEN),
Expand Down Expand Up @@ -875,6 +876,7 @@ impl CliUnstable {
"multitarget" => self.multitarget = parse_empty(k, v)?,
"rustdoc-map" => self.rustdoc_map = parse_empty(k, v)?,
"terminal-width" => self.terminal_width = Some(parse_usize_opt(v)?),
"http-registry" => self.http_registry = parse_empty(k, v)?,
"namespaced-features" => stabilized_warn(k, "1.60", STABILISED_NAMESPACED_FEATURES),
"weak-dep-features" => stabilized_warn(k, "1.60", STABILIZED_WEAK_DEP_FEATURES),
"credential-process" => self.credential_process = parse_empty(k, v)?,
Expand Down
11 changes: 2 additions & 9 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,6 @@ impl<'cfg> PackageSet<'cfg> {
) -> CargoResult<PackageSet<'cfg>> {
// We've enabled the `http2` feature of `curl` in Cargo, so treat
// failures here as fatal as it would indicate a build-time problem.
//
// Note that the multiplexing support is pretty new so we're having it
// off-by-default temporarily.
//
// Also note that pipelining is disabled as curl authors have indicated
// that it's buggy, and we've empirically seen that it's buggy with HTTP
// proxies.
let mut multi = Multi::new();
let multiplexing = config.http_config()?.multiplexing.unwrap_or(true);
multi
Expand Down Expand Up @@ -700,7 +693,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
return Ok(Some(pkg));
}

// Ask the original source fo this `PackageId` for the corresponding
// Ask the original source for this `PackageId` for the corresponding
// package. That may immediately come back and tell us that the package
// is ready, or it could tell us that it needs to be downloaded.
let mut sources = self.set.sources.borrow_mut();
Expand Down Expand Up @@ -757,7 +750,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// initiate dozens of connections to crates.io, but rather only one.
// Once the main one is opened we realized that pipelining is possible
// and multiplexing is possible with static.crates.io. All in all this
// reduces the number of connections done to a more manageable state.
// reduces the number of connections down to a more manageable state.
try_old_curl!(handle.pipewait(true), "pipewait");

handle.write_function(move |buf| {
Expand Down
7 changes: 6 additions & 1 deletion src/cargo/core/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ impl<'cfg> PackageRegistry<'cfg> {
}

self.load(namespace, kind)?;

// This isn't strictly necessary since it will be called later.
// However it improves error messages for sources that issue errors
// in `block_until_ready` because the callers here have context about
// which deps are being resolved.
self.block_until_ready()?;
Ok(())
}
Expand Down Expand Up @@ -273,7 +278,7 @@ impl<'cfg> PackageRegistry<'cfg> {
// First up we need to actually resolve each `deps` specification to
// precisely one summary. We're not using the `query` method below as it
// internally uses maps we're building up as part of this method
// (`patches_available` and `patches). Instead we're going straight to
// (`patches_available` and `patches`). Instead we're going straight to
// the source to load information from it.
//
// Remember that each dependency listed in `[patch]` has to resolve to
Expand Down
7 changes: 6 additions & 1 deletion src/cargo/core/source/source_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ impl SourceId {
Ok(SourceId::new(SourceKind::Registry, url, None)?
.with_precise(Some("locked".to_string())))
}
"sparse" => {
let url = string.into_url()?;
Ok(SourceId::new(SourceKind::Registry, url, None)?
.with_precise(Some("locked".to_string())))
}
"path" => {
let url = url.into_url()?;
SourceId::new(SourceKind::Path, url, None)
Expand Down Expand Up @@ -301,7 +306,7 @@ impl SourceId {
self,
yanked_whitelist,
config,
))),
)?)),
SourceKind::LocalRegistry => {
let path = match self.inner.url.to_file_path() {
Ok(p) => p,
Expand Down
9 changes: 6 additions & 3 deletions src/cargo/ops/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ fn registry(
}
let api_host = {
let _lock = config.acquire_package_cache_lock()?;
let mut src = RegistrySource::remote(sid, &HashSet::new(), config);
let mut src = RegistrySource::remote(sid, &HashSet::new(), config)?;
// Only update the index if the config is not available or `force` is set.
if force_update {
src.invalidate_cache()
Expand Down Expand Up @@ -528,8 +528,11 @@ pub fn http_handle_and_timeout(config: &Config) -> CargoResult<(Easy, HttpTimeou
specified"
)
}
if !config.network_allowed() {
bail!("can't make HTTP request in the offline mode")
if config.offline() {
bail!(
"attempting to make an HTTP request, but --offline was \
specified"
)
}

// The timeout option for libcurl by default times out the entire transfer,
Expand Down
10 changes: 3 additions & 7 deletions src/cargo/sources/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,7 @@ impl<'cfg> Debug for PathSource<'cfg> {

impl<'cfg> Source for PathSource<'cfg> {
fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
if !self.updated {
return Poll::Pending;
}
self.update()?;
for s in self.packages.iter().map(|p| p.summary()) {
if dep.matches(s) {
f(s.clone())
Expand All @@ -514,9 +512,7 @@ impl<'cfg> Source for PathSource<'cfg> {
_dep: &Dependency,
f: &mut dyn FnMut(Summary),
) -> Poll<CargoResult<()>> {
if !self.updated {
return Poll::Pending;
}
self.update()?;
for s in self.packages.iter().map(|p| p.summary()) {
f(s.clone())
}
Expand All @@ -537,7 +533,7 @@ impl<'cfg> Source for PathSource<'cfg> {

fn download(&mut self, id: PackageId) -> CargoResult<MaybePackage> {
trace!("getting packages; id={}", id);

self.update()?;
let pkg = self.packages.iter().find(|pkg| pkg.package_id() == id);
pkg.cloned()
.map(MaybePackage::Ready)
Expand Down
Loading

0 comments on commit 1366225

Please sign in to comment.