Skip to content

Commit

Permalink
race on gateway (#403)
Browse files Browse the repository at this point in the history
moves the racing gateway functionality into the "main" gateway, and changes the configuration location to be gateway.http_resolvers, a vec of strings.
  • Loading branch information
b5 authored Oct 24, 2022
1 parent b499751 commit 7b0a631
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 35 deletions.
4 changes: 2 additions & 2 deletions iroh-gateway/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Args {
#[clap(long)]
pub cfg: Option<PathBuf>,
#[clap(long)]
denylist: bool,
use_denylist: bool,
}

impl Args {
Expand All @@ -39,7 +39,7 @@ impl Args {
if let Some(cache) = self.cache {
map.insert("cache", cache.to_string());
}
map.insert("denylist", self.denylist.to_string());
map.insert("use_denylist", self.use_denylist.to_string());
map.insert("metrics.collect", self.metrics.to_string());
map.insert("metrics.tracing", self.tracing.to_string());
map
Expand Down
24 changes: 19 additions & 5 deletions iroh-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ pub struct Config {
/// default port to listen on
pub port: u16,
/// flag to toggle whether the gateway should use denylist on requests
pub denylist: bool,
pub use_denylist: bool,
/// URL of gateways to be used by the racing resolver.
/// strings can either be urls or subdomain gateway roots
/// values without https:// prefix are treated as subdomain gateways (eg: dweb.link)
/// values with are treated as IPFS path gateways (eg: https://ipfs.io)
pub http_resolvers: Option<Vec<String>>,
/// rpc addresses for the gateway & addresses for the rpc client to dial
pub rpc_client: RpcClientConfig,
/// metrics configuration
Expand All @@ -46,8 +51,9 @@ impl Config {
headers: HeaderMap::new(),
port,
rpc_client,
http_resolvers: None,
metrics: MetricsConfig::default(),
denylist: false,
use_denylist: false,
}
}

Expand Down Expand Up @@ -113,8 +119,9 @@ impl Default for Config {
headers: HeaderMap::new(),
port: DEFAULT_PORT,
rpc_client,
http_resolvers: None,
metrics: MetricsConfig::default(),
denylist: false,
use_denylist: false,
};
t.set_default_headers();
t
Expand All @@ -130,14 +137,18 @@ impl Source for Config {
let rpc_client = self.rpc_client.collect()?;
let mut map: Map<String, Value> = Map::new();
insert_into_config_map(&mut map, "public_url_base", self.public_url_base.clone());
insert_into_config_map(&mut map, "denylist", self.denylist);
insert_into_config_map(&mut map, "use_denylist", self.use_denylist);
// Some issue between deserializing u64 & u16, converting this to
// an signed int fixes the issue
insert_into_config_map(&mut map, "port", self.port as i32);
insert_into_config_map(&mut map, "headers", collect_headers(&self.headers)?);
insert_into_config_map(&mut map, "rpc_client", rpc_client);
let metrics = self.metrics.collect()?;
insert_into_config_map(&mut map, "metrics", metrics);

if let Some(http_resolvers) = &self.http_resolvers {
insert_into_config_map(&mut map, "http_resolvers", http_resolvers.clone());
}
Ok(map)
}
}
Expand Down Expand Up @@ -200,7 +211,10 @@ mod tests {
Value::new(None, default.public_url_base.clone()),
);
expect.insert("port".to_string(), Value::new(None, default.port as i64));
expect.insert("denylist".to_string(), Value::new(None, default.denylist));
expect.insert(
"use_denylist".to_string(),
Value::new(None, default.use_denylist),
);
expect.insert(
"headers".to_string(),
Value::new(None, collect_headers(&default.headers).unwrap()),
Expand Down
9 changes: 7 additions & 2 deletions iroh-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use iroh_gateway::{
core::Core,
metrics,
};
use iroh_resolver::racing::RacingLoader;
use iroh_rpc_client::Client as RpcClient;
use iroh_util::lock::ProgramLock;
use iroh_util::{iroh_config_path, make_config};
Expand Down Expand Up @@ -38,14 +39,18 @@ async fn main() -> Result<()> {
println!("{:#?}", config);

let metrics_config = config.metrics.clone();
let bad_bits = match config.denylist {
let bad_bits = match config.use_denylist {
true => Arc::new(Some(RwLock::new(BadBits::new()))),
false => Arc::new(None),
};
let rpc_addr = config
.server_rpc_addr()?
.ok_or_else(|| anyhow!("missing gateway rpc addr"))?;
let content_loader = RpcClient::new(config.rpc_client.clone()).await?;

let content_loader = RacingLoader::new(
RpcClient::new(config.rpc_client.clone()).await?,
config.http_resolvers.clone().unwrap_or_default(),
);
let handler = Core::new(
Arc::new(config),
rpc_addr,
Expand Down
9 changes: 0 additions & 9 deletions iroh-one/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ pub struct Config {
/// Path for the UDS socket for the gateway.
#[cfg(feature = "uds-gateway")]
pub gateway_uds_path: Option<PathBuf>,
/// URL of the gateway used by the racing resolver.
pub resolver_gateway: Option<String>,
/// Gateway specific configuration.
pub gateway: iroh_gateway::config::Config,
/// Store specific configuration.
Expand All @@ -46,7 +44,6 @@ impl Config {
p2p: iroh_p2p::config::Config,
rpc_client: RpcClientConfig,
#[cfg(feature = "uds-gateway")] gateway_uds_path: Option<PathBuf>,
resolver_gateway: Option<String>,
) -> Self {
Self {
gateway,
Expand All @@ -56,7 +53,6 @@ impl Config {
metrics: MetricsConfig::default(),
#[cfg(feature = "uds-gateway")]
gateway_uds_path,
resolver_gateway,
}
}

Expand Down Expand Up @@ -111,7 +107,6 @@ impl Default for Config {
p2p: default_p2p_config(rpc_client, metrics_config, key_store_path),
#[cfg(feature = "uds-gateway")]
gateway_uds_path: Some(gateway_uds_path),
resolver_gateway: None,
}
}
}
Expand Down Expand Up @@ -163,10 +158,6 @@ impl Source for Config {
uds_path.to_str().unwrap().to_string(),
);
}
if let Some(resolver_gateway) = &self.resolver_gateway {
insert_into_config_map(&mut map, "resolver_gateway", resolver_gateway.clone());
}

Ok(map)
}
}
Expand Down
1 change: 0 additions & 1 deletion iroh-one/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod cli;
pub mod config;
pub mod content_loader;
pub mod mem_p2p;
pub mod mem_store;
#[cfg(feature = "uds-gateway")]
Expand Down
8 changes: 4 additions & 4 deletions iroh-one/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroh_one::{
cli::Args,
config::{Config, CONFIG_FILE_NAME, ENV_PREFIX},
};
use iroh_resolver::racing::RacingLoader;
use iroh_rpc_client::Client as RpcClient;
use iroh_rpc_types::Addr;
use iroh_util::lock::ProgramLock;
Expand Down Expand Up @@ -70,15 +71,14 @@ async fn main() -> Result<()> {
.server_rpc_addr()?
.ok_or_else(|| anyhow!("missing gateway rpc addr"))?;

let bad_bits = match config.gateway.denylist {
let bad_bits = match config.gateway.use_denylist {
true => Arc::new(Some(RwLock::new(BadBits::new()))),
false => Arc::new(None),
};

// let content_loader = RpcClient::new(config.rpc_client.clone()).await?;
let content_loader = iroh_one::content_loader::RacingLoader::new(
let content_loader = RacingLoader::new(
RpcClient::new(config.rpc_client.clone()).await?,
config.resolver_gateway.clone(),
config.gateway.http_resolvers.clone().unwrap_or_default(),
);
let shared_state = Core::make_state(
Arc::new(config.clone()),
Expand Down
2 changes: 2 additions & 0 deletions iroh-resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ once_cell = "1.13.0"
tokio-util = { version = "0.7", features = ["io"] }
libp2p = { version = "0.50", default-features = false }
async-channel = "1.7.1"
reqwest = {version = "0.11", features = ["rustls-tls"], default-features = false}
rand = "0.8.5"

[dev-dependencies]
criterion = { version = "0.4.0", features = ["async_tokio"] }
Expand Down
1 change: 1 addition & 0 deletions iroh-resolver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod balanced_tree;
pub mod chunker;
pub mod codecs;
pub mod hamt;
pub mod racing;
pub mod resolver;
pub mod unixfs;
pub mod unixfs_builder;
Expand Down
34 changes: 22 additions & 12 deletions iroh-one/src/content_loader.rs → iroh-resolver/src/racing.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
//! A content loader implementation for iroh-one.
use crate::resolver::{
parse_links, ContentLoader, ContextId, LoadedCid, LoaderContext, Source, IROH_STORE,
};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use cid::{multibase, Cid};
use futures::{future::FutureExt, pin_mut, select};
use iroh_resolver::resolver::{
parse_links, ContentLoader, ContextId, LoadedCid, LoaderContext, Source, IROH_STORE,
};
use iroh_rpc_client::Client as RpcClient;
use rand::seq::SliceRandom;
use tracing::{debug, error, trace, warn};

#[derive(Clone, Debug)]
pub struct RacingLoader {
rpc_client: RpcClient,
gateway: Option<String>,
http_resolvers: Vec<String>,
}

impl RacingLoader {
pub fn new(rpc_client: RpcClient, gateway: Option<String>) -> Self {
pub fn new(rpc_client: RpcClient, http_resolvers: Vec<String>) -> Self {
Self {
rpc_client,
gateway,
http_resolvers,
}
}
}

impl RacingLoader {
pub fn try_raw_gateway(&self) -> Result<&String> {
self.gateway
.as_ref()
.ok_or_else(|| anyhow!("no gateway configured to fetch raw CIDs"))
match self.http_resolvers.len() {
0 => Err(anyhow!("no gateway configured to fetch raw CIDs")),
_ => {
let mut rng = rand::thread_rng();
let gw = self.http_resolvers.choose(&mut rng).unwrap();
Ok(gw)
}
}
}

async fn fetch_p2p(&self, ctx: ContextId, cid: &Cid) -> Result<Bytes, anyhow::Error> {
Expand All @@ -42,7 +46,13 @@ impl RacingLoader {
async fn fetch_http(&self, cid: &Cid) -> Result<(Bytes, String), anyhow::Error> {
let gateway = self.try_raw_gateway()?;
let cid_str = multibase::encode(multibase::Base::Base32Lower, cid.to_bytes().as_slice());
let gateway_url = format!("https://{}.ipfs.{}?format=raw", cid_str, gateway);
// support two gateway URL formats: subdomain gateways (eg: dweb.link)
// and full URL (eg: https://ipfs.io)
let gateway_url = if gateway.starts_with("https://") || gateway.starts_with("http://") {
format!("{}/ipfs/{}?format=raw", gateway, cid_str)
} else {
format!("https://{}.ipfs.{}?format=raw", cid_str, gateway)
};
debug!("Will fetch {}", gateway_url);
let response = reqwest::get(gateway_url).await?;
response
Expand Down

0 comments on commit 7b0a631

Please sign in to comment.