feat: download snapshots from CloudFlare #3492

Merged · 5 commits · Sep 18, 2023

1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -137,6 +137,7 @@ rand = "0.8"
rand_distr = "0.4"
raw_sync = "0.1"
rayon = "1.5"
regex = "1.9"
reqwest = { version = "0.11.18", default-features = false, features = [
"stream",
"rustls-tls",
2 changes: 1 addition & 1 deletion Dockerfile
@@ -68,7 +68,7 @@ ARG DATA_DIR=/home/forest/.local/share/forest
ENV DEBIAN_FRONTEND="noninteractive"
# Install binary dependencies
RUN apt-get update && \
apt-get install --no-install-recommends -y aria2 ca-certificates && \
apt-get install --no-install-recommends -y ca-certificates && \
rm -rf /var/lib/apt/lists/*
RUN update-ca-certificates

2 changes: 1 addition & 1 deletion Dockerfile-alpine
@@ -50,7 +50,7 @@ ARG SERVICE_GROUP=forest

# Install binary dependencies
RUN apk update && \
apk add --no-cache aria2 ca-certificates
apk add --no-cache ca-certificates
RUN update-ca-certificates

# Create user and group and assign appropriate rights to the forest binaries
2 changes: 1 addition & 1 deletion Makefile
@@ -27,7 +27,7 @@ install-deps:
# https://github.com/git-lfs/git-lfs/blob/main/INSTALLING.md#1-adding-the-packagecloud-repository
curl -s https://packagecloud.io/install/repositories/github/git-lfs/script.deb.sh | sudo bash
apt-get update -y
apt-get install --no-install-recommends -y git-lfs build-essential clang aria2
apt-get install --no-install-recommends -y git-lfs build-essential clang

install-lint-tools:
cargo install --locked taplo-cli
9 changes: 0 additions & 9 deletions documentation/src/basic_usage.md
@@ -21,15 +21,6 @@ For Ubuntu, you can install the dependencies (excluding Rust) with:
sudo apt install build-essential clang
```

### Optional runtime dependencies

[aria2](https://aria2.github.io/) is an alternate backend for downloading the
snapshots. It is significantly faster than the in-built Forest downloader.

```shell
sudo apt install aria2
```

### Compilation & installation

#### From crates.io (latest release)
34 changes: 29 additions & 5 deletions src/cli_shared/logger/mod.rs
@@ -5,6 +5,7 @@ use tracing_chrome::{ChromeLayerBuilder, FlushGuard};
use tracing_subscriber::{filter::LevelFilter, prelude::*, EnvFilter};

use crate::cli_shared::cli::CliOpts;
use crate::utils::misc::LoggingColor;

pub fn setup_logger(opts: &CliOpts) -> (Option<tracing_loki::BackgroundTask>, Option<FlushGuard>) {
let mut loki_task = None;
@@ -46,7 +47,7 @@ pub fn setup_logger(opts: &CliOpts) -> (Option<tracing_loki::BackgroundTask>, Op
tracing_subscriber::fmt::Layer::new()
.with_ansi(false)
.with_writer(file_appender)
.with_filter(get_env_filter()),
.with_filter(get_env_filter(default_env_filter())),
)
} else {
None
@@ -71,26 +72,38 @@ pub fn setup_logger(opts: &CliOpts) -> (Option<tracing_loki::BackgroundTask>, Op
.with(
tracing_subscriber::fmt::Layer::new()
.with_ansi(opts.color.coloring_enabled())
.with_filter(get_env_filter()),
.with_filter(get_env_filter(default_env_filter())),
)
.init();
(loki_task, flush_guard)
}

// Log warnings to stderr
pub fn setup_minimal_logger() {
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::Layer::new()
.with_ansi(LoggingColor::Auto.coloring_enabled())
.with_writer(std::io::stderr)
.with_filter(get_env_filter(default_tool_filter())),
)
.init();
}

/// Returns an [`EnvFilter`] according to the `RUST_LOG` environment variable, or a default
/// - see [`default_env_filter`]
/// - see [`default_env_filter`] and [`default_tool_filter`]
///
/// Note that [`tracing_subscriber::filter::Builder`] only allows a single default directive,
/// whereas we want to provide multiple.
/// See also <https://github.com/tokio-rs/tracing/blob/27f688efb72316a26f3ec1f952c82626692c08ff/tracing-subscriber/src/filter/env/builder.rs#L189-L194>
fn get_env_filter() -> EnvFilter {
fn get_env_filter(def: EnvFilter) -> EnvFilter {
use std::env::{
self,
VarError::{NotPresent, NotUnicode},
};
match env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV) {
Ok(s) => EnvFilter::new(s),
Err(NotPresent) => default_env_filter(),
Err(NotPresent) => def,
Err(NotUnicode(_)) => EnvFilter::default(),
}
}
@@ -111,6 +124,17 @@ fn default_env_filter() {
EnvFilter::try_new(default_directives.join(",")).unwrap()
}

fn default_tool_filter() -> EnvFilter {
let default_directives = [
"warn",
"forest::snapshot=info",
"forest::progress=info",
"libp2p_bitswap=off",
"tracing_loki=off",
];
EnvFilter::try_new(default_directives.join(",")).unwrap()
}

#[test]
fn test_default_env_filter() {
let _did_not_panic = default_env_filter();
134 changes: 57 additions & 77 deletions src/cli_shared/snapshot.rs
@@ -3,7 +3,6 @@

use std::{
fmt::Display,
io,
path::{Path, PathBuf},
str::FromStr,
};
@@ -12,9 +11,9 @@ use crate::{
networks::NetworkChain,
utils::{retry, RetryArgs},
};
use anyhow::{anyhow, bail, Context as _};
use anyhow::{bail, Context as _};
use chrono::NaiveDate;
use tracing::{info, warn};
use tracing::event;
use url::Url;

use crate::cli_shared::snapshot::parse::ParsedFilename;
@@ -62,17 +61,17 @@ pub fn filename(
.to_string()
}

/// Fetch a compressed snapshot with `aria2c`, falling back to our own HTTP client.
/// Returns the path to the downloaded file, which matches the format in [`filename`].
/// Returns the path to the downloaded file.
pub async fn fetch(
directory: &Path,
chain: &NetworkChain,
vendor: TrustedVendor,
) -> anyhow::Result<PathBuf> {
let (_len, url) = peek(vendor, chain).await?;
let (date, height, forest_format) = ParsedFilename::parse_url(&url)
.context("unexpected url format")?
let (_len, path) = peek(vendor, chain).await?;
let (date, height, forest_format) = ParsedFilename::parse_str(&path)
.context("unexpected path format")?
.date_and_height_and_forest();
let url = stable_url(vendor, chain)?;
let filename = filename(vendor, chain, date, height, forest_format);

download_file_with_retry(&url, directory, &filename).await
@@ -88,26 +87,15 @@ pub async fn download_file_with_retry(
timeout: None,
..Default::default()
},
|| download_file(url.clone(), directory, filename),
|| download_http(url, directory, filename),
)
.await?)
}

pub async fn download_file(url: Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
match download_aria2c(&url, directory, filename).await {
Ok(path) => Ok(path),
Err(AriaErr::CouldNotExec(reason)) => {
warn!(%reason, "couldn't run aria2c. Falling back to conventional download, which will be much slower - consider installing aria2c.");
download_http(url, directory, filename).await
}
Err(AriaErr::Other(o)) => Err(o),
}
}

/// Returns
/// - The size of the snapshot from this vendor on this chain
/// - The final URL of the snapshot
pub async fn peek(vendor: TrustedVendor, chain: &NetworkChain) -> anyhow::Result<(u64, Url)> {
/// - The filename of the snapshot
pub async fn peek(vendor: TrustedVendor, chain: &NetworkChain) -> anyhow::Result<(u64, String)> {
let stable_url = stable_url(vendor, chain)?;
// issue an actual GET, so the content length will be of the body
// (we never actually fetch the body)
@@ -118,58 +106,33 @@ pub async fn peek(vendor: TrustedVendor, chain: &NetworkChain) -> anyhow::Result
.await?
.error_for_status()
.context("server returned an error response")?;

let cd_path = response
.headers()
.get(reqwest::header::CONTENT_DISPOSITION)
.and_then(parse_content_disposition);
Ok((
response
.content_length()
.context("no content-length header")?,
response.url().clone(),
cd_path.context("no content-disposition filepath")?,
))
}

enum AriaErr {
CouldNotExec(io::Error),
Other(anyhow::Error),
}

/// Run `aria2c`, with inherited stdout and stderr (so output will be printed).
async fn download_aria2c(url: &Url, directory: &Path, filename: &str) -> Result<PathBuf, AriaErr> {
let exit_status = tokio::process::Command::new("aria2c")
.args([
"--continue=true",
"--max-tries=0",
// Download chunks concurrently, resulting in dramatically faster downloads
"--split=5",
"--max-connection-per-server=5",
format!("--out={filename}").as_str(),
"--dir",
])
.arg(directory)
.arg(url.as_str())
.kill_on_drop(true) // allow cancellation
.spawn() // defaults to inherited stdio
.map_err(AriaErr::CouldNotExec)?
.wait()
.await
.map_err(|it| AriaErr::Other(it.into()))?;

match exit_status.success() {
true => Ok(directory.join(filename)),
false => {
let msg = exit_status
.code()
.map(|it| it.to_string())
.unwrap_or_else(|| String::from("<killed>"));
Err(AriaErr::Other(anyhow!("running aria2c failed: {msg}")))
}
}
// Extract file paths from content-disposition values:
// "attachment; filename=\"911520_2023_09_14T06_13_00Z.car.zst\""
// => "911520_2023_09_14T06_13_00Z.car.zst"
fn parse_content_disposition(value: &reqwest::header::HeaderValue) -> Option<String> {
use regex::Regex;
let re = Regex::new("filename=\"([^\"]+)\"").ok()?;
let cap = re.captures(value.to_str().ok()?)?;
Some(cap.get(1)?.as_str().to_owned())
}

/// Download the file at `url` with a private HTTP client, returning the path to the downloaded file
async fn download_http(url: Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
async fn download_http(url: &Url, directory: &Path, filename: &str) -> anyhow::Result<PathBuf> {
let dst_path = directory.join(filename);

info!(%url, "downloading snapshot");
event!(target: "forest::snapshot", tracing::Level::INFO, %url, "downloading snapshot");
let mut reader = crate::utils::net::reader(url.as_str()).await?;

let mut dst = tokio::fs::File::create(&dst_path)
@@ -195,17 +158,16 @@ macro_rules! define_urls {
}

define_urls!(
const FOREST_MAINNET_COMPRESSED: &str =
"https://forest.chainsafe.io/mainnet/snapshot-latest.car.zst";
const FOREST_MAINNET_COMPRESSED: &str = "https://forest-archive.chainsafe.dev/latest/mainnet/";
const FOREST_CALIBNET_COMPRESSED: &str =
"https://forest.chainsafe.io/calibnet/snapshot-latest.car.zst";
"https://forest-archive.chainsafe.dev/latest/calibnet/";
const FILOPS_MAINNET_COMPRESSED: &str =
"https://snapshots.mainnet.filops.net/minimal/latest.zst";
const FILOPS_CALIBNET_COMPRESSED: &str =
"https://snapshots.calibrationnet.filops.net/minimal/latest.zst";
);

fn stable_url(vendor: TrustedVendor, chain: &NetworkChain) -> anyhow::Result<Url> {
pub fn stable_url(vendor: TrustedVendor, chain: &NetworkChain) -> anyhow::Result<Url> {
let s = match (vendor, chain) {
(TrustedVendor::Forest, NetworkChain::Mainnet) => FOREST_MAINNET_COMPRESSED,
(TrustedVendor::Forest, NetworkChain::Calibnet) => FOREST_CALIBNET_COMPRESSED,
@@ -234,7 +196,7 @@ mod parse {

use std::{fmt::Display, str::FromStr};

use anyhow::{anyhow, bail, Context};
use anyhow::{anyhow, bail};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use nom::{
branch::alt,
@@ -247,7 +209,6 @@
sequence::tuple,
Err,
};
use url::Url;

use crate::db::car::forest::FOREST_CAR_FILE_EXTENSION;

@@ -305,15 +266,6 @@
pub fn parse_str(input: &'a str) -> anyhow::Result<Self> {
enter_nom(alt((short, full)), input)
}

pub fn parse_url(url: &'a Url) -> anyhow::Result<Self> {
let filename = url
.path_segments()
.context("url cannot be a base")?
.last()
.context("url has no path")?;
Self::parse_str(filename)
}
}

/// Parse a number using its [`FromStr`] implementation.
@@ -492,3 +444,31 @@
}
}
}

#[cfg(test)]
mod tests {
use super::parse_content_disposition;
use reqwest::header::HeaderValue;

#[test]
fn content_disposition_forest() {
assert_eq!(
parse_content_disposition(&HeaderValue::from_static(
"attachment; filename*=UTF-8''forest_snapshot_calibnet_2023-09-14_height_911888.forest.car.zst; \
filename=\"forest_snapshot_calibnet_2023-09-14_height_911888.forest.car.zst\""
)).unwrap(),
"forest_snapshot_calibnet_2023-09-14_height_911888.forest.car.zst"
);
}

#[test]
fn content_disposition_filops() {
assert_eq!(
parse_content_disposition(&HeaderValue::from_static(
"attachment; filename=\"911520_2023_09_14T06_13_00Z.car.zst\""
))
.unwrap(),
"911520_2023_09_14T06_13_00Z.car.zst"
);
}
}
10 changes: 6 additions & 4 deletions src/daemon/mod.rs
@@ -488,15 +488,17 @@ async fn set_snapshot_path_if_needed(
(false, _, _) => {} // noop - don't need a snapshot
(true, true, _) => {} // noop - we need a snapshot, and we have one
(true, false, true) => {
let (_len, url) = crate::cli_shared::snapshot::peek(vendor, chain).await?;
let url = crate::cli_shared::snapshot::stable_url(vendor, chain)?;
config.client.snapshot_path = Some(url.to_string().into());
config.client.snapshot = true;
}
(true, false, false) => {
// we need a snapshot, don't have one, and don't have permission to download one, so ask the user
let (num_bytes, url) = crate::cli_shared::snapshot::peek(vendor, &config.chain.network)
.await
.context("couldn't get snapshot size")?;
let url = crate::cli_shared::snapshot::stable_url(vendor, chain)?;
let (num_bytes, _path) =
crate::cli_shared::snapshot::peek(vendor, &config.chain.network)
.await
.context("couldn't get snapshot size")?;
// dialoguer will double-print long lines, so manually print the first clause ourselves,
// then let `Confirm` handle the second.
println!("Forest requires a snapshot to sync with the network, but automatic fetching is disabled.");