Skip to content

Commit

Permalink
Merge branch 'main' into rklaehn/separate-provide
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn authored Oct 24, 2022
2 parents 245a6c7 + 7b0a631 commit eb40e69
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 84 deletions.
2 changes: 1 addition & 1 deletion iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Api for Iroh {

fn p2p(&self) -> Result<ClientP2p> {
let p2p_client = self.client.try_p2p()?;
Ok(ClientP2p::new(p2p_client.clone()))
Ok(ClientP2p::new(p2p_client))
}

fn get_stream(
Expand Down
1 change: 1 addition & 0 deletions iroh-gateway/src/bad_bits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ mod tests {
gateway_addr: None,
p2p_addr: None,
store_addr: None,
channels: Some(1),
},
);
config.set_default_headers();
Expand Down
4 changes: 2 additions & 2 deletions iroh-gateway/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Args {
#[clap(long)]
pub cfg: Option<PathBuf>,
#[clap(long)]
denylist: bool,
use_denylist: bool,
}

impl Args {
Expand All @@ -39,7 +39,7 @@ impl Args {
if let Some(cache) = self.cache {
map.insert("cache", cache.to_string());
}
map.insert("denylist", self.denylist.to_string());
map.insert("use_denylist", self.use_denylist.to_string());
map.insert("metrics.collect", self.metrics.to_string());
map.insert("metrics.tracing", self.tracing.to_string());
map
Expand Down
24 changes: 19 additions & 5 deletions iroh-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ pub struct Config {
/// default port to listen on
pub port: u16,
/// flag to toggle whether the gateway should use denylist on requests
pub denylist: bool,
pub use_denylist: bool,
/// URL of gateways to be used by the racing resolver.
/// strings can either be urls or subdomain gateway roots
/// values without https:// prefix are treated as subdomain gateways (eg: dweb.link)
/// values with are treated as IPFS path gateways (eg: https://ipfs.io)
pub http_resolvers: Option<Vec<String>>,
/// rpc addresses for the gateway & addresses for the rpc client to dial
pub rpc_client: RpcClientConfig,
/// metrics configuration
Expand All @@ -46,8 +51,9 @@ impl Config {
headers: HeaderMap::new(),
port,
rpc_client,
http_resolvers: None,
metrics: MetricsConfig::default(),
denylist: false,
use_denylist: false,
}
}

Expand Down Expand Up @@ -113,8 +119,9 @@ impl Default for Config {
headers: HeaderMap::new(),
port: DEFAULT_PORT,
rpc_client,
http_resolvers: None,
metrics: MetricsConfig::default(),
denylist: false,
use_denylist: false,
};
t.set_default_headers();
t
Expand All @@ -130,14 +137,18 @@ impl Source for Config {
let rpc_client = self.rpc_client.collect()?;
let mut map: Map<String, Value> = Map::new();
insert_into_config_map(&mut map, "public_url_base", self.public_url_base.clone());
insert_into_config_map(&mut map, "denylist", self.denylist);
insert_into_config_map(&mut map, "use_denylist", self.use_denylist);
// Some issue between deserializing u64 & u16, converting this to
// an signed int fixes the issue
insert_into_config_map(&mut map, "port", self.port as i32);
insert_into_config_map(&mut map, "headers", collect_headers(&self.headers)?);
insert_into_config_map(&mut map, "rpc_client", rpc_client);
let metrics = self.metrics.collect()?;
insert_into_config_map(&mut map, "metrics", metrics);

if let Some(http_resolvers) = &self.http_resolvers {
insert_into_config_map(&mut map, "http_resolvers", http_resolvers.clone());
}
Ok(map)
}
}
Expand Down Expand Up @@ -200,7 +211,10 @@ mod tests {
Value::new(None, default.public_url_base.clone()),
);
expect.insert("port".to_string(), Value::new(None, default.port as i64));
expect.insert("denylist".to_string(), Value::new(None, default.denylist));
expect.insert(
"use_denylist".to_string(),
Value::new(None, default.use_denylist),
);
expect.insert(
"headers".to_string(),
Value::new(None, collect_headers(&default.headers).unwrap()),
Expand Down
2 changes: 2 additions & 0 deletions iroh-gateway/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ mod tests {
gateway_addr: None,
p2p_addr: None,
store_addr: None,
channels: Some(1),
},
);
config.set_default_headers();
Expand Down Expand Up @@ -199,6 +200,7 @@ mod tests {
gateway_addr: None,
p2p_addr: None,
store_addr: Some(store_client_addr),
channels: Some(1),
},
);
config.set_default_headers();
Expand Down
9 changes: 7 additions & 2 deletions iroh-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use iroh_gateway::{
core::Core,
metrics,
};
use iroh_resolver::racing::RacingLoader;
use iroh_rpc_client::Client as RpcClient;
use iroh_util::lock::ProgramLock;
use iroh_util::{iroh_config_path, make_config};
Expand Down Expand Up @@ -38,14 +39,18 @@ async fn main() -> Result<()> {
println!("{:#?}", config);

let metrics_config = config.metrics.clone();
let bad_bits = match config.denylist {
let bad_bits = match config.use_denylist {
true => Arc::new(Some(RwLock::new(BadBits::new()))),
false => Arc::new(None),
};
let rpc_addr = config
.server_rpc_addr()?
.ok_or_else(|| anyhow!("missing gateway rpc addr"))?;
let content_loader = RpcClient::new(config.rpc_client.clone()).await?;

let content_loader = RacingLoader::new(
RpcClient::new(config.rpc_client.clone()).await?,
config.http_resolvers.clone().unwrap_or_default(),
);
let handler = Core::new(
Arc::new(config),
rpc_addr,
Expand Down
10 changes: 1 addition & 9 deletions iroh-one/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ pub struct Config {
/// Path for the UDS socket for the gateway.
#[cfg(feature = "uds-gateway")]
pub gateway_uds_path: Option<PathBuf>,
/// URL of the gateway used by the racing resolver.
pub resolver_gateway: Option<String>,
/// Gateway specific configuration.
pub gateway: iroh_gateway::config::Config,
/// Store specific configuration.
Expand All @@ -46,7 +44,6 @@ impl Config {
p2p: iroh_p2p::config::Config,
rpc_client: RpcClientConfig,
#[cfg(feature = "uds-gateway")] gateway_uds_path: Option<PathBuf>,
resolver_gateway: Option<String>,
) -> Self {
Self {
gateway,
Expand All @@ -56,7 +53,6 @@ impl Config {
metrics: MetricsConfig::default(),
#[cfg(feature = "uds-gateway")]
gateway_uds_path,
resolver_gateway,
}
}

Expand All @@ -79,6 +75,7 @@ impl Config {
gateway_addr: None,
p2p_addr: None,
store_addr: None,
channels: Some(1),
}
}

Expand Down Expand Up @@ -110,7 +107,6 @@ impl Default for Config {
p2p: default_p2p_config(rpc_client, metrics_config, key_store_path),
#[cfg(feature = "uds-gateway")]
gateway_uds_path: Some(gateway_uds_path),
resolver_gateway: None,
}
}
}
Expand Down Expand Up @@ -162,10 +158,6 @@ impl Source for Config {
uds_path.to_str().unwrap().to_string(),
);
}
if let Some(resolver_gateway) = &self.resolver_gateway {
insert_into_config_map(&mut map, "resolver_gateway", resolver_gateway.clone());
}

Ok(map)
}
}
Expand Down
1 change: 0 additions & 1 deletion iroh-one/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod cli;
pub mod config;
pub mod content_loader;
pub mod mem_p2p;
pub mod mem_store;
#[cfg(feature = "uds-gateway")]
Expand Down
8 changes: 4 additions & 4 deletions iroh-one/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroh_one::{
cli::Args,
config::{Config, CONFIG_FILE_NAME, ENV_PREFIX},
};
use iroh_resolver::racing::RacingLoader;
use iroh_rpc_client::Client as RpcClient;
use iroh_rpc_types::Addr;
use iroh_util::lock::ProgramLock;
Expand Down Expand Up @@ -70,15 +71,14 @@ async fn main() -> Result<()> {
.server_rpc_addr()?
.ok_or_else(|| anyhow!("missing gateway rpc addr"))?;

let bad_bits = match config.gateway.denylist {
let bad_bits = match config.gateway.use_denylist {
true => Arc::new(Some(RwLock::new(BadBits::new()))),
false => Arc::new(None),
};

// let content_loader = RpcClient::new(config.rpc_client.clone()).await?;
let content_loader = iroh_one::content_loader::RacingLoader::new(
let content_loader = RacingLoader::new(
RpcClient::new(config.rpc_client.clone()).await?,
config.resolver_gateway.clone(),
config.gateway.http_resolvers.clone().unwrap_or_default(),
);
let shared_state = Core::make_state(
Arc::new(config.clone()),
Expand Down
3 changes: 2 additions & 1 deletion iroh-p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ mod tests {

let cfg = iroh_rpc_client::Config {
p2p_addr: Some(rpc_client_addr),
channels: Some(1),
..Default::default()
};
let p2p_task = tokio::task::spawn(async move {
Expand All @@ -1048,7 +1049,7 @@ mod tests {
.unwrap();

let mut providers = Vec::new();
let mut chan = client.p2p.unwrap().fetch_providers_dht(&c).await?;
let mut chan = client.try_p2p().unwrap().fetch_providers_dht(&c).await?;
while let Some(new_providers) = chan.next().await {
let new_providers = new_providers.unwrap();
println!("providers found: {}", new_providers.len());
Expand Down
2 changes: 2 additions & 0 deletions iroh-resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ once_cell = "1.13.0"
tokio-util = { version = "0.7", features = ["io"] }
libp2p = { version = "0.50", default-features = false }
async-channel = "1.7.1"
reqwest = {version = "0.11", features = ["rustls-tls"], default-features = false}
rand = "0.8.5"

[dev-dependencies]
criterion = { version = "0.4.0", features = ["async_tokio"] }
Expand Down
1 change: 1 addition & 0 deletions iroh-resolver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod balanced_tree;
pub mod chunker;
pub mod codecs;
pub mod hamt;
pub mod racing;
pub mod resolver;
pub mod unixfs;
pub mod unixfs_builder;
Expand Down
38 changes: 24 additions & 14 deletions iroh-one/src/content_loader.rs → iroh-resolver/src/racing.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
//! A content loader implementation for iroh-one.
use crate::resolver::{
parse_links, ContentLoader, ContextId, LoadedCid, LoaderContext, Source, IROH_STORE,
};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use cid::{multibase, Cid};
use futures::{future::FutureExt, pin_mut, select};
use iroh_resolver::resolver::{
parse_links, ContentLoader, ContextId, LoadedCid, LoaderContext, Source, IROH_STORE,
};
use iroh_rpc_client::Client as RpcClient;
use rand::seq::SliceRandom;
use tracing::{debug, error, trace, warn};

#[derive(Clone, Debug)]
pub struct RacingLoader {
rpc_client: RpcClient,
gateway: Option<String>,
http_resolvers: Vec<String>,
}

impl RacingLoader {
pub fn new(rpc_client: RpcClient, gateway: Option<String>) -> Self {
pub fn new(rpc_client: RpcClient, http_resolvers: Vec<String>) -> Self {
Self {
rpc_client,
gateway,
http_resolvers,
}
}
}

impl RacingLoader {
pub fn try_raw_gateway(&self) -> Result<&String> {
self.gateway
.as_ref()
.ok_or_else(|| anyhow!("no gateway configured to fetch raw CIDs"))
match self.http_resolvers.len() {
0 => Err(anyhow!("no gateway configured to fetch raw CIDs")),
_ => {
let mut rng = rand::thread_rng();
let gw = self.http_resolvers.choose(&mut rng).unwrap();
Ok(gw)
}
}
}

async fn fetch_p2p(&self, ctx: ContextId, cid: &Cid) -> Result<Bytes, anyhow::Error> {
Expand All @@ -42,7 +46,13 @@ impl RacingLoader {
async fn fetch_http(&self, cid: &Cid) -> Result<(Bytes, String), anyhow::Error> {
let gateway = self.try_raw_gateway()?;
let cid_str = multibase::encode(multibase::Base::Base32Lower, cid.to_bytes().as_slice());
let gateway_url = format!("https://{}.ipfs.{}?format=raw", cid_str, gateway);
// support two gateway URL formats: subdomain gateways (eg: dweb.link)
// and full URL (eg: https://ipfs.io)
let gateway_url = if gateway.starts_with("https://") || gateway.starts_with("http://") {
format!("{}/ipfs/{}?format=raw", gateway, cid_str)
} else {
format!("https://{}.ipfs.{}?format=raw", cid_str, gateway)
};
debug!("Will fetch {}", gateway_url);
let response = reqwest::get(gateway_url).await?;
response
Expand Down Expand Up @@ -115,8 +125,8 @@ impl ContentLoader for RacingLoader {

let len = cloned.len();
let links_len = links.len();
if let Some(store_rpc) = rpc.store.as_ref() {
match store_rpc.put(cid, cloned, links).await {
if let Ok(store_rpc) = rpc.try_store() {
match store_rpc.clone().put(cid, cloned, links).await {
Ok(_) => debug!("stored {} ({}bytes, {}links)", cid, len, links_len),
Err(err) => {
warn!("failed to store {}: {:?}", cid, err);
Expand Down
12 changes: 6 additions & 6 deletions iroh-resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use libipld::{Ipld, IpldCodec};
use tokio::io::{AsyncRead, AsyncSeek};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, error, trace, warn};
use tracing::{debug, trace, warn};

use iroh_metrics::{
core::{MObserver, MRecorder},
Expand Down Expand Up @@ -797,8 +797,8 @@ impl ContentLoader for Client {

// trigger storage in the background
let clone = bytes.clone();
let store = self.store.as_ref().cloned();
let p2p = self.try_p2p()?.clone();
let store = self.try_store();
let p2p = self.try_p2p()?;

tokio::spawn(async move {
let clone2 = clone.clone();
Expand All @@ -809,7 +809,7 @@ impl ContentLoader for Client {

let len = clone.len();
let links_len = links.len();
if let Some(store_rpc) = store.as_ref() {
if let Ok(store_rpc) = store {
match store_rpc.put(cid, clone.clone(), links).await {
Ok(_) => {
debug!("stored {} ({}bytes, {}links)", cid, len, links_len);
Expand Down Expand Up @@ -851,11 +851,11 @@ impl<T: ContentLoader> Resolver<T> {
let loader = loader_thread.clone();

tokio::task::spawn(async move {
error!("stopping session {}", session);
debug!("stopping session {}", session);
if let Err(err) = loader.stop_session(session).await {
warn!("failed to stop session {}: {:?}", session, err);
}
error!("stopping session {} done", session);
debug!("stopping session {} done", session);
});
}
});
Expand Down
Loading

0 comments on commit eb40e69

Please sign in to comment.