diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs
index b7673551c1..7cb6952fde 100644
--- a/iroh-api/src/api.rs
+++ b/iroh-api/src/api.rs
@@ -14,7 +14,6 @@ use iroh_rpc_client::{Client, ClientStatus};
 use iroh_unixfs::{
     builder::Entry as UnixfsEntry,
     content_loader::{FullLoader, FullLoaderConfig},
-    ResponseClip,
 };
 use iroh_util::{iroh_config_path, make_config};
 #[cfg(feature = "testing")]
@@ -152,13 +151,13 @@ impl Api {
             if out.is_dir() {
                 yield (relative_path, OutType::Dir);
             } else if out.is_symlink() {
-                let mut reader = out.pretty(resolver.clone(), Default::default(), ResponseClip::NoClip)?;
+                let mut reader = out.pretty(resolver.clone(), Default::default(), None)?;
                 let mut target = String::new();
                 reader.read_to_string(&mut target).await?;
                 let target = PathBuf::from(target);
                 yield (relative_path, OutType::Symlink(target));
             } else {
-                let reader = out.pretty(resolver.clone(), Default::default(), ResponseClip::NoClip)?;
+                let reader = out.pretty(resolver.clone(), Default::default(), None)?;
                 yield (relative_path, OutType::Reader(Box::new(reader)));
             }
         }
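Note on the API change above: `ResponseClip` is gone, and `pretty` now takes a plain `Option<usize>` clip offset, where `None` means "no clip" and `Some(n)` means "stop after n bytes". A runnable sketch of the semantics (the helper is hypothetical, not part of the diff); note that the old `From<usize>` conversion, removed in iroh-unixfs/src/types.rs below, mapped `0` to `NoClip`, so a zero-length clip used to be inexpressible:

    // Hypothetical helper illustrating the Option<usize> clip semantics.
    fn clipped_len(data_len: usize, pos_max: Option<usize>) -> usize {
        pos_max.map_or(data_len, |n| data_len.min(n))
    }

    fn main() {
        assert_eq!(clipped_len(10, None), 10); // old ResponseClip::NoClip
        assert_eq!(clipped_len(10, Some(4)), 4); // old ResponseClip::Clip(4)
        assert_eq!(clipped_len(10, Some(0)), 0); // was impossible via From<usize>
    }
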
diff --git a/iroh-gateway/src/bad_bits.rs b/iroh-gateway/src/bad_bits.rs
index a7c1eecdc8..e60a626f12 100644
--- a/iroh-gateway/src/bad_bits.rs
+++ b/iroh-gateway/src/bad_bits.rs
@@ -218,7 +218,7 @@ mod tests {
         let uri = hyper::Uri::builder()
             .scheme("http")
             .authority(format!("localhost:{}", addr.port()))
-            .path_and_query(format!("/ipfs/{}", bad_cid))
+            .path_and_query(format!("/ipfs/{bad_cid}"))
             .build()
             .unwrap();
         let client = hyper::Client::new();
@@ -228,7 +228,7 @@ mod tests {
         let uri = hyper::Uri::builder()
             .scheme("http")
             .authority(format!("localhost:{}", addr.port()))
-            .path_and_query(format!("/ipfs/{}/{}", bad_cid, bad_path))
+            .path_and_query(format!("/ipfs/{bad_cid}/{bad_path}"))
             .build()
             .unwrap();
         let client = hyper::Client::new();
@@ -238,7 +238,7 @@ mod tests {
         let uri = hyper::Uri::builder()
             .scheme("http")
             .authority(format!("localhost:{}", addr.port()))
-            .path_and_query(format!("/ipfs/{}/{}", bad_cid_2, bad_path))
+            .path_and_query(format!("/ipfs/{bad_cid_2}/{bad_path}"))
             .build()
             .unwrap();
         let client = hyper::Client::new();
@@ -249,7 +249,7 @@ mod tests {
         let uri = hyper::Uri::builder()
             .scheme("http")
             .authority(format!("localhost:{}", addr.port()))
-            .path_and_query(format!("/ipfs/{}/{}?format=raw", bad_cid_2, bad_path))
+            .path_and_query(format!("/ipfs/{bad_cid_2}/{bad_path}?format=raw"))
             .build()
             .unwrap();
         let client = hyper::Client::new();
@@ -260,7 +260,7 @@ mod tests {
         let uri = hyper::Uri::builder()
             .scheme("http")
             .authority(format!("localhost:{}", addr.port()))
-            .path_and_query(format!("/ipfs/{}/{}", bad_cid, good_path))
+            .path_and_query(format!("/ipfs/{bad_cid}/{good_path}"))
             .build()
             .unwrap();
         let client = hyper::Client::new();
@@ -270,7 +270,7 @@ mod tests {
         let uri = hyper::Uri::builder()
             .scheme("http")
             .authority(format!("localhost:{}", addr.port()))
-            .path_and_query(format!("/ipfs/{}/{}", bad_cid_2, good_path))
+            .path_and_query(format!("/ipfs/{bad_cid_2}/{good_path}"))
             .build()
             .unwrap();
         let client = hyper::Client::new();
@@ -280,7 +280,7 @@ mod tests {
         let uri = hyper::Uri::builder()
             .scheme("http")
             .authority(format!("localhost:{}", addr.port()))
-            .path_and_query(format!("/ipfs/{}", good_cid))
+            .path_and_query(format!("/ipfs/{good_cid}"))
             .build()
             .unwrap();
         let client = hyper::Client::new();
diff --git a/iroh-gateway/src/client.rs b/iroh-gateway/src/client.rs
index 6c10bc7914..cef754e581 100644
--- a/iroh-gateway/src/client.rs
+++ b/iroh-gateway/src/client.rs
@@ -16,7 +16,7 @@ use iroh_metrics::{
 };
 use iroh_resolver::dns_resolver::Config;
 use iroh_resolver::resolver::{CidOrDomain, Metadata, Out, OutPrettyReader, OutType, Resolver};
-use iroh_unixfs::{content_loader::ContentLoader, ResponseClip, Source};
+use iroh_unixfs::{content_loader::ContentLoader, Source};
 use mime::Mime;
 use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWrite};
 use tokio_util::io::ReaderStream;
@@ -120,15 +120,11 @@ impl Client {
             let body = FileResult::Directory(path_metadata);
             Ok((body, metadata))
         } else {
-            let mut clip = 0;
-            if let Some(range) = &range {
-                clip = range.end as usize;
-            }
             let reader = path_metadata
                 .pretty(
                     self.resolver.clone(),
                     OutMetrics { start: start_time },
-                    ResponseClip::from(clip),
+                    range.as_ref().map(|range| range.end as usize),
                 )
                 .map_err(|e| e.to_string())?;
@@ -194,7 +190,7 @@ impl Client {
         let reader = res.pretty(
             self.resolver.clone(),
             OutMetrics { start: start_time },
-            ResponseClip::NoClip,
+            None,
         );
         match reader {
             Ok(mut reader) => {
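The `get_file` change above is where an HTTP range request turns into a clip: an absent range maps to `None`, a present one to `Some(range.end)`. A standalone sketch of that mapping (hypothetical helper name):

    use std::ops::Range;

    // Hypothetical helper mirroring `range.as_ref().map(|range| range.end as usize)`.
    fn range_to_pos_max(range: Option<&Range<u64>>) -> Option<usize> {
        range.map(|range| range.end as usize)
    }

    fn main() {
        assert_eq!(range_to_pos_max(None), None); // full response
        assert_eq!(range_to_pos_max(Some(&(0..2))), Some(2)); // clip after 2 bytes
    }
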
diff --git a/iroh-gateway/src/core.rs b/iroh-gateway/src/core.rs
index f1d280e7b2..1438882e45 100644
--- a/iroh-gateway/src/core.rs
+++ b/iroh-gateway/src/core.rs
@@ -38,7 +38,7 @@ impl Core {
     ) -> anyhow::Result<Self> {
         tokio::spawn(async move {
             if let Err(err) = rpc::new(rpc_addr, Gateway::default()).await {
-                tracing::error!("Failed to run gateway rpc handler: {}", err);
+                tracing::error!("Failed to run gateway rpc handler: {err}");
             }
         });
         let mut templates = HashMap::new();
@@ -123,6 +123,8 @@ mod tests {
     use axum::response::Response;
     use cid::Cid;
     use futures::{StreamExt, TryStreamExt};
+    use http::header::CONTENT_RANGE;
+    use http::HeaderValue;
     use hyper::Body;
     use iroh_rpc_client::Client as RpcClient;
     use iroh_rpc_client::Config as RpcClientConfig;
@@ -131,6 +133,9 @@ mod tests {
     use iroh_unixfs::builder::{DirectoryBuilder, FileBuilder};
     use iroh_unixfs::content_loader::{FullLoader, FullLoaderConfig};
     use iroh_unixfs::unixfs::UnixfsNode;
+    use rand::distributions::{Alphanumeric, DistString};
+    use rand::rngs::SmallRng;
+    use rand::SeedableRng;
     use std::io;
     use tokio_util::io::StreamReader;
@@ -142,7 +147,6 @@ mod tests {
         file_cids: Vec<Cid>,
         core_task: tokio::task::JoinHandle<()>,
         store_task: tokio::task::JoinHandle<()>,
-        files: Vec<(String, Vec<u8>)>,
     }

     impl TestSetup {
@@ -209,10 +213,10 @@ mod tests {
         let store = rpc_client.try_store().unwrap();
         let mut cids = vec![];
         let mut dir_builder = DirectoryBuilder::new().name(dir);
-        for (name, content) in files {
+        for (name, content) in files.iter() {
             let file = FileBuilder::new()
                 .name(name)
-                .content_bytes(content.clone())
+                .content_bytes(content.to_vec())
                 .build()
                 .await
                 .unwrap();
@@ -226,7 +230,8 @@ mod tests {
             cids.push(cid);
             store.put(cid, bytes, links).await.unwrap();
         }
-        (*cids.last().unwrap(), cids)
+        let root = *cids.last().unwrap();
+        (root, cids)
     }

     async fn do_request(
@@ -254,7 +259,7 @@ mod tests {
             .unwrap()
     }

-    async fn setup_test(redirect_to_subdomains: bool) -> TestSetup {
+    async fn setup_test(redirect_to_subdomains: bool, files: &[(String, Vec<u8>)]) -> TestSetup {
         let (store_client_addr, store_task) = spawn_store().await;
         let mut config = Config::new(
             0,
@@ -269,11 +274,7 @@ mod tests {
         config.redirect_to_subdomain = redirect_to_subdomains;
         let (gateway_addr, rpc_client, core_task) = spawn_gateway(Arc::new(config)).await;
         let dir = "demo";
-        let files = vec![
-            ("hello.txt".to_string(), b"ola".to_vec()),
-            ("world.txt".to_string(), b"mundo".to_vec()),
-        ];
-        let (root_cid, file_cids) = put_directory_with_files(&rpc_client, dir, &files).await;
+        let (root_cid, file_cids) = put_directory_with_files(&rpc_client, dir, files).await;

         TestSetup {
             gateway_addr,
@@ -281,7 +282,6 @@ mod tests {
             file_cids,
             core_task,
             store_task,
-            files,
         }
     }
@@ -319,7 +319,11 @@ mod tests {
     // TODO(b5) - refactor to return anyhow::Result<()>
     #[tokio::test]
     async fn test_fetch_car_recursive() {
-        let test_setup = setup_test(false).await;
+        let files = &[
+            ("hello.txt".to_string(), b"ola".to_vec()),
+            ("world.txt".to_string(), b"mundo".to_vec()),
+        ];
+        let test_setup = setup_test(false, files).await;

         // request the root cid as a recursive car
         let res = do_request(
@@ -351,29 +355,32 @@ mod tests {
             HashSet::from_iter(test_setup.file_cids.iter())
         );
         assert_eq!(cids[0], test_setup.root_cid);
-        assert_eq!(nodes.len(), test_setup.files.len() + 1);
+        assert_eq!(nodes.len(), files.len() + 1);
         assert!(nodes[0].is_dir());
         assert_eq!(
             nodes[0]
                 .links()
                 .map(|link| link.unwrap().name.unwrap().to_string())
                 .collect::<Vec<_>>(),
-            test_setup
-                .files
+            files
                 .iter()
                 .map(|(name, _content)| name.to_string())
                 .collect::<Vec<_>>()
         );
         for (i, node) in nodes[1..].iter().enumerate() {
-            assert_eq!(node, &UnixfsNode::Raw(test_setup.files[i].1.clone().into()));
+            assert_eq!(node, &UnixfsNode::Raw(files[i].1.clone().into()));
         }

         test_setup.shutdown().await
     }

     #[tokio::test]
     async fn test_head_request_to_file() {
-        let test_setup = setup_test(false).await;
+        let files = &[
+            ("hello.txt".to_string(), b"ola".to_vec()),
+            ("world.txt".to_string(), b"mundo".to_vec()),
+        ];
+        let test_setup = setup_test(false, files).await;

         // request the root cid as a recursive car
         let res = do_request(
@@ -396,7 +403,7 @@ mod tests {

     #[tokio::test]
     async fn test_gateway_requests() {
-        let test_setup = setup_test(false).await;
+        let files = &[
+            ("hello.txt".to_string(), b"ola".to_vec()),
+            ("world.txt".to_string(), b"mundo".to_vec()),
+        ];
+        let test_setup = setup_test(false, files).await;

         // request the root cid as a recursive car
         let res = do_request(
@@ -433,30 +444,117 @@ mod tests {
             HashSet::from_iter(test_setup.file_cids.iter())
         );
         assert_eq!(cids[0], test_setup.root_cid);
-        assert_eq!(nodes.len(), test_setup.files.len() + 1);
+        assert_eq!(nodes.len(), files.len() + 1);
         assert!(nodes[0].is_dir());
         assert_eq!(
             nodes[0]
                 .links()
                 .map(|link| link.unwrap().name.unwrap().to_string())
                 .collect::<Vec<_>>(),
-            test_setup
-                .files
+            files
                 .iter()
                 .map(|(name, _content)| name.to_string())
                 .collect::<Vec<_>>()
         );
         for (i, node) in nodes[1..].iter().enumerate() {
-            assert_eq!(node, &UnixfsNode::Raw(test_setup.files[i].1.clone().into()));
+            assert_eq!(node, &UnixfsNode::Raw(files[i].1.clone().into()));
         }

         test_setup.shutdown().await
     }

+    #[tokio::test]
+    async fn test_range_requests() {
+        let files = [(
+            "large.txt".to_string(),
+            Alphanumeric
+                .sample_string(&mut SmallRng::seed_from_u64(42), 8 * 1024 * 1024)
+                .bytes()
+                .collect(),
+        )];
+        let large_file = &files[0].1;
+        let test_setup = setup_test(false, &files).await;
+
+        // -----------------------------
+
+        let res = do_request(
+            "GET",
+            &format!("localhost:{}", test_setup.gateway_addr.port()),
+            "/large.txt",
+            Some(&[
+                ("host", &format!("{}.ipfs.localhost", test_setup.root_cid)),
+                ("range", "bytes=0-1"),
+            ]),
+        )
+        .await;
+
+        assert_eq!(http::StatusCode::PARTIAL_CONTENT, res.status());
+        assert_eq!(
+            HeaderValue::from_str("bytes 0-1/8388608").unwrap(),
+            res.headers().get(CONTENT_RANGE).unwrap()
+        );
+
+        let (body, _) = res.into_body().into_future().await;
+        assert_eq!(body.unwrap().unwrap(), large_file[0..2]);
+
+        // -----------------------------
+
+        let res = do_request(
+            "GET",
+            &format!("localhost:{}", test_setup.gateway_addr.port()),
+            "/large.txt",
+            Some(&[
+                ("host", &format!("{}.ipfs.localhost", test_setup.root_cid)),
+                ("range", "bytes=4000-1000000"),
+            ]),
+        )
+        .await;
+
+        assert_eq!(http::StatusCode::PARTIAL_CONTENT, res.status());
+        assert_eq!(
+            HeaderValue::from_str("bytes 4000-1000000/8388608").unwrap(),
+            res.headers().get(CONTENT_RANGE).unwrap()
+        );
+
+        let content = hyper::body::to_bytes(res.into_body()).await.unwrap();
+        assert_eq!(content.len(), 1000001 - 4000);
+        assert_eq!(content, large_file[4000..1000001]);
+
+        let res = do_request(
+            "GET",
+            &format!("localhost:{}", test_setup.gateway_addr.port()),
+            "/large.txt",
+            Some(&[
+                ("host", &format!("{}.ipfs.localhost", test_setup.root_cid)),
+                ("range", "bytes=0-8388607"),
+            ]),
+        )
+        .await;
+
+        assert_eq!(http::StatusCode::PARTIAL_CONTENT, res.status());
+        assert_eq!(
+            HeaderValue::from_str("bytes 0-8388607/8388608").unwrap(),
+            res.headers().get(CONTENT_RANGE).unwrap()
+        );
+
+        let content = hyper::body::to_bytes(res.into_body()).await.unwrap();
+        assert_eq!(content.len(), large_file.len());
+        assert_eq!(content, large_file);
+
+        test_setup.shutdown().await
+    }
+
     #[tokio::test]
     async fn test_gateway_redirection() {
-        let test_setup = setup_test(true).await;
+        let test_setup = setup_test(
+            true,
+            &[
+                ("hello.txt".to_string(), b"ola".to_vec()),
+                ("world.txt".to_string(), b"mundo".to_vec()),
+            ],
+        )
+        .await;

         // Usual request
         let res = do_request(
@@ -617,7 +715,7 @@ mod tests {
         assert_eq!(http::StatusCode::MOVED_PERMANENTLY, res.status());
         assert_eq!(
-            format!("http://{}.ipfs.ipfs.io/world.txt", test_setup.root_cid,),
+            format!("http://{}.ipfs.ipfs.io/world.txt", test_setup.root_cid),
             res.headers().get("Location").unwrap().to_str().unwrap(),
         );
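`test_range_requests` encodes the inclusive-end semantics of HTTP ranges: `bytes=0-1` is two bytes, and `Content-Range` echoes the same inclusive end over the 8 MiB (8 * 1024 * 1024 = 8388608 byte) fixture. A standalone check of the arithmetic the assertions rely on (hypothetical helper):

    // Hypothetical helper: the Content-Range value for an inclusive byte range.
    fn content_range(start: u64, end_inclusive: u64, total: u64) -> String {
        format!("bytes {start}-{end_inclusive}/{total}")
    }

    fn main() {
        assert_eq!(content_range(0, 1, 8_388_608), "bytes 0-1/8388608");
        // bytes=4000-1000000 covers 1_000_001 - 4_000 = 996_001 bytes.
        assert_eq!(
            content_range(4000, 1_000_000, 8_388_608),
            "bytes 4000-1000000/8388608"
        );
        assert_eq!(content_range(0, 8_388_607, 8_388_608), "bytes 0-8388607/8388608");
    }
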
"GET", + &format!("localhost:{}", test_setup.gateway_addr.port()), + "/large.txt", + Some(&[ + ("host", &format!("{}.ipfs.localhost", test_setup.root_cid)), + ("range", "bytes=4000-1000000"), + ]), + ) + .await; + + assert_eq!(http::StatusCode::PARTIAL_CONTENT, res.status()); + assert_eq!( + HeaderValue::from_str("bytes 4000-1000000/8388608").unwrap(), + res.headers().get(CONTENT_RANGE).unwrap() + ); + + let content = hyper::body::to_bytes(res.into_body()).await.unwrap(); + assert_eq!(content.len(), 1000001 - 4000); + assert_eq!(content, large_file[4000..1000001]); + + let res = do_request( + "GET", + &format!("localhost:{}", test_setup.gateway_addr.port()), + "/large.txt", + Some(&[ + ("host", &format!("{}.ipfs.localhost", test_setup.root_cid)), + ("range", "bytes=0-8388607"), + ]), + ) + .await; + + assert_eq!(http::StatusCode::PARTIAL_CONTENT, res.status()); + assert_eq!( + HeaderValue::from_str("bytes 0-8388607/8388608").unwrap(), + res.headers().get(CONTENT_RANGE).unwrap() + ); + + let content = hyper::body::to_bytes(res.into_body()).await.unwrap(); + assert_eq!(content.len(), large_file.len()); + assert_eq!(content, large_file); + + test_setup.shutdown().await + } + #[tokio::test] async fn test_gateway_redirection() { - let test_setup = setup_test(true).await; + let test_setup = setup_test( + true, + &[ + ("hello.txt".to_string(), b"ola".to_vec()), + ("world.txt".to_string(), b"mundo".to_vec()), + ], + ) + .await; // Usual request let res = do_request( @@ -617,7 +715,7 @@ mod tests { assert_eq!(http::StatusCode::MOVED_PERMANENTLY, res.status()); assert_eq!( - format!("http://{}.ipfs.ipfs.io/world.txt", test_setup.root_cid,), + format!("http://{}.ipfs.ipfs.io/world.txt", test_setup.root_cid), res.headers().get("Location").unwrap().to_str().unwrap(), ); diff --git a/iroh-gateway/src/handler_params.rs b/iroh-gateway/src/handler_params.rs index 9a78ecdd6b..0663edbbfa 100644 --- a/iroh-gateway/src/handler_params.rs +++ b/iroh-gateway/src/handler_params.rs @@ -36,7 +36,7 @@ impl GetParams { if q.is_empty() { q } else { - format!("?{}", q) + format!("?{q}") } } } diff --git a/iroh-gateway/src/handlers.rs b/iroh-gateway/src/handlers.rs index 14039aeecc..dad30f6ab0 100644 --- a/iroh-gateway/src/handlers.rs +++ b/iroh-gateway/src/handlers.rs @@ -205,7 +205,7 @@ async fn request_preprocessing( Err(err) => { return Err(GatewayError::new( StatusCode::BAD_REQUEST, - &format!("invalid header value: {}", err), + &format!("invalid header value: {err}"), )); } }; @@ -257,7 +257,7 @@ pub async fn handler( .retrieve_path_metadata(req.resolved_path) .await .map_err(|e| GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &e))?; - add_content_length_header(&mut response_headers, path_metadata.metadata().clone()); + add_content_length_header(&mut response_headers, path_metadata.metadata().size); Ok(GatewayResponse::empty(response_headers)) } Method::GET => { @@ -301,7 +301,7 @@ pub async fn subdomain_handler( let path = Path::from_parts( parsed_subdomain_url.scheme, &inlined_dns_link_to_dns_link(parsed_subdomain_url.cid_or_domain), - path_params.content_path.as_deref().unwrap_or(""), + path_params.content_path.as_deref().unwrap_or("/"), ) .map_err(|e| GatewayError::new(StatusCode::BAD_REQUEST, &e.to_string()))?; handler( @@ -397,7 +397,7 @@ fn protocol_handler_redirect(uri_param: String) -> Result { return Err(GatewayError::new( StatusCode::BAD_REQUEST, - &format!("invalid uri: {}", e), + &format!("invalid uri: {e}"), )); } }; @@ -412,11 +412,11 @@ fn protocol_handler_redirect(uri_param: String) -> Result( 
diff --git a/iroh-gateway/src/headers.rs b/iroh-gateway/src/headers.rs
index 628aba1b31..7d1fdf2c2b 100644
--- a/iroh-gateway/src/headers.rs
+++ b/iroh-gateway/src/headers.rs
@@ -68,7 +68,7 @@ pub fn set_content_disposition_headers(headers: &mut HeaderMap, filename: &str,
     // TODO: handle non-ascii filenames https://github.com/ipfs/specs/blob/main/http-gateways/PATH_GATEWAY.md#content-disposition-response-header
     headers.insert(
         CONTENT_DISPOSITION,
-        HeaderValue::from_str(&format!("{}; filename={}", disposition, filename)).unwrap(),
+        HeaderValue::from_str(&format!("{disposition}; filename={filename}")).unwrap(),
     );
 }
@@ -103,11 +103,14 @@ pub fn parse_range_header(range: &HeaderValue) -> Option<Range<u64>> {
     if start >= end || end == 0 {
         return None;
     }
-    Some(Range { start, end })
+    Some(Range {
+        start,
+        end: end + 1,
+    })
 }

 #[tracing::instrument()]
-pub fn add_cache_control_headers(headers: &mut HeaderMap, metadata: Metadata) {
+pub fn add_cache_control_headers(headers: &mut HeaderMap, metadata: &Metadata) {
     if metadata.path.typ() == PathType::Ipns {
         let lmdt: OffsetDateTime = time::SystemTime::now().into();
         // TODO: better last modified headers based on actual dns ttls
@@ -121,20 +124,20 @@ pub fn add_cache_control_headers(headers: &mut HeaderMap, metadata: Metadata) {
 }

 #[tracing::instrument()]
-pub fn add_content_length_header(headers: &mut HeaderMap, metadata: Metadata) {
-    if let Some(size) = metadata.size {
+pub fn add_content_length_header(headers: &mut HeaderMap, content_length: Option<u64>) {
+    if let Some(content_length) = content_length {
         headers.insert(
             CONTENT_LENGTH,
-            HeaderValue::from_str(&size.to_string()).unwrap(),
+            HeaderValue::from_str(&content_length.to_string()).unwrap(),
         );
     }
 }

 #[tracing::instrument()]
-pub fn add_ipfs_roots_headers(headers: &mut HeaderMap, metadata: Metadata) {
+pub fn add_ipfs_roots_headers(headers: &mut HeaderMap, metadata: &Metadata) {
     let mut roots = "".to_string();
-    for rcid in metadata.resolved_path {
-        write!(roots, "{},", rcid).unwrap();
+    for rcid in &metadata.resolved_path {
+        write!(roots, "{rcid},").unwrap();
     }
     roots.pop();
     headers.insert(&HEADER_X_IPFS_ROOTS, HeaderValue::from_str(&roots).unwrap());
@@ -163,10 +166,10 @@ pub fn get_etag(cid: &CidOrDomain, response_format: Option<ResponseFormat>) -> S
             if let Some(fmt) = response_format {
                 let ext = fmt.get_extenstion();
                 if !ext.is_empty() {
-                    suffix = format!(".{}", ext);
+                    suffix = format!(".{ext}");
                 }
             }
-            format!("\"{}{}\"", cid, suffix)
+            format!("\"{cid}{suffix}\"")
         }
         CidOrDomain::Domain(_) => {
             // TODO:
@@ -321,7 +324,7 @@ mod tests {
     fn parse_range_header_test() {
         let range = HeaderValue::from_str("bytes=0-10").unwrap();
         let r = parse_range_header(&range);
-        assert_eq!(r, Some(Range { start: 0, end: 10 }));
+        assert_eq!(r, Some(Range { start: 0, end: 11 }));

         let range = HeaderValue::from_str("byts=0-10").unwrap();
         let r = parse_range_header(&range);
@@ -345,7 +348,7 @@ mod tests {
             r,
             Some(Range {
                 start: 100,
-                end: 200
+                end: 201
             })
         );
@@ -455,7 +458,7 @@ mod tests {
             ),
             Some(ResponseFormat::Raw),
         );
-        let wetag = format!("W/{}", etag);
+        let wetag = format!("W/{etag}");
         let other_etag = get_etag(
             &CidOrDomain::Cid(
                 Cid::try_from("bafkreigh2akiscaildcqabsyg3dfr6chu3fgpregiymsck7e7aqa4aaaaa")
                     .unwrap(),
             ),
             Some(ResponseFormat::Raw),
         );
-        let other_wetag = format!("W/{}", other_etag);
-        let long_etag = format!("{},{}", other_etag, wetag);
+        let other_wetag = format!("W/{other_etag}");
+        let long_etag = format!("{other_etag},{wetag}");

         assert!(etag_matches(any_etag, &etag));
         assert!(etag_matches(&etag, &wetag));
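`parse_range_header` now returns a half-open `Range` by adding 1 to the inclusive HTTP end position, which is why the tests expect `bytes=0-10` to parse as `Range { start: 0, end: 11 }`. A standalone sketch of just that conversion, with the same rejection rule as above (the real parser additionally validates the `bytes=` syntax):

    use std::ops::Range;

    // Hypothetical sketch: inclusive HTTP byte positions -> half-open Range.
    fn to_half_open(start: u64, end_inclusive: u64) -> Option<Range<u64>> {
        if start >= end_inclusive || end_inclusive == 0 {
            return None;
        }
        Some(Range {
            start,
            end: end_inclusive + 1,
        })
    }

    fn main() {
        assert_eq!(to_half_open(0, 10), Some(0..11));
        assert_eq!(to_half_open(100, 200), Some(100..201));
        assert_eq!(to_half_open(10, 10), None); // rejected by the start >= end check
    }
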
diff --git a/iroh-gateway/src/main.rs b/iroh-gateway/src/main.rs
index 25e9b9f7af..e02ee70393 100644
--- a/iroh-gateway/src/main.rs
+++ b/iroh-gateway/src/main.rs
@@ -36,7 +36,7 @@ async fn main() -> Result<()> {
     )
     .unwrap();
     config.metrics = metrics::metrics_config_with_compile_time_info(config.metrics);
-    println!("{:#?}", config);
+    println!("{config:#?}");

     let metrics_config = config.metrics.clone();
     let dns_resolver_config = config.dns_resolver.clone();
diff --git a/iroh-gateway/src/response.rs b/iroh-gateway/src/response.rs
index 2c07263895..489cee19c6 100644
--- a/iroh-gateway/src/response.rs
+++ b/iroh-gateway/src/response.rs
@@ -33,7 +33,7 @@ impl std::convert::TryFrom<&str> for ResponseFormat {
                 if rf.starts_with("application/vnd.ipld.") {
                     Ok(ResponseFormat::Fs(rf.to_string()))
                 } else {
-                    Err(format!("{}: {}", ERR_UNSUPPORTED_FORMAT, rf))
+                    Err(format!("{ERR_UNSUPPORTED_FORMAT}: {rf}"))
                 }
             }
         }
diff --git a/iroh-gateway/src/templates.rs b/iroh-gateway/src/templates.rs
index 124212ffde..bb128b29ce 100644
--- a/iroh-gateway/src/templates.rs
+++ b/iroh-gateway/src/templates.rs
@@ -25,5 +25,5 @@ pub fn icon_class_name(path: &str) -> String {
     } else {
         "_blank"
     };
-    format!("icon-{}", icon)
+    format!("icon-{icon}")
 }
diff --git a/iroh-resolver/src/dns_resolver.rs b/iroh-resolver/src/dns_resolver.rs
index 7814d6a269..f2400c5715 100644
--- a/iroh-resolver/src/dns_resolver.rs
+++ b/iroh-resolver/src/dns_resolver.rs
@@ -80,7 +80,7 @@ impl DnsResolver {

     #[tracing::instrument]
     pub async fn resolve_dnslink(&self, url: &str) -> Result<Vec<Path>> {
-        let url = format!("_dnslink.{}.", url);
+        let url = format!("_dnslink.{url}.");
         let records = self.resolve_txt_record(&url).await?;
         let records = records
             .into_iter()
diff --git a/iroh-resolver/src/resolver.rs b/iroh-resolver/src/resolver.rs
index aa7e248e80..6e13834776 100644
--- a/iroh-resolver/src/resolver.rs
+++ b/iroh-resolver/src/resolver.rs
@@ -16,8 +16,8 @@ use iroh_unixfs::{
     codecs::Codec,
     content_loader::{ContentLoader, ContextId, LoaderContext},
     parse_links,
-    unixfs::{poll_read_buf_at_pos, DataType, UnixfsChildStream, UnixfsContentReader, UnixfsNode},
-    Block, Link, LoadedCid, ResponseClip, Source,
+    unixfs::{read_data_to_buf, DataType, UnixfsChildStream, UnixfsContentReader, UnixfsNode},
+    Block, Link, LoadedCid, Source,
 };
 use libipld::codec::Encode;
 use libipld::prelude::Codec as _;
@@ -93,15 +93,14 @@ impl Path {
             let root = Cid::from_str(cid_or_domain).context("invalid cid")?;
             (PathType::Ipfs, CidOrDomain::Cid(root))
         };
-        let tail = if tail_path != "/" {
-            tail_path
-                .split(&['/', '\\'])
-                .filter(|s| !s.is_empty())
-                .map(String::from)
-                .collect()
-        } else {
-            vec!["".to_string()]
-        };
+        let mut tail = tail_path
+            .split(&['/', '\\'])
+            .filter(|s| !s.is_empty())
+            .map(String::from)
+            .collect::<Vec<_>>();
+        if tail_path.ends_with('/') {
+            tail.push("".to_string())
+        }

         Ok(Path { typ, root, tail })
     }
@@ -151,7 +150,7 @@ impl Display for Path {
             if part.is_empty() {
                 continue;
             }
-            write!(f, "/{}", part)?;
+            write!(f, "/{part}")?;
         }

         if self.has_trailing_slash() {
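The `Path` change above alters how a trailing slash is recorded: instead of special-casing a bare "/", any tail ending in a slash now gets one empty segment appended, which is what `has_trailing_slash()` and the updated `Display` round-trip tests rely on. A standalone copy of the new splitting rule:

    // Standalone copy of the new tail-splitting rule from impl Path.
    fn parse_tail(tail_path: &str) -> Vec<String> {
        let mut tail = tail_path
            .split(&['/', '\\'])
            .filter(|s| !s.is_empty())
            .map(String::from)
            .collect::<Vec<_>>();
        if tail_path.ends_with('/') {
            tail.push("".to_string());
        }
        tail
    }

    fn main() {
        assert_eq!(parse_tail("hello.txt"), vec!["hello.txt"]);
        assert_eq!(parse_tail("dir/"), vec!["dir", ""]); // trailing-slash marker
        assert_eq!(parse_tail("/"), vec![""]); // tail().len() == 1, as the test asserts
    }
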
tail.push("".to_string()) + } Ok(Path { typ, root, tail }) } @@ -151,7 +150,7 @@ impl Display for Path { if part.is_empty() { continue; } - write!(f, "/{}", part)?; + write!(f, "/{part}")?; } if self.has_trailing_slash() { @@ -352,38 +351,38 @@ impl Out { self, loader: Resolver, om: OutMetrics, - clip: ResponseClip, + pos_max: Option, ) -> Result> { let pos = 0; match self.content { OutContent::DagPb(_, mut bytes) => { - if let ResponseClip::Clip(n) = clip { - bytes.truncate(n); + if let Some(pos_max) = pos_max { + bytes.truncate(pos_max); } Ok(OutPrettyReader::DagPb(BytesReader { pos, bytes, om })) } OutContent::DagCbor(_, mut bytes) => { - if let ResponseClip::Clip(n) = clip { - bytes.truncate(n); + if let Some(pos_max) = pos_max { + bytes.truncate(pos_max); } Ok(OutPrettyReader::DagCbor(BytesReader { pos, bytes, om })) } OutContent::DagJson(_, mut bytes) => { - if let ResponseClip::Clip(n) = clip { - bytes.truncate(n); + if let Some(pos_max) = pos_max { + bytes.truncate(pos_max); } Ok(OutPrettyReader::DagJson(BytesReader { pos, bytes, om })) } OutContent::Raw(_, mut bytes) => { - if let ResponseClip::Clip(n) = clip { - bytes.truncate(n); + if let Some(pos_max) = pos_max { + bytes.truncate(pos_max); } Ok(OutPrettyReader::Raw(BytesReader { pos, bytes, om })) } OutContent::Unixfs(node) => { let ctx = self.context; let reader = node - .into_content_reader(ctx, loader.loader().clone(), om, clip)? + .into_content_reader(ctx, loader.loader().clone(), om, pos_max)? .ok_or_else(|| anyhow!("cannot read the contents of a directory"))?; Ok(OutPrettyReader::Unixfs(reader)) @@ -519,15 +518,14 @@ impl AsyncRead for OutPrettyReader { | OutPrettyReader::DagJson(bytes_reader) | OutPrettyReader::Raw(bytes_reader) => { let pos_current = bytes_reader.pos; - let res = poll_read_buf_at_pos( + let bytes_read = read_data_to_buf( &mut bytes_reader.pos, - ResponseClip::Clip(bytes_reader.bytes.len()), - &bytes_reader.bytes, + Some(bytes_reader.bytes.len()), + &bytes_reader.bytes[pos_current..], buf, ); - let bytes_read = bytes_reader.pos - pos_current; bytes_reader.om.observe_bytes_read(pos_current, bytes_read); - Poll::Ready(res) + Poll::Ready(Ok(())) } OutPrettyReader::Unixfs(r) => Pin::new(&mut *r).poll_read(cx, buf), } @@ -1156,7 +1154,7 @@ mod tests { ctx, resolver.loader().clone(), OutMetrics::default(), - ResponseClip::Clip(range.end as usize), + Some(range.end as usize), ) .unwrap() .unwrap(); @@ -1214,11 +1212,11 @@ mod tests { let non_dir_path: Path = non_dir_test.parse().unwrap(); let dir_path: Path = dir_test.parse().unwrap(); assert!(non_dir_path.tail().is_empty()); - assert!(dir_path.tail().len() == 1); + assert_eq!(dir_path.tail().len(), 1); assert!(dir_path.tail()[0].is_empty()); - assert!(non_dir_path.to_string() == non_dir_test); - assert!(dir_path.to_string() == dir_test); + assert_eq!(non_dir_path.to_string(), non_dir_test); + assert_eq!(dir_path.to_string(), dir_test); assert!(dir_path.has_trailing_slash()); assert!(!non_dir_path.has_trailing_slash()); } @@ -1293,11 +1291,7 @@ mod tests { let out_bytes = read_to_vec( new_ipld - .pretty( - resolver.clone(), - OutMetrics::default(), - ResponseClip::NoClip, - ) + .pretty(resolver.clone(), OutMetrics::default(), None) .unwrap(), ) .await @@ -1326,11 +1320,7 @@ mod tests { let out_bytes = read_to_vec( new_ipld - .pretty( - resolver.clone(), - OutMetrics::default(), - ResponseClip::NoClip, - ) + .pretty(resolver.clone(), OutMetrics::default(), None) .unwrap(), ) .await @@ -1453,7 +1443,7 @@ mod tests { ipld_hello_txt.context, 
diff --git a/iroh-resolver/tests/roundtrip.rs b/iroh-resolver/tests/roundtrip.rs
index d1985a2507..8bbb8b585b 100644
--- a/iroh-resolver/tests/roundtrip.rs
+++ b/iroh-resolver/tests/roundtrip.rs
@@ -8,7 +8,6 @@ use iroh_unixfs::{
     builder::{Directory, DirectoryBuilder, FileBuilder, SymlinkBuilder},
     chunker::DEFAULT_CHUNKS_SIZE,
     content_loader::ContentLoader,
-    ResponseClip,
 };
 use proptest::prelude::*;
 use rand::prelude::*;
@@ -97,11 +96,7 @@ async fn build_testdir(
         if item.is_dir() {
             mkdir(&mut agg, path.tail())?;
         } else {
-            let reader = item.pretty(
-                resolver.clone(),
-                OutMetrics::default(),
-                ResponseClip::NoClip,
-            )?;
+            let reader = item.pretty(resolver.clone(), OutMetrics::default(), None)?;
             let data = read_to_vec(reader).await?;
             mkfile(&mut agg, path.tail(), data.into())?;
         }
@@ -133,7 +128,11 @@ fn dir_roundtrip_test_sync(dir: TestDir, hamt: bool) -> bool {
 }

 /// a roundtrip test that converts a file to an unixfs DAG and back
-async fn file_roundtrip_test(data: Bytes, chunk_size: usize, degree: usize) -> Result<bool> {
+async fn file_roundtrip_test(
+    data: Bytes,
+    chunk_size: usize,
+    degree: usize,
+) -> Result<(Vec<u8>, Vec<u8>)> {
     let file = FileBuilder::new()
         .name("file.bin")
         .fixed_chunker(chunk_size)
@@ -146,9 +145,8 @@ async fn file_roundtrip_test(
     let out = resolver
         .resolve(iroh_resolver::resolver::Path::from_cid(root))
         .await?;
-    let t = read_to_vec(out.pretty(resolver, OutMetrics::default(), ResponseClip::NoClip)?).await?;
-    println!("{}", data.len());
-    Ok(t == data)
+    let t = read_to_vec(out.pretty(resolver, OutMetrics::default(), None)?).await?;
+    Ok((t, data.to_vec()))
 }

 /// a roundtrip test that converts a symlink to a unixfs DAG and back
@@ -166,7 +164,7 @@ async fn symlink_roundtrip_test() -> Result<()> {
     let out = resolver
         .resolve(iroh_resolver::resolver::Path::from_cid(root))
         .await?;
-    let mut reader = out.pretty(resolver, OutMetrics::default(), ResponseClip::NoClip)?;
+    let mut reader = out.pretty(resolver, OutMetrics::default(), None)?;
     let mut t = String::new();
     reader.read_to_string(&mut t).await?;
     println!("{}", t);
@@ -175,7 +173,7 @@ async fn symlink_roundtrip_test() -> Result<()> {
 }

 /// sync version of file_roundtrip_test for use in proptest
-fn file_roundtrip_test_sync(data: Bytes, chunk_size: usize, degree: usize) -> bool {
+fn file_roundtrip_test_sync(data: Bytes, chunk_size: usize, degree: usize) -> (Vec<u8>, Vec<u8>) {
     let f = file_roundtrip_test(data, chunk_size, degree);
     tokio::runtime::Builder::new_current_thread()
         .build()
@@ -209,7 +207,8 @@ fn arb_chunk_size() -> impl Strategy<Value = usize> {
 proptest! {
     #[test]
     fn test_file_roundtrip(data in proptest::collection::vec(any::<u8>(), 0usize..1024), chunk_size in arb_chunk_size(), degree in arb_degree()) {
-        assert!(file_roundtrip_test_sync(data.into(), chunk_size, degree));
+        let (left, right) = file_roundtrip_test_sync(data.into(), chunk_size, degree);
+        assert_eq!(left, right);
     }

     #[test]
@@ -260,7 +259,8 @@ async fn test_builder_roundtrip_complex_tree_1() -> Result<()> {
     let mut rng = ChaCha8Rng::from_seed([0; 32]);
     let mut data = vec![0u8; 1024 * 128];
     rng.fill(data.as_mut_slice());
-    assert!(file_roundtrip_test(data.into(), 1024, 4).await?);
+    let (left, right) = file_roundtrip_test(data.into(), 1024, 4).await?;
+    assert_eq!(left, right);
     Ok(())
 }
@@ -270,6 +270,8 @@ async fn test_builder_roundtrip_128m() -> Result<()> {
     let mut rng = ChaCha8Rng::from_seed([0; 32]);
     let mut data = vec![0u8; 128 * 1024 * 1024];
     rng.fill(data.as_mut_slice());
-    assert!(file_roundtrip_test(data.into(), DEFAULT_CHUNKS_SIZE, DEFAULT_DEGREE).await?);
+    let (left, right) =
+        file_roundtrip_test(data.into(), DEFAULT_CHUNKS_SIZE, DEFAULT_DEGREE).await?;
+    assert_eq!(left, right);
     Ok(())
 }
diff --git a/iroh-resolver/tests/unixfs.rs b/iroh-resolver/tests/unixfs.rs
index 78ed046fe7..99d0482655 100644
--- a/iroh-resolver/tests/unixfs.rs
+++ b/iroh-resolver/tests/unixfs.rs
@@ -8,7 +8,6 @@ use iroh_unixfs::{
     self,
     builder::FileBuilder,
     chunker::{self, Chunker},
-    ResponseClip,
 };

 async fn read_fixture(path: impl AsRef<std::path::Path>) -> Result<Vec<u8>> {
@@ -80,9 +79,7 @@ async fn test_dagger_testdata() -> Result<()> {
     let stream = file.encode().await?;
     let (root, resolver) = stream_to_resolver(stream).await?;
     let out = resolver.resolve(Path::from_cid(root)).await?;
-    let t =
-        read_to_vec(out.pretty(resolver, OutMetrics::default(), ResponseClip::NoClip)?)
-            .await?;
+    let t = read_to_vec(out.pretty(resolver, OutMetrics::default(), None)?).await?;

     println!("Root: {}", root);
     println!("Len: {}", data.len());
diff --git a/iroh-rpc-client/src/lib.rs b/iroh-rpc-client/src/lib.rs
index f982bf8a97..4015d57523 100644
--- a/iroh-rpc-client/src/lib.rs
+++ b/iroh-rpc-client/src/lib.rs
@@ -69,7 +69,7 @@ pub async fn open_client(addr: Addr) -> anyhow::Result {
-            let uri = format!("http://{}", uri).parse()?;
+            let uri = format!("http://{uri}").parse()?;
             let channel = http2::ClientChannel::new(uri);
             let channel = combined::ClientChannel::new(Some(channel), None);
             Ok(RpcClient::new(channel))
diff --git a/iroh-share/src/receiver.rs b/iroh-share/src/receiver.rs
index 5c0ab3ea24..de2571093b 100644
--- a/iroh-share/src/receiver.rs
+++ b/iroh-share/src/receiver.rs
@@ -5,7 +5,7 @@ use futures::{
 };
 use iroh_p2p::NetworkEvent;
 use iroh_resolver::resolver::{Out, OutPrettyReader, OutType, Path, Resolver, UnixfsType};
-use iroh_unixfs::{Link, ResponseClip};
+use iroh_unixfs::Link;
 use libp2p::gossipsub::{GossipsubMessage, MessageId, TopicHash};
 use libp2p::PeerId;
 use tokio::sync::mpsc::{channel, Receiver as ChannelReceiver};
@@ -238,8 +238,7 @@ impl Data {
     }

     pub fn pretty(self) -> Result<OutPrettyReader<Loader>> {
-        self.root
-            .pretty(self.resolver, Default::default(), ResponseClip::NoClip)
+        self.root.pretty(self.resolver, Default::default(), None)
     }

     pub async fn read_file(&self, link: &Link) -> Result<Data> {
diff --git a/iroh-store/src/store.rs b/iroh-store/src/store.rs
index 2726f9f7f4..a1cb6136c9 100644
--- a/iroh-store/src/store.rs
+++ b/iroh-store/src/store.rs
@@ -683,8 +683,7 @@ impl<'a> ReadStore<'a> {
         let n_id = self.db.iterator_cf(&cf.id, IteratorMode::Start).count();
         if n_meta != n_id {
             res.push(format!(
-                "non bijective mapping between cid and id. Metadata and id cfs have different lengths: {} != {}",
-                n_meta, n_id
+                "non bijective mapping between cid and id. Metadata and id cfs have different lengths: {n_meta} != {n_id}"
             ));
         }
         Ok(res)
diff --git a/iroh-unixfs/src/chunker.rs b/iroh-unixfs/src/chunker.rs
index 1b7678a355..5d134b0ec4 100644
--- a/iroh-unixfs/src/chunker.rs
+++ b/iroh-unixfs/src/chunker.rs
@@ -49,7 +49,7 @@ pub enum ChunkerConfig {
 impl Display for ChunkerConfig {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::Fixed(chunk_size) => write!(f, "fixed-{}", chunk_size),
+            Self::Fixed(chunk_size) => write!(f, "fixed-{chunk_size}"),
             Self::Rabin => write!(f, "rabin"),
         }
     }
diff --git a/iroh-unixfs/src/content_loader.rs b/iroh-unixfs/src/content_loader.rs
index a8997ee113..188f8d5cec 100644
--- a/iroh-unixfs/src/content_loader.rs
+++ b/iroh-unixfs/src/content_loader.rs
@@ -101,7 +101,7 @@ impl GatewayUrl {
                 url
             }
             GatewayUrl::Subdomain(raw) => {
-                format!("https://{}.ipfs.{}?format=raw", cid_str, raw).parse()?
+                format!("https://{cid_str}.ipfs.{raw}?format=raw").parse()?
             }
         };
         Ok(url)
diff --git a/iroh-unixfs/src/lib.rs b/iroh-unixfs/src/lib.rs
index d2f90aef61..611c41f781 100644
--- a/iroh-unixfs/src/lib.rs
+++ b/iroh-unixfs/src/lib.rs
@@ -8,7 +8,7 @@ pub mod indexer;
 mod types;
 pub mod unixfs;

-pub use crate::types::{Block, Link, LinkRef, Links, LoadedCid, PbLinks, ResponseClip, Source};
+pub use crate::types::{Block, Link, LinkRef, Links, LoadedCid, PbLinks, Source};

 use std::collections::BTreeSet;
diff --git a/iroh-unixfs/src/types.rs b/iroh-unixfs/src/types.rs
index a643d01dd9..9dd618c7b0 100644
--- a/iroh-unixfs/src/types.rs
+++ b/iroh-unixfs/src/types.rs
@@ -79,23 +79,6 @@ impl Block {
     }
 }

-/// Holds information if we should clip the response and to what offset
-#[derive(Debug, Clone, Copy)]
-pub enum ResponseClip {
-    NoClip,
-    Clip(usize),
-}
-
-impl From<usize> for ResponseClip {
-    fn from(item: usize) -> Self {
-        if item == 0 {
-            ResponseClip::NoClip
-        } else {
-            ResponseClip::Clip(item)
-        }
-    }
-}
-
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Link {
     pub cid: Cid,
diff --git a/iroh-unixfs/src/unixfs.rs b/iroh-unixfs/src/unixfs.rs
index 3efda34e0e..4fe8553e05 100644
--- a/iroh-unixfs/src/unixfs.rs
+++ b/iroh-unixfs/src/unixfs.rs
@@ -18,7 +18,7 @@ use crate::{
     codecs::Codec,
     content_loader::{ContentLoader, LoaderContext},
     hamt::Hamt,
-    types::{Block, Link, LinkRef, Links, PbLinks, ResponseClip},
+    types::{Block, Link, LinkRef, Links, PbLinks},
 };

 pub(crate) mod unixfs_pb {
@@ -306,12 +306,12 @@ impl UnixfsNode {
     }

     /// If this is a directory or hamt shard, returns a stream that yields all children of it.
-    pub fn as_child_reader<'a, 'b: 'a, C: ContentLoader>(
-        &'a self,
+    pub fn as_child_reader<C: ContentLoader>(
+        &self,
         ctx: LoaderContext,
         loader: C,
         om: OutMetrics,
-    ) -> Result<Option<UnixfsChildStream<'a>>> {
+    ) -> Result<Option<UnixfsChildStream>> {
         match self {
             UnixfsNode::Raw(_)
             | UnixfsNode::RawNode(_)
@@ -339,7 +339,7 @@ impl UnixfsNode {
         ctx: LoaderContext,
         loader: C,
         om: OutMetrics,
-        pos_max: ResponseClip,
+        pos_max: Option<usize>,
     ) -> Result<Option<UnixfsContentReader<C>>> {
         match self {
             UnixfsNode::Raw(_)
@@ -351,7 +351,6 @@ impl UnixfsNode {
                 Ok(Some(UnixfsContentReader::File {
                     root_node: self,
                     pos: 0,
-                    skip_pos: 0,
                     pos_max,
                     current_node: CurrentNodeState::Outer,
                     current_links,
@@ -383,9 +382,9 @@ impl<'a> Debug for UnixfsChildStream<'a> {
             UnixfsChildStream::Hamt {
                 pos, out_metrics, ..
             } =>
-                write!(f, "UnixfsChildStream::Hamt {{ stream: BoxStream<Result<Link>>, pos: {}, out_metrics {:?} }}", pos, out_metrics),
+                write!(f, "UnixfsChildStream::Hamt {{ stream: BoxStream<Result<Link>>, pos: {pos}, out_metrics {out_metrics:?} }}"),
             UnixfsChildStream::Directory { out_metrics, .. } =>
-                write!(f, "UnixfsChildStream::Directory {{ stream: BoxStream<Result<Link>>, out_metrics {:?} }}", out_metrics),
+                write!(f, "UnixfsChildStream::Directory {{ stream: BoxStream<Result<Link>>, out_metrics {out_metrics:?} }}"),
         }
     }
 }
@@ -397,9 +396,7 @@ pub enum UnixfsContentReader<C: ContentLoader> {
         /// Absolute position in bytes
         pos: usize,
         /// Absolute max position in bytes, only used for clipping responses
-        pos_max: ResponseClip,
-        /// Amount of bytes to skip to seek up to pos
-        skip_pos: usize,
+        pos_max: Option<usize>,
         /// Current node being operated on, only used for nested nodes (not the root).
         current_node: CurrentNodeState,
         /// Stack of links left to traverse.
@@ -451,7 +448,6 @@ impl AsyncRead for UnixfsContentReader {
                 root_node,
                 pos,
                 pos_max,
-                skip_pos,
                 current_node,
                 current_links,
                 loader,
@@ -459,18 +455,17 @@ impl AsyncRead for UnixfsContentReader {
                 ctx,
             } => {
                 let typ = root_node.typ();
-                let pos_current = *pos;
+                let pos_old = *pos;
                 let poll_res = match root_node {
                     UnixfsNode::Raw(data) => {
-                        let res = poll_read_buf_at_pos(pos, *pos_max, data, buf);
-                        Poll::Ready(res)
+                        read_data_to_buf(pos, *pos_max, &data[*pos..], buf);
+                        Poll::Ready(Ok(()))
                     }
                     UnixfsNode::File(node) => poll_read_file_at(
                         cx,
                         node,
                         loader.clone(),
                         pos,
-                        skip_pos,
                         *pos_max,
                         buf,
                         current_links,
@@ -479,16 +474,16 @@ impl AsyncRead for UnixfsContentReader {
                         ctx,
                     ),
                     UnixfsNode::Symlink(node) => {
                         let data = node.inner.data.as_deref().unwrap_or_default();
-                        let res = poll_read_buf_at_pos(pos, *pos_max, data, buf);
-                        Poll::Ready(res)
+                        read_data_to_buf(pos, *pos_max, &data[*pos..], buf);
+                        Poll::Ready(Ok(()))
                     }
                     _ => Poll::Ready(Err(std::io::Error::new(
                         std::io::ErrorKind::InvalidData,
-                        format!("unsupported Unixfs type for file types: {:?} ", typ),
+                        format!("unsupported Unixfs type for file types: {typ:?} "),
                     ))),
                 };
-                let bytes_read = *pos - pos_current;
-                out_metrics.observe_bytes_read(pos_current, bytes_read);
+                let bytes_read = *pos - pos_old;
+                out_metrics.observe_bytes_read(pos_old, bytes_read);
                 poll_res
             }
         }
@@ -501,63 +496,58 @@ impl AsyncSeek for UnixfsContentReader {
             UnixfsContentReader::File {
                 root_node,
                 pos,
-                pos_max: _,
-                skip_pos,
-                current_node: _,
-                current_links: _,
-                loader: _,
-                out_metrics: _,
-                ctx: _,
-            } => match position {
-                std::io::SeekFrom::Start(offset) => {
-                    let mut i = offset as usize;
-                    let data_len = root_node.size();
-                    if let Some(data_len) = data_len {
-                        if data_len == 0 {
-                            *pos = 0;
-                            return Ok(());
+                current_node,
+                current_links,
+                ..
+            } => {
+                let data_len = root_node.size();
+                *current_node = CurrentNodeState::Outer;
+                *current_links = vec![root_node.links_owned().unwrap()];
+                match position {
+                    std::io::SeekFrom::Start(offset) => {
+                        let mut i = offset as usize;
+                        if let Some(data_len) = data_len {
+                            if data_len == 0 {
+                                *pos = 0;
+                                return Ok(());
+                            }
+                            i = std::cmp::min(i, data_len - 1);
                         }
-                        i = std::cmp::min(i, data_len - 1);
+                        *pos = i;
                     }
-                    *pos = i;
-                    *skip_pos = i;
-                }
-                std::io::SeekFrom::End(offset) => {
-                    let data_len = root_node.size();
-                    if let Some(data_len) = data_len {
-                        if data_len == 0 {
-                            *pos = 0;
-                            return Ok(());
-                        }
-                        let mut i = (data_len as i64 + offset) % data_len as i64;
-                        if i < 0 {
-                            i += data_len as i64;
+                    std::io::SeekFrom::End(offset) => {
+                        if let Some(data_len) = data_len {
+                            if data_len == 0 {
+                                *pos = 0;
+                                return Ok(());
+                            }
+                            let mut i = (data_len as i64 + offset) % data_len as i64;
+                            if i < 0 {
+                                i += data_len as i64;
+                            }
+                            *pos = i as usize;
+                        } else {
+                            return Err(std::io::Error::new(
+                                std::io::ErrorKind::InvalidInput,
+                                "cannot seek from end of unknown length",
+                            ));
                         }
-                        *pos = i as usize;
-                        *skip_pos = i as usize;
-                    } else {
-                        return Err(std::io::Error::new(
-                            std::io::ErrorKind::InvalidInput,
-                            "cannot seek from end of unknown length",
-                        ));
                     }
-                }
-                std::io::SeekFrom::Current(offset) => {
-                    let mut i = *pos as i64 + offset;
-                    i = std::cmp::max(0, i);
-
-                    let data_len = root_node.size();
-                    if let Some(data_len) = data_len {
-                        if data_len == 0 {
-                            *pos = 0;
-                            return Ok(());
+                    std::io::SeekFrom::Current(offset) => {
+                        let mut i = *pos as i64 + offset;
+                        i = std::cmp::max(0, i);
+
+                        if let Some(data_len) = data_len {
+                            if data_len == 0 {
+                                *pos = 0;
+                                return Ok(());
+                            }
+                            i = std::cmp::min(i, data_len as i64 - 1);
                         }
-                        i = std::cmp::min(i, data_len as i64 - 1);
+                        *pos = i as usize;
                     }
-                    *pos = i as usize;
-                    *skip_pos = i as usize;
                 }
-            },
+            }
         }
         Ok(())
     }
@@ -567,73 +557,99 @@ impl AsyncSeek for UnixfsContentReader {
         _cx: &mut Context<'_>,
     ) -> Poll<std::io::Result<u64>> {
         match &mut *self {
-            UnixfsContentReader::File {
-                root_node: _,
-                pos,
-                pos_max: _,
-                skip_pos: _,
-                current_node: _,
-                current_links: _,
-                loader: _,
-                out_metrics: _,
-                ctx: _,
-            } => Poll::Ready(Ok(*pos as u64)),
+            UnixfsContentReader::File { pos, .. } => Poll::Ready(Ok(*pos as u64)),
         }
     }
 }
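Seeking no longer maintains a separate `skip_pos`: `start_seek` resets `current_node` to `Outer` and restores the root's links, and the next read re-locates the target block via `find_block` (below). One behavior worth noting is the `SeekFrom::End` arithmetic, which wraps the offset modulo the known length. A standalone sketch of that position math:

    // Standalone copy of the SeekFrom::End position math above.
    fn seek_from_end(data_len: u64, offset: i64) -> u64 {
        let mut i = (data_len as i64 + offset) % data_len as i64;
        if i < 0 {
            i += data_len as i64;
        }
        i as u64
    }

    fn main() {
        assert_eq!(seek_from_end(100, -1), 99); // last byte
        assert_eq!(seek_from_end(100, -100), 0); // back to the start
        assert_eq!(seek_from_end(100, 0), 0); // note: End(0) wraps to offset 0 here
    }
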
-pub fn poll_read_buf_at_pos(
+pub fn read_data_to_buf(
     pos: &mut usize,
-    clip: ResponseClip,
+    pos_max: Option<usize>,
     data: &[u8],
     buf: &mut tokio::io::ReadBuf<'_>,
-) -> std::io::Result<()> {
-    let mut pos_max = data.len();
-    if let ResponseClip::Clip(n) = clip {
-        pos_max = n;
-    }
-    if *pos >= data.len() || *pos >= pos_max {
-        return Ok(());
-    }
-    let data_len = data.len() - *pos;
-    let amt = std::cmp::min(data_len, buf.remaining());
-    let amt = std::cmp::min(amt, pos_max - *pos);
-    buf.put_slice(&data[*pos..*pos + amt]);
+) -> usize {
+    let data_to_read = pos_max.map(|pos_max| pos_max - *pos).unwrap_or(data.len());
+    let amt = std::cmp::min(std::cmp::min(data_to_read, buf.remaining()), data.len());
+    buf.put_slice(&data[..amt]);
     *pos += amt;
+    amt
+}

-    Ok(())
+pub fn find_block(node: &UnixfsNode, pos: u64, node_offset: u64) -> (u64, Option<usize>) {
+    let pivots = node
+        .blocksizes()
+        .iter()
+        .scan(node_offset, |state, &x| {
+            *state += x;
+            Some(*state)
+        })
+        .collect::<Vec<_>>();
+    let block_index = match pivots.binary_search(&pos) {
+        Ok(b) => b + 1,
+        Err(b) => b,
+    };
+    if block_index < pivots.len() {
+        let next_node_offset = if block_index > 0 {
+            pivots[block_index - 1]
+        } else {
+            node_offset
+        };
+        (next_node_offset, Some(block_index))
+    } else {
+        (pivots[pivots.len() - 1], None)
+    }
 }
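`find_block` turns a node's `blocksizes` into cumulative end offsets (`pivots`) and binary-searches them to find which child covers an absolute byte position; an exact hit on a pivot means the position is the first byte of the next block, hence `Ok(b) => b + 1`. It returns the located block's starting offset plus its index, or `None` when the position is past the node. A standalone replica with the `UnixfsNode` parameter swapped for a plain slice, and a worked example:

    // Standalone replica of find_block, taking blocksizes directly.
    fn find_block(blocksizes: &[u64], pos: u64, node_offset: u64) -> (u64, Option<usize>) {
        let pivots = blocksizes
            .iter()
            .scan(node_offset, |state, &x| {
                *state += x;
                Some(*state)
            })
            .collect::<Vec<u64>>();
        let block_index = match pivots.binary_search(&pos) {
            Ok(b) => b + 1,
            Err(b) => b,
        };
        if block_index < pivots.len() {
            let next_node_offset = if block_index > 0 {
                pivots[block_index - 1]
            } else {
                node_offset
            };
            (next_node_offset, Some(block_index))
        } else {
            (pivots[pivots.len() - 1], None)
        }
    }

    fn main() {
        // Three 4-byte children at offset 0: pivots are [4, 8, 12].
        assert_eq!(find_block(&[4, 4, 4], 0, 0), (0, Some(0))); // byte 0 -> block 0
        assert_eq!(find_block(&[4, 4, 4], 4, 0), (4, Some(1))); // byte 4 -> block 1
        assert_eq!(find_block(&[4, 4, 4], 9, 0), (8, Some(2))); // byte 9 -> block 2
        assert_eq!(find_block(&[4, 4, 4], 12, 0), (12, None)); // past the end
    }
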
 #[allow(clippy::large_enum_variant)]
 pub enum CurrentNodeState {
+    // Initial state
     Outer,
-    None,
-    Loaded(usize, UnixfsNode),
-    Loading(BoxFuture<'static, Result<UnixfsNode>>),
+    // Need to load next node from the list
+    NextNodeRequested {
+        next_node_offset: usize,
+    },
+    // Node has been loaded and ready to be processed
+    Loaded {
+        node_offset: usize,
+        node_pos: usize,
+        node: UnixfsNode,
+    },
+    // Ongoing loading of the node
+    Loading {
+        node_offset: usize,
+        fut: BoxFuture<'static, Result<UnixfsNode>>,
+    },
 }

 impl Debug for CurrentNodeState {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             CurrentNodeState::Outer => write!(f, "CurrentNodeState::Outer"),
-            CurrentNodeState::None => write!(f, "CurrentNodeState::None"),
-            CurrentNodeState::Loaded(pos, n) => {
-                write!(f, "CurrentNodeState::Loaded({:?}, {:?})", pos, n)
+            CurrentNodeState::NextNodeRequested { next_node_offset } => {
+                write!(f, "CurrentNodeState::NextNodeRequested({next_node_offset})")
+            }
+            CurrentNodeState::Loaded {
+                node_offset,
+                node_pos,
+                node,
+            } => {
+                write!(
+                    f,
+                    "CurrentNodeState::Loaded({node_offset:?}, {node_pos:?}, {node:?})"
+                )
             }
-            CurrentNodeState::Loading(_) => write!(f, "CurrentNodeState::Loading(Fut)"),
+            CurrentNodeState::Loading { .. } => write!(f, "CurrentNodeState::Loading(Fut)"),
         }
     }
 }

 fn load_next_node(
+    next_node_offset: usize,
     current_node: &mut CurrentNodeState,
     current_links: &mut Vec<Vec<Link>>,
     loader: C,
     ctx: std::sync::Arc>,
 ) -> bool {
-    // Load next node
-
-    // find non empty links
     let links = loop {
         if let Some(last_mut) = current_links.last_mut() {
             if last_mut.is_empty() {
@@ -645,7 +661,7 @@ fn load_next_node(
             }
         } else {
             // no links left we are done
-            return true;
+            return false;
         }
     };
@@ -659,8 +675,11 @@ fn load_next_node(
         Ok(node)
     }
     .boxed();
-    *current_node = CurrentNodeState::Loading(fut);
-    false
+    *current_node = CurrentNodeState::Loading {
+        node_offset: next_node_offset,
+        fut,
+    };
+    true
 }
 #[allow(clippy::too_many_arguments)]
@@ -669,43 +688,52 @@ fn poll_read_file_at(
     cx: &mut Context<'_>,
     root_node: &Node,
     loader: C,
     pos: &mut usize,
-    skip_pos: &mut usize,
-    pos_max: ResponseClip,
+    pos_max: Option<usize>,
     buf: &mut tokio::io::ReadBuf<'_>,
     current_links: &mut Vec<Vec<Link>>,
     current_node: &mut CurrentNodeState,
     ctx: std::sync::Arc>,
 ) -> Poll<std::io::Result<()>> {
     loop {
+        if let Some(pos_max) = pos_max {
+            if pos_max <= *pos {
+                return Poll::Ready(Ok(()));
+            }
+        }
         match current_node {
             CurrentNodeState::Outer => {
                 // check for links
                 if root_node.outer.links.is_empty() {
                     // simplest case just one file
                     let data = root_node.inner.data.as_deref().unwrap_or(&[][..]);
-                    let res = poll_read_buf_at_pos(pos, pos_max, data, buf);
-                    return Poll::Ready(res);
+                    read_data_to_buf(pos, pos_max, &data[*pos..], buf);
+                    return Poll::Ready(Ok(()));
                 }

                 // read root local data
                 if let Some(ref data) = root_node.inner.data {
                     if *pos < data.len() {
-                        let res = poll_read_buf_at_pos(pos, pos_max, data, buf);
-                        return Poll::Ready(res);
+                        read_data_to_buf(pos, pos_max, &data[*pos..], buf);
+                        return Poll::Ready(Ok(()));
                     }
                 }
-                *current_node = CurrentNodeState::None;
-                if load_next_node(current_node, current_links, loader.clone(), ctx.clone()) {
-                    return Poll::Ready(Ok(()));
-                }
+                *current_node = CurrentNodeState::NextNodeRequested {
+                    next_node_offset: 0,
+                };
             }
-            CurrentNodeState::None => {
-                if load_next_node(current_node, current_links, loader.clone(), ctx.clone()) {
+            CurrentNodeState::NextNodeRequested { next_node_offset } => {
+                let loaded_next_node = load_next_node(
+                    *next_node_offset,
+                    current_node,
+                    current_links,
+                    loader.clone(),
+                    ctx.clone(),
+                );
+                if !loaded_next_node {
                     return Poll::Ready(Ok(()));
                 }
             }
-            CurrentNodeState::Loading(fut) => {
-                // Already loading the next node, just wait
+            CurrentNodeState::Loading { node_offset, fut } => {
                 match fut.poll_unpin(cx) {
                     Poll::Pending => {
                         return Poll::Pending;
                     }
                     Poll::Ready(Ok(node)) => {
                         match node.links_owned() {
                             Ok(links) => {
-                                current_links.push(links);
-                                *current_node = CurrentNodeState::Loaded(0, node);
+                                if !links.is_empty() {
+                                    let (next_node_offset, block_index) =
+                                        find_block(&node, *pos as u64, *node_offset as u64);
+                                    if let Some(block_index) = block_index {
+                                        let new_links =
+                                            links.into_iter().skip(block_index).collect();
+                                        current_links.push(new_links);
+                                    }
+                                    *current_node = CurrentNodeState::NextNodeRequested {
+                                        next_node_offset: next_node_offset as usize,
+                                    }
+                                } else {
+                                    *current_node = CurrentNodeState::Loaded {
+                                        node_offset: *node_offset,
+                                        node_pos: *pos - *node_offset,
+                                        node,
+                                    }
+                                }
                             }
                             Err(e) => {
                                 return Poll::Ready(Err(std::io::Error::new(
                         // TODO: do one read
                     }
                     Poll::Ready(Err(e)) => {
-                        *current_node = CurrentNodeState::None;
+                        *current_node = CurrentNodeState::NextNodeRequested {
+                            next_node_offset: *node_offset,
+                        };
                         return Poll::Ready(Err(std::io::Error::new(
                             std::io::ErrorKind::InvalidData,
                             e.to_string(),
                         )));
                     }
                 }
             }
-            CurrentNodeState::Loaded(ref mut node_pos, ref mut current_node_inner) => {
-                // already loaded
-                let ty = current_node_inner.typ();
-                match current_node_inner {
-                    UnixfsNode::Raw(data) => {
-                        if node_pos < skip_pos {
-                            if *node_pos + data.len() < *skip_pos {
-                                *skip_pos -= data.len();
-                                *node_pos += data.len();
-                            } else {
-                                *node_pos += *skip_pos - *node_pos;
-                                *skip_pos = 0;
-                            }
-                        }
-
-                        let old = *node_pos;
-                        let mut node_pos_max = data.len();
-                        if let ResponseClip::Clip(n) = pos_max {
-                            if *pos >= n + old {
-                                return Poll::Ready(Ok(()));
-                            }
-                            node_pos_max = (n + old) - *pos;
-                        }
-                        let res = poll_read_buf_at_pos(
-                            node_pos,
-                            ResponseClip::Clip(node_pos_max),
-                            data,
-                            buf,
-                        );
-                        // advance global pos
-                        let amt_read = *node_pos - old;
-                        *pos += amt_read;
-                        if amt_read > 0 {
-                            return Poll::Ready(res);
-                        } else if *node_pos == data.len() {
-                            // finished reading this node
-                            if load_next_node(
-                                current_node,
-                                current_links,
-                                loader.clone(),
-                                ctx.clone(),
-                            ) {
-                                return Poll::Ready(Ok(()));
-                            }
-                        }
+            CurrentNodeState::Loaded {
+                ref node_offset,
+                ref mut node_pos,
+                node: ref mut current_node_inner,
+            } => match current_node_inner {
+                UnixfsNode::Raw(data) => {
+                    if *node_offset + data.len() <= *pos {
+                        *current_node = CurrentNodeState::NextNodeRequested {
+                            next_node_offset: node_offset + data.len(),
+                        };
+                        continue;
                     }
-                    UnixfsNode::File(node) | UnixfsNode::RawNode(node) => {
-                        // read direct node data
-                        if let Some(ref data) = node.inner.data {
-                            if node_pos < skip_pos {
-                                if *node_pos + data.len() < *skip_pos {
-                                    *skip_pos -= data.len();
-                                    *node_pos += data.len();
-                                } else {
-                                    *node_pos += *skip_pos - *node_pos;
-                                    *skip_pos = 0;
-                                }
-                            }
-                            let old = *node_pos;
-                            let mut node_pos_max = data.len();
-                            if let ResponseClip::Clip(n) = pos_max {
-                                if *pos >= n + old {
-                                    return Poll::Ready(Ok(()));
-                                }
-                                node_pos_max = (n + old) - *pos;
-                            }
-                            let res = poll_read_buf_at_pos(
-                                node_pos,
-                                ResponseClip::Clip(node_pos_max),
-                                data,
-                                buf,
-                            );
-                            let amt_read = *node_pos - old;
-                            *pos += amt_read;
-                            if amt_read > 0 {
-                                return Poll::Ready(res);
-                            }
-                        }
-                        // follow links
-                        if load_next_node(current_node, current_links, loader.clone(), ctx.clone())
-                        {
-                            return Poll::Ready(Ok(()));
+                    let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
+                    *node_pos += bytes_read;
+                    return Poll::Ready(Ok(()));
+                }
+                UnixfsNode::File(node) | UnixfsNode::RawNode(node) => {
+                    if let Some(ref data) = node.inner.data {
+                        if node_offset + data.len() <= *pos {
+                            *current_node = CurrentNodeState::NextNodeRequested {
+                                next_node_offset: node_offset + data.len(),
+                            };
+                            continue;
                         }
+                        let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
+                        *node_pos += bytes_read;
+                        return Poll::Ready(Ok(()));
                     }
-                    _ => {
-                        return Poll::Ready(Err(std::io::Error::new(
-                            std::io::ErrorKind::InvalidData,
-                            format!("invalid type nested in chunked file: {:?}", ty),
-                        )));
-                    }
+                    *current_node = CurrentNodeState::NextNodeRequested {
+                        next_node_offset: *node_offset,
+                    };
                 }
-            }
+                _ => {
+                    return Poll::Ready(Err(std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        format!(
+                            "invalid type nested in chunked file: {:?}",
+                            current_node_inner.typ()
+                        ),
+                    )));
+                }
+            },
         }
     }
 }
diff --git a/iroh/src/doc.rs b/iroh/src/doc.rs
index 895bc55e1d..512c89d1af 100644
--- a/iroh/src/doc.rs
+++ b/iroh/src/doc.rs
@@ -70,7 +70,7 @@ with `iroh stop`. Daemons provide 'services'.
 Services work together to fulfill requests. There are three services:

-  storage - a database of IPFS content
+  store - a database of IPFS content
   p2p - peer-2-peer networking functionality
   gateway - bridge the IPFS network to HTTP
@@ -89,7 +89,7 @@ pub const STOP_LONG_DESCRIPTION: &str = "
 stop turns local iroh services off by killing daemon processes.
 There are three iroh services, each backed by a daemon:

-  storage - a database of IPFS content
+  store - a database of IPFS content
   p2p - peer-2-peer networking functionality
   gateway - bridge the IPFS network to HTTP