From b4997515edadcbbba85d6ad1e8f1aa5c16c3d84f Mon Sep 17 00:00:00 2001 From: Asmir Avdicevic Date: Mon, 24 Oct 2022 19:31:39 +0200 Subject: [PATCH 1/2] feat: connection pooling (#404) --- iroh-api/src/api.rs | 2 +- iroh-gateway/src/bad_bits.rs | 1 + iroh-gateway/src/core.rs | 2 + iroh-one/src/config.rs | 1 + iroh-one/src/content_loader.rs | 4 +- iroh-p2p/src/node.rs | 3 +- iroh-resolver/src/resolver.rs | 12 ++-- iroh-rpc-client/src/client.rs | 121 ++++++++++++++++++++++++--------- iroh-rpc-client/src/config.rs | 10 +++ iroh-share/src/p2p_node.rs | 4 +- iroh-store/benches/rpc.rs | 9 +-- 11 files changed, 120 insertions(+), 49 deletions(-) diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs index 07bf7b54f29..48b60eb51d7 100644 --- a/iroh-api/src/api.rs +++ b/iroh-api/src/api.rs @@ -102,7 +102,7 @@ impl Api for Iroh { fn p2p(&self) -> Result { let p2p_client = self.client.try_p2p()?; - Ok(ClientP2p::new(p2p_client.clone())) + Ok(ClientP2p::new(p2p_client)) } fn get_stream( diff --git a/iroh-gateway/src/bad_bits.rs b/iroh-gateway/src/bad_bits.rs index 373c6f5a778..ef74ad3101b 100644 --- a/iroh-gateway/src/bad_bits.rs +++ b/iroh-gateway/src/bad_bits.rs @@ -182,6 +182,7 @@ mod tests { gateway_addr: None, p2p_addr: None, store_addr: None, + channels: Some(1), }, ); config.set_default_headers(); diff --git a/iroh-gateway/src/core.rs b/iroh-gateway/src/core.rs index 664a30a225c..6952b92d370 100644 --- a/iroh-gateway/src/core.rs +++ b/iroh-gateway/src/core.rs @@ -168,6 +168,7 @@ mod tests { gateway_addr: None, p2p_addr: None, store_addr: None, + channels: Some(1), }, ); config.set_default_headers(); @@ -199,6 +200,7 @@ mod tests { gateway_addr: None, p2p_addr: None, store_addr: Some(store_client_addr), + channels: Some(1), }, ); config.set_default_headers(); diff --git a/iroh-one/src/config.rs b/iroh-one/src/config.rs index e7b47273316..792736c897c 100644 --- a/iroh-one/src/config.rs +++ b/iroh-one/src/config.rs @@ -79,6 +79,7 @@ impl Config { gateway_addr: None, p2p_addr: None, store_addr: None, + channels: Some(1), } } diff --git a/iroh-one/src/content_loader.rs b/iroh-one/src/content_loader.rs index 32984cf5231..cf8631cb338 100644 --- a/iroh-one/src/content_loader.rs +++ b/iroh-one/src/content_loader.rs @@ -115,8 +115,8 @@ impl ContentLoader for RacingLoader { let len = cloned.len(); let links_len = links.len(); - if let Some(store_rpc) = rpc.store.as_ref() { - match store_rpc.put(cid, cloned, links).await { + if let Ok(store_rpc) = rpc.try_store() { + match store_rpc.clone().put(cid, cloned, links).await { Ok(_) => debug!("stored {} ({}bytes, {}links)", cid, len, links_len), Err(err) => { warn!("failed to store {}: {:?}", cid, err); diff --git a/iroh-p2p/src/node.rs b/iroh-p2p/src/node.rs index b5c8ea0fc1f..b29694a1482 100644 --- a/iroh-p2p/src/node.rs +++ b/iroh-p2p/src/node.rs @@ -1033,6 +1033,7 @@ mod tests { let cfg = iroh_rpc_client::Config { p2p_addr: Some(rpc_client_addr), + channels: Some(1), ..Default::default() }; let p2p_task = tokio::task::spawn(async move { @@ -1048,7 +1049,7 @@ mod tests { .unwrap(); let mut providers = Vec::new(); - let mut chan = client.p2p.unwrap().fetch_providers_dht(&c).await?; + let mut chan = client.try_p2p().unwrap().fetch_providers_dht(&c).await?; while let Some(new_providers) = chan.next().await { let new_providers = new_providers.unwrap(); println!("providers found: {}", new_providers.len()); diff --git a/iroh-resolver/src/resolver.rs b/iroh-resolver/src/resolver.rs index b553161f541..586b9765462 100644 --- a/iroh-resolver/src/resolver.rs +++ b/iroh-resolver/src/resolver.rs @@ -22,7 +22,7 @@ use libipld::{Ipld, IpldCodec}; use tokio::io::{AsyncRead, AsyncSeek}; use tokio::sync::Mutex; use tokio::task::JoinHandle; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, trace, warn}; use iroh_metrics::{ core::{MObserver, MRecorder}, @@ -797,8 +797,8 @@ impl ContentLoader for Client { // trigger storage in the background let clone = bytes.clone(); - let store = self.store.as_ref().cloned(); - let p2p = self.try_p2p()?.clone(); + let store = self.try_store(); + let p2p = self.try_p2p()?; tokio::spawn(async move { let clone2 = clone.clone(); @@ -809,7 +809,7 @@ impl ContentLoader for Client { let len = clone.len(); let links_len = links.len(); - if let Some(store_rpc) = store.as_ref() { + if let Ok(store_rpc) = store { match store_rpc.put(cid, clone.clone(), links).await { Ok(_) => { debug!("stored {} ({}bytes, {}links)", cid, len, links_len); @@ -851,11 +851,11 @@ impl Resolver { let loader = loader_thread.clone(); tokio::task::spawn(async move { - error!("stopping session {}", session); + debug!("stopping session {}", session); if let Err(err) = loader.stop_session(session).await { warn!("failed to stop session {}: {:?}", session, err); } - error!("stopping session {} done", session); + debug!("stopping session {} done", session); }); } }); diff --git a/iroh-rpc-client/src/client.rs b/iroh-rpc-client/src/client.rs index 16e6f530c6d..96216b35878 100644 --- a/iroh-rpc-client/src/client.rs +++ b/iroh-rpc-client/src/client.rs @@ -1,4 +1,7 @@ -use anyhow::{anyhow, Context, Result}; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use anyhow::{Context, Result}; #[cfg(feature = "grpc")] use futures::{Stream, StreamExt}; @@ -10,8 +13,62 @@ use crate::store::StoreClient; #[derive(Debug, Clone)] pub struct Client { pub gateway: Option, - pub p2p: Option, - pub store: Option, + p2p: P2pLBClient, + store: StoreLBClient, +} + +/// Provides a load balanced client for the store service +/// The client will round robin between all available StoreClients +#[derive(Debug, Clone)] +pub struct StoreLBClient { + clients: Vec, + pos: Arc, +} + +impl StoreLBClient { + /// round robin load balancing + pub fn get(&self) -> Option { + if self.clients.is_empty() { + return None; + } + let pos = self.pos.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let c = self.clients.get(pos % self.clients.len()).unwrap(); + Some(c.clone()) + } + + pub fn new() -> Self { + Self { + clients: vec![], + pos: Arc::new(AtomicUsize::new(0)), + } + } +} + +/// Provides a load balanced client for the p2p service +/// The client will round robin between all available P2pClients +#[derive(Debug, Clone)] +pub struct P2pLBClient { + clients: Vec, + pos: Arc, +} + +impl P2pLBClient { + /// round robin load balancing + pub fn get(&self) -> Option { + if self.clients.is_empty() { + return None; + } + let pos = self.pos.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let c = self.clients.get(pos % self.clients.len()).unwrap(); + Some(c.clone()) + } + + pub fn new() -> Self { + Self { + clients: vec![], + pos: Arc::new(AtomicUsize::new(0)), + } + } } impl Client { @@ -20,6 +77,7 @@ impl Client { gateway_addr, p2p_addr, store_addr, + channels, } = cfg; let gateway = if let Some(addr) = gateway_addr { @@ -32,24 +90,27 @@ impl Client { None }; - let p2p = if let Some(addr) = p2p_addr { - Some( - P2pClient::new(addr) + let n_channels = channels.unwrap_or(1); + + let mut p2p = P2pLBClient::new(); + if let Some(addr) = p2p_addr { + for _i in 0..n_channels { + let sc = P2pClient::new(addr.clone()) .await - .context("Could not create p2p rpc client")?, - ) - } else { - None - }; - let store = if let Some(addr) = store_addr { - Some( - StoreClient::new(addr) + .context("Could not create store rpc client")?; + p2p.clients.push(sc); + } + } + + let mut store = StoreLBClient::new(); + if let Some(addr) = store_addr { + for _i in 0..n_channels { + let sc = StoreClient::new(addr.clone()) .await - .context("Could not create store rpc client")?, - ) - } else { - None - }; + .context("Could not create store rpc client")?; + store.clients.push(sc); + } + } Ok(Client { gateway, @@ -58,22 +119,18 @@ impl Client { }) } - pub fn try_p2p(&self) -> Result<&P2pClient> { - self.p2p - .as_ref() - .ok_or_else(|| anyhow!("missing rpc p2p connnection")) + pub fn try_p2p(&self) -> Result { + self.p2p.get().context("missing rpc p2p connnection") } pub fn try_gateway(&self) -> Result<&GatewayClient> { self.gateway .as_ref() - .ok_or_else(|| anyhow!("missing rpc gateway connnection")) + .context("missing rpc gateway connnection") } - pub fn try_store(&self) -> Result<&StoreClient> { - self.store - .as_ref() - .ok_or_else(|| anyhow!("missing rpc store connnection")) + pub fn try_store(&self) -> Result { + self.store.get().context("missing rpc store connnection") } #[cfg(feature = "grpc")] @@ -83,12 +140,12 @@ impl Client { } else { None }; - let p = if let Some(ref p) = self.p2p { + let p = if let Some(ref p) = self.p2p.get() { Some(p.check().await) } else { None }; - let s = if let Some(ref s) = self.store { + let s = if let Some(ref s) = self.store.get() { Some(s.check().await) } else { None @@ -106,11 +163,11 @@ impl Client { let g = g.watch().await; streams.push(g.boxed()); } - if let Some(ref p) = self.p2p { + if let Some(ref p) = self.p2p.get() { let p = p.watch().await; streams.push(p.boxed()); } - if let Some(ref s) = self.store { + if let Some(ref s) = self.store.get() { let s = s.watch().await; streams.push(s.boxed()); } diff --git a/iroh-rpc-client/src/config.rs b/iroh-rpc-client/src/config.rs index 352acce2281..0c1682246ad 100644 --- a/iroh-rpc-client/src/config.rs +++ b/iroh-rpc-client/src/config.rs @@ -12,6 +12,8 @@ pub struct Config { pub p2p_addr: Option, // store rpc address pub store_addr: Option, + // number of concurent channels + pub channels: Option, } impl Source for Config { @@ -30,6 +32,9 @@ impl Source for Config { if let Some(addr) = &self.store_addr { insert_into_config_map(&mut map, "store_addr", addr.to_string()); } + if let Some(channels) = &self.channels { + insert_into_config_map(&mut map, "channels", channels.to_string()); + } Ok(map) } } @@ -40,6 +45,7 @@ impl Config { gateway_addr: Some("grpc://0.0.0.0:4400".parse().unwrap()), p2p_addr: Some("grpc://0.0.0.0:4401".parse().unwrap()), store_addr: Some("grpc://0.0.0.0:4402".parse().unwrap()), + channels: Some(16), } } } @@ -65,6 +71,10 @@ mod tests { "store_addr".to_string(), Value::new(None, default.store_addr.unwrap().to_string()), ); + expect.insert( + "channels".to_string(), + Value::new(None, default.channels.unwrap().to_string()), + ); let got = Config::default().collect().unwrap(); for key in got.keys() { let left = expect.get(key).unwrap(); diff --git a/iroh-share/src/p2p_node.rs b/iroh-share/src/p2p_node.rs index eee7b2252cb..cd906537f5f 100644 --- a/iroh-share/src/p2p_node.rs +++ b/iroh-share/src/p2p_node.rs @@ -136,11 +136,13 @@ impl P2pNode { p2p_addr: Some(rpc_p2p_addr_client.clone()), store_addr: Some(rpc_store_addr_client.clone()), gateway_addr: None, + channels: Some(1), }; let rpc_p2p_client_config = iroh_rpc_client::Config { p2p_addr: Some(rpc_p2p_addr_client.clone()), store_addr: Some(rpc_store_addr_client.clone()), gateway_addr: None, + channels: Some(1), }; let config = config::Config { libp2p: config::Libp2pConfig { @@ -215,7 +217,7 @@ impl P2pNode { } pub async fn close(self) -> Result<()> { - self.rpc.p2p.unwrap().shutdown().await?; + self.rpc.try_p2p().unwrap().shutdown().await?; self.store_task.abort(); self.p2p_task.await?; self.store_task.await.ok(); diff --git a/iroh-store/benches/rpc.rs b/iroh-store/benches/rpc.rs index 68ef3c9c234..d94976d2649 100644 --- a/iroh-store/benches/rpc.rs +++ b/iroh-store/benches/rpc.rs @@ -88,8 +88,7 @@ pub fn put_benchmark(c: &mut Criterion) { let rpc_ref = &rpc; b.to_async(&executor).iter(|| async move { rpc_ref - .store - .as_ref() + .try_store() .unwrap() .put(*key, black_box(value.clone()), vec![]) .await @@ -148,8 +147,7 @@ pub fn get_benchmark(c: &mut Criterion) { let key = cid::Cid::new_v1(RAW, hash); keys.push(key); rpc_ref - .store - .as_ref() + .try_store() .unwrap() .put(key, value.clone(), vec![]) .await @@ -166,8 +164,7 @@ pub fn get_benchmark(c: &mut Criterion) { for i in 0..iters { let key = keys_ref[(i as usize) % l]; let res = rpc_ref - .store - .as_ref() + .try_store() .unwrap() .get(key) .await From 7b0a63149c9b492c4a71272154514f8b273254bd Mon Sep 17 00:00:00 2001 From: Brendan O'Brien Date: Mon, 24 Oct 2022 13:40:31 -0400 Subject: [PATCH 2/2] race on gateway (#403) moves the racing gateway functionality into the "main" gateway, and changes the configuration location to be gateway.http_resolvers, a vec of strings. --- iroh-gateway/src/cli.rs | 4 +-- iroh-gateway/src/config.rs | 24 ++++++++++--- iroh-gateway/src/main.rs | 9 +++-- iroh-one/src/config.rs | 9 ----- iroh-one/src/lib.rs | 1 - iroh-one/src/main.rs | 8 ++--- iroh-resolver/Cargo.toml | 2 ++ iroh-resolver/src/lib.rs | 1 + .../src/racing.rs | 34 ++++++++++++------- 9 files changed, 57 insertions(+), 35 deletions(-) rename iroh-one/src/content_loader.rs => iroh-resolver/src/racing.rs (83%) diff --git a/iroh-gateway/src/cli.rs b/iroh-gateway/src/cli.rs index e0b6c0bfb71..01dcdbaf6e3 100644 --- a/iroh-gateway/src/cli.rs +++ b/iroh-gateway/src/cli.rs @@ -21,7 +21,7 @@ pub struct Args { #[clap(long)] pub cfg: Option, #[clap(long)] - denylist: bool, + use_denylist: bool, } impl Args { @@ -39,7 +39,7 @@ impl Args { if let Some(cache) = self.cache { map.insert("cache", cache.to_string()); } - map.insert("denylist", self.denylist.to_string()); + map.insert("use_denylist", self.use_denylist.to_string()); map.insert("metrics.collect", self.metrics.to_string()); map.insert("metrics.tracing", self.tracing.to_string()); map diff --git a/iroh-gateway/src/config.rs b/iroh-gateway/src/config.rs index 6a1e7f87136..93479a03012 100644 --- a/iroh-gateway/src/config.rs +++ b/iroh-gateway/src/config.rs @@ -27,7 +27,12 @@ pub struct Config { /// default port to listen on pub port: u16, /// flag to toggle whether the gateway should use denylist on requests - pub denylist: bool, + pub use_denylist: bool, + /// URL of gateways to be used by the racing resolver. + /// strings can either be urls or subdomain gateway roots + /// values without https:// prefix are treated as subdomain gateways (eg: dweb.link) + /// values with are treated as IPFS path gateways (eg: https://ipfs.io) + pub http_resolvers: Option>, /// rpc addresses for the gateway & addresses for the rpc client to dial pub rpc_client: RpcClientConfig, /// metrics configuration @@ -46,8 +51,9 @@ impl Config { headers: HeaderMap::new(), port, rpc_client, + http_resolvers: None, metrics: MetricsConfig::default(), - denylist: false, + use_denylist: false, } } @@ -113,8 +119,9 @@ impl Default for Config { headers: HeaderMap::new(), port: DEFAULT_PORT, rpc_client, + http_resolvers: None, metrics: MetricsConfig::default(), - denylist: false, + use_denylist: false, }; t.set_default_headers(); t @@ -130,7 +137,7 @@ impl Source for Config { let rpc_client = self.rpc_client.collect()?; let mut map: Map = Map::new(); insert_into_config_map(&mut map, "public_url_base", self.public_url_base.clone()); - insert_into_config_map(&mut map, "denylist", self.denylist); + insert_into_config_map(&mut map, "use_denylist", self.use_denylist); // Some issue between deserializing u64 & u16, converting this to // an signed int fixes the issue insert_into_config_map(&mut map, "port", self.port as i32); @@ -138,6 +145,10 @@ impl Source for Config { insert_into_config_map(&mut map, "rpc_client", rpc_client); let metrics = self.metrics.collect()?; insert_into_config_map(&mut map, "metrics", metrics); + + if let Some(http_resolvers) = &self.http_resolvers { + insert_into_config_map(&mut map, "http_resolvers", http_resolvers.clone()); + } Ok(map) } } @@ -200,7 +211,10 @@ mod tests { Value::new(None, default.public_url_base.clone()), ); expect.insert("port".to_string(), Value::new(None, default.port as i64)); - expect.insert("denylist".to_string(), Value::new(None, default.denylist)); + expect.insert( + "use_denylist".to_string(), + Value::new(None, default.use_denylist), + ); expect.insert( "headers".to_string(), Value::new(None, collect_headers(&default.headers).unwrap()), diff --git a/iroh-gateway/src/main.rs b/iroh-gateway/src/main.rs index 81f07e55f36..500c93c1f27 100644 --- a/iroh-gateway/src/main.rs +++ b/iroh-gateway/src/main.rs @@ -9,6 +9,7 @@ use iroh_gateway::{ core::Core, metrics, }; +use iroh_resolver::racing::RacingLoader; use iroh_rpc_client::Client as RpcClient; use iroh_util::lock::ProgramLock; use iroh_util::{iroh_config_path, make_config}; @@ -38,14 +39,18 @@ async fn main() -> Result<()> { println!("{:#?}", config); let metrics_config = config.metrics.clone(); - let bad_bits = match config.denylist { + let bad_bits = match config.use_denylist { true => Arc::new(Some(RwLock::new(BadBits::new()))), false => Arc::new(None), }; let rpc_addr = config .server_rpc_addr()? .ok_or_else(|| anyhow!("missing gateway rpc addr"))?; - let content_loader = RpcClient::new(config.rpc_client.clone()).await?; + + let content_loader = RacingLoader::new( + RpcClient::new(config.rpc_client.clone()).await?, + config.http_resolvers.clone().unwrap_or_default(), + ); let handler = Core::new( Arc::new(config), rpc_addr, diff --git a/iroh-one/src/config.rs b/iroh-one/src/config.rs index 792736c897c..535ec7e92c5 100644 --- a/iroh-one/src/config.rs +++ b/iroh-one/src/config.rs @@ -25,8 +25,6 @@ pub struct Config { /// Path for the UDS socket for the gateway. #[cfg(feature = "uds-gateway")] pub gateway_uds_path: Option, - /// URL of the gateway used by the racing resolver. - pub resolver_gateway: Option, /// Gateway specific configuration. pub gateway: iroh_gateway::config::Config, /// Store specific configuration. @@ -46,7 +44,6 @@ impl Config { p2p: iroh_p2p::config::Config, rpc_client: RpcClientConfig, #[cfg(feature = "uds-gateway")] gateway_uds_path: Option, - resolver_gateway: Option, ) -> Self { Self { gateway, @@ -56,7 +53,6 @@ impl Config { metrics: MetricsConfig::default(), #[cfg(feature = "uds-gateway")] gateway_uds_path, - resolver_gateway, } } @@ -111,7 +107,6 @@ impl Default for Config { p2p: default_p2p_config(rpc_client, metrics_config, key_store_path), #[cfg(feature = "uds-gateway")] gateway_uds_path: Some(gateway_uds_path), - resolver_gateway: None, } } } @@ -163,10 +158,6 @@ impl Source for Config { uds_path.to_str().unwrap().to_string(), ); } - if let Some(resolver_gateway) = &self.resolver_gateway { - insert_into_config_map(&mut map, "resolver_gateway", resolver_gateway.clone()); - } - Ok(map) } } diff --git a/iroh-one/src/lib.rs b/iroh-one/src/lib.rs index e0fa7def378..5cdc0834134 100644 --- a/iroh-one/src/lib.rs +++ b/iroh-one/src/lib.rs @@ -1,6 +1,5 @@ pub mod cli; pub mod config; -pub mod content_loader; pub mod mem_p2p; pub mod mem_store; #[cfg(feature = "uds-gateway")] diff --git a/iroh-one/src/main.rs b/iroh-one/src/main.rs index c2db3354451..9fcd7fb1621 100644 --- a/iroh-one/src/main.rs +++ b/iroh-one/src/main.rs @@ -10,6 +10,7 @@ use iroh_one::{ cli::Args, config::{Config, CONFIG_FILE_NAME, ENV_PREFIX}, }; +use iroh_resolver::racing::RacingLoader; use iroh_rpc_client::Client as RpcClient; use iroh_rpc_types::Addr; use iroh_util::lock::ProgramLock; @@ -70,15 +71,14 @@ async fn main() -> Result<()> { .server_rpc_addr()? .ok_or_else(|| anyhow!("missing gateway rpc addr"))?; - let bad_bits = match config.gateway.denylist { + let bad_bits = match config.gateway.use_denylist { true => Arc::new(Some(RwLock::new(BadBits::new()))), false => Arc::new(None), }; - // let content_loader = RpcClient::new(config.rpc_client.clone()).await?; - let content_loader = iroh_one::content_loader::RacingLoader::new( + let content_loader = RacingLoader::new( RpcClient::new(config.rpc_client.clone()).await?, - config.resolver_gateway.clone(), + config.gateway.http_resolvers.clone().unwrap_or_default(), ); let shared_state = Core::make_state( Arc::new(config.clone()), diff --git a/iroh-resolver/Cargo.toml b/iroh-resolver/Cargo.toml index dff45f0afc7..fe580e619d9 100644 --- a/iroh-resolver/Cargo.toml +++ b/iroh-resolver/Cargo.toml @@ -29,6 +29,8 @@ once_cell = "1.13.0" tokio-util = { version = "0.7", features = ["io"] } libp2p = { version = "0.50", default-features = false } async-channel = "1.7.1" +reqwest = {version = "0.11", features = ["rustls-tls"], default-features = false} +rand = "0.8.5" [dev-dependencies] criterion = { version = "0.4.0", features = ["async_tokio"] } diff --git a/iroh-resolver/src/lib.rs b/iroh-resolver/src/lib.rs index b41409025b6..878d828529b 100644 --- a/iroh-resolver/src/lib.rs +++ b/iroh-resolver/src/lib.rs @@ -2,6 +2,7 @@ pub mod balanced_tree; pub mod chunker; pub mod codecs; pub mod hamt; +pub mod racing; pub mod resolver; pub mod unixfs; pub mod unixfs_builder; diff --git a/iroh-one/src/content_loader.rs b/iroh-resolver/src/racing.rs similarity index 83% rename from iroh-one/src/content_loader.rs rename to iroh-resolver/src/racing.rs index cf8631cb338..039f6e198ae 100644 --- a/iroh-one/src/content_loader.rs +++ b/iroh-resolver/src/racing.rs @@ -1,36 +1,40 @@ -//! A content loader implementation for iroh-one. - +use crate::resolver::{ + parse_links, ContentLoader, ContextId, LoadedCid, LoaderContext, Source, IROH_STORE, +}; use anyhow::{anyhow, Result}; use async_trait::async_trait; use bytes::Bytes; use cid::{multibase, Cid}; use futures::{future::FutureExt, pin_mut, select}; -use iroh_resolver::resolver::{ - parse_links, ContentLoader, ContextId, LoadedCid, LoaderContext, Source, IROH_STORE, -}; use iroh_rpc_client::Client as RpcClient; +use rand::seq::SliceRandom; use tracing::{debug, error, trace, warn}; #[derive(Clone, Debug)] pub struct RacingLoader { rpc_client: RpcClient, - gateway: Option, + http_resolvers: Vec, } impl RacingLoader { - pub fn new(rpc_client: RpcClient, gateway: Option) -> Self { + pub fn new(rpc_client: RpcClient, http_resolvers: Vec) -> Self { Self { rpc_client, - gateway, + http_resolvers, } } } impl RacingLoader { pub fn try_raw_gateway(&self) -> Result<&String> { - self.gateway - .as_ref() - .ok_or_else(|| anyhow!("no gateway configured to fetch raw CIDs")) + match self.http_resolvers.len() { + 0 => Err(anyhow!("no gateway configured to fetch raw CIDs")), + _ => { + let mut rng = rand::thread_rng(); + let gw = self.http_resolvers.choose(&mut rng).unwrap(); + Ok(gw) + } + } } async fn fetch_p2p(&self, ctx: ContextId, cid: &Cid) -> Result { @@ -42,7 +46,13 @@ impl RacingLoader { async fn fetch_http(&self, cid: &Cid) -> Result<(Bytes, String), anyhow::Error> { let gateway = self.try_raw_gateway()?; let cid_str = multibase::encode(multibase::Base::Base32Lower, cid.to_bytes().as_slice()); - let gateway_url = format!("https://{}.ipfs.{}?format=raw", cid_str, gateway); + // support two gateway URL formats: subdomain gateways (eg: dweb.link) + // and full URL (eg: https://ipfs.io) + let gateway_url = if gateway.starts_with("https://") || gateway.starts_with("http://") { + format!("{}/ipfs/{}?format=raw", gateway, cid_str) + } else { + format!("https://{}.ipfs.{}?format=raw", cid_str, gateway) + }; debug!("Will fetch {}", gateway_url); let response = reqwest::get(gateway_url).await?; response