diff --git a/iroh-api/Cargo.toml b/iroh-api/Cargo.toml index 5ef6516c82..c522f95ac9 100644 --- a/iroh-api/Cargo.toml +++ b/iroh-api/Cargo.toml @@ -32,4 +32,3 @@ thiserror = "1.0" [dev-dependencies] tempfile = "3.3.0" -tempdir = "0.3.7" diff --git a/iroh-api/src/api_ext.rs b/iroh-api/src/api_ext.rs index 3ec078027c..512c6af60a 100644 --- a/iroh-api/src/api_ext.rs +++ b/iroh-api/src/api_ext.rs @@ -52,12 +52,10 @@ pub trait ApiExt: Api { let add_events = self.add_stream(path, wrap).await?; add_events - .try_fold(None, |acc, add_event| async move { - Ok(if let AddEvent::Done(cid) = add_event { - Some(cid) - } else { - acc - }) + .try_fold(None, |_acc, add_event| async move { + match add_event { + AddEvent::ProgressDelta { cid, .. } => Ok(Some(cid)), + } }) .await? .context("No cid found") diff --git a/iroh-bitswap/src/server.rs b/iroh-bitswap/src/server.rs index 904845b769..c43feb305b 100644 --- a/iroh-bitswap/src/server.rs +++ b/iroh-bitswap/src/server.rs @@ -293,7 +293,7 @@ impl Server { } pub async fn receive_message(&self, peer: &PeerId, message: &BitswapMessage) { - trace!("server:receive_message from {}", peer); + trace!("server:receive_message from {}: {:?}", peer, message); inc!(BitswapMetrics::MessagesProcessingServer); self.engine.message_received(peer, message).await; // TODO: only track useful messages diff --git a/iroh-resolver/benches/unixfs.rs b/iroh-resolver/benches/unixfs.rs index 85de3b562f..ebce93c980 100644 --- a/iroh-resolver/benches/unixfs.rs +++ b/iroh-resolver/benches/unixfs.rs @@ -1,4 +1,3 @@ -use anyhow::Context; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; use futures::TryStreamExt; use iroh_metrics::config::Config as MetricsConfig; @@ -51,32 +50,17 @@ pub fn add_benchmark(c: &mut Criterion) { let rpc = Client::new(rpc_client).await.unwrap(); (task, rpc) }); - let rpc_ref = &rpc; - b.to_async(&executor).iter(|| async move { - let stream = - iroh_resolver::unixfs_builder::add_file(Some(rpc_ref), path, false) - .await - .unwrap(); - // we have to consume the stream here, otherwise we are - // not actually benchmarking anything - // TODO(faassen) rewrite the benchmark in terms of the iroh-api which - // can consume the stream for you - let cid = stream - .try_fold(None, |acc, add_event| async move { - Ok( - if let iroh_resolver::unixfs_builder::AddEvent::Done(cid) = - add_event - { - Some(cid) - } else { - acc - }, - ) - }) - .await - .unwrap() - .context("No cid found"); - black_box(cid) + b.to_async(&executor).iter(|| { + let rpc = rpc.clone(); + async move { + let stream = + iroh_resolver::unixfs_builder::add_file(Some(rpc), path, false) + .await + .unwrap(); + + let res: Vec<_> = stream.try_collect().await.unwrap(); + black_box(res) + } }); }, ); diff --git a/iroh-resolver/src/unixfs_builder.rs b/iroh-resolver/src/unixfs_builder.rs index 8aa193783b..ff08ce02d0 100644 --- a/iroh-resolver/src/unixfs_builder.rs +++ b/iroh-resolver/src/unixfs_builder.rs @@ -2,6 +2,7 @@ use std::{ fmt::Debug, path::{Path, PathBuf}, pin::Pin, + sync::Arc, }; use anyhow::{ensure, Result}; @@ -29,6 +30,9 @@ use crate::{ // adding a generous buffer, we are using 6k as our link limit const DIRECTORY_LINK_LIMIT: usize = 6000; +/// How many chunks to buffer up when adding content. +const ADD_PAR: usize = 24; + #[derive(Debug, PartialEq)] enum DirectoryType { Basic, @@ -489,24 +493,33 @@ pub(crate) fn encode_unixfs_pb( } #[async_trait] -pub trait Store { +pub trait Store: 'static + Send + Sync + Clone { + async fn has(&self, &cid: Cid) -> Result; async fn put(&self, cid: Cid, blob: Bytes, links: Vec) -> Result<()>; } #[async_trait] -impl Store for &Client { +impl Store for Client { + async fn has(&self, cid: Cid) -> Result { + self.try_store()?.has(cid).await + } + async fn put(&self, cid: Cid, blob: Bytes, links: Vec) -> Result<()> { self.try_store()?.put(cid, blob, links).await } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StoreAndProvideClient { pub client: Client, } #[async_trait] impl Store for StoreAndProvideClient { + async fn has(&self, cid: Cid) -> Result { + self.client.try_store()?.has(cid).await + } + async fn put(&self, cid: Cid, blob: Bytes, links: Vec) -> Result<()> { self.client.try_store()?.put(cid, blob, links).await?; self.client.try_p2p()?.start_providing(&cid).await @@ -514,7 +527,10 @@ impl Store for StoreAndProvideClient { } #[async_trait] -impl Store for &tokio::sync::Mutex> { +impl Store for Arc>> { + async fn has(&self, cid: Cid) -> Result { + Ok(self.lock().await.contains_key(&cid)) + } async fn put(&self, cid: Cid, blob: Bytes, _links: Vec) -> Result<()> { self.lock().await.insert(cid, blob); Ok(()) @@ -592,34 +608,38 @@ pub async fn add_symlink( /// An event on the add stream pub enum AddEvent { - /// Delta of progress in bytes - ProgressDelta(u64), - /// The root Cid of the added file, produced once in the end - Done(Cid), + ProgressDelta { + /// The current cid. This is the root on the last event. + cid: Cid, + /// Delta of progress in bytes + size: Option, + }, } pub async fn add_blocks_to_store( store: Option, - mut blocks: Pin>>>, + blocks: Pin>>>, ) -> impl Stream> { - async_stream::try_stream! { - - let mut root = None; - while let Some(block) = blocks.next().await { - let block = block?; - let raw_data_size = block.raw_data_size(); - let (cid, bytes, links) = block.into_parts(); - if let Some(ref store) = store { - store.put(cid, bytes, links).await?; - } - if let Some(raw_data_size) = raw_data_size { - yield AddEvent::ProgressDelta(raw_data_size); - } - root = Some(cid); - } + blocks + .map(move |block| { + let store = store.clone(); + async move { + let block = block?; + let raw_data_size = block.raw_data_size(); + let (cid, bytes, links) = block.into_parts(); + if let Some(store) = store { + if !store.has(cid).await? { + store.put(cid, bytes, links).await?; + } + } - yield AddEvent::Done(root.expect("missing root")) - } + Ok(AddEvent::ProgressDelta { + cid, + size: raw_data_size, + }) + } + }) + .buffered(ADD_PAR) } #[async_recursion(?Send)] @@ -631,6 +651,7 @@ async fn make_dir_from_path>(path: P) -> Result { .and_then(|s| s.to_str()) .unwrap_or_default(), ); + let mut directory_reader = tokio::fs::read_dir(path.clone()).await?; while let Some(entry) = directory_reader.next_entry().await? { let path = entry.path(); @@ -662,7 +683,6 @@ mod tests { use rand::prelude::*; use rand_chacha::ChaCha8Rng; use std::{collections::BTreeMap, io::prelude::*, sync::Arc}; - use tempfile; use tokio::io::AsyncReadExt; #[tokio::test] @@ -953,7 +973,7 @@ mod tests { #[cfg(not(windows))] #[tokio::test] async fn symlink_from_disk_test() -> Result<()> { - let temp_dir = tempfile::tempdir()?; + let temp_dir = ::tempfile::tempdir()?; let expect_name = "path_to_symlink"; let expect_target = temp_dir.path().join("path_to_target"); let expect_path = temp_dir.path().join(expect_name); diff --git a/iroh-store/src/config.rs b/iroh-store/src/config.rs index 021fc2817a..8984e75c95 100644 --- a/iroh-store/src/config.rs +++ b/iroh-store/src/config.rs @@ -94,7 +94,6 @@ impl Source for Config { #[cfg(test)] mod tests { use super::*; - use config::Config as ConfigBuilder; #[test] #[cfg(all(feature = "rpc-grpc", unix))] @@ -129,7 +128,7 @@ mod tests { fn test_build_config_from_struct() { let path = PathBuf::new().join("test"); let expect = Config::new_grpc(path); - let got: Config = ConfigBuilder::builder() + let got: Config = config::Config::builder() .add_source(expect.clone()) .build() .unwrap() diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 876240cb82..43acdb89c5 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -28,6 +28,7 @@ iroh-util = { path = "../iroh-util"} which = "4.3.0" sysinfo = "0.26.4" iroh-localops = { path = "../iroh-localops" } +console = { version = "0.15", default-features = false } [dev-dependencies] trycmd = "0.13.7" diff --git a/iroh/src/fixture.rs b/iroh/src/fixture.rs index b9ad0ff4a5..8a14b6a291 100644 --- a/iroh/src/fixture.rs +++ b/iroh/src/fixture.rs @@ -60,13 +60,13 @@ fn fixture_get() -> MockApi { fn fixture_add_file() -> MockApi { let mut api = MockApi::default(); api.expect_add_file().returning(|_ipfs_path, _| { - let add_event = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR") - .map(AddEvent::Done) - .map_err(|e| e.into()); + let cid = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR").unwrap(); + let add_event = AddEvent::ProgressDelta { cid, size: Some(0) }; - Box::pin(future::ready(Ok( - futures::stream::iter(vec![add_event]).boxed_local() - ))) + Box::pin(future::ready(Ok(futures::stream::iter(vec![Ok( + add_event, + )]) + .boxed_local()))) }); api } @@ -74,13 +74,13 @@ fn fixture_add_file() -> MockApi { fn fixture_add_directory() -> MockApi { let mut api = MockApi::default(); api.expect_add_dir().returning(|_ipfs_path, _| { - let add_event = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR") - .map(AddEvent::Done) - .map_err(|e| e.into()); + let cid = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR").unwrap(); + let add_event = AddEvent::ProgressDelta { cid, size: Some(0) }; - Box::pin(future::ready(Ok( - futures::stream::iter(vec![add_event]).boxed_local() - ))) + Box::pin(future::ready(Ok(futures::stream::iter(vec![Ok( + add_event, + )]) + .boxed_local()))) }); api } diff --git a/iroh/src/run.rs b/iroh/src/run.rs index 8245c51d6c..08ca0a476b 100644 --- a/iroh/src/run.rs +++ b/iroh/src/run.rs @@ -1,17 +1,20 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; -use crate::doc; -#[cfg(feature = "testing")] -use crate::fixture::get_fixture_api; -use crate::p2p::{run_command as run_p2p_command, P2p}; -use crate::size::size_stream; -use anyhow::Result; +use anyhow::{ensure, Result}; use clap::{Parser, Subcommand}; +use console::style; use futures::StreamExt; use indicatif::{ProgressBar, ProgressStyle}; use iroh_api::{AddEvent, Api, ApiExt, IpfsPath, Iroh}; use iroh_metrics::config::Config as MetricsConfig; +use iroh_util::human; + +use crate::doc; +#[cfg(feature = "testing")] +use crate::fixture::get_fixture_api; +use crate::p2p::{run_command as run_p2p_command, P2p}; +use crate::size::size_stream; #[derive(Parser, Debug, Clone)] #[clap(version, long_about = None, propagate_version = true)] @@ -159,16 +162,32 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res path.display() ); } + println!("{} Calculating size...", style("[1/2]").bold().dim()); + let pb = ProgressBar::new_spinner(); - pb.set_message("Calculating size..."); let mut total_size: u64 = 0; + + pb.set_message(format!( + "Discovered size: {}", + human::format_bytes(total_size) + )); let mut stream = Box::pin(size_stream(path)); while let Some(size_info) = stream.next().await { total_size += size_info.size; + pb.set_message(format!( + "Discovered size: {}", + human::format_bytes(total_size) + )); pb.inc(1); } pb.finish_and_clear(); + println!( + "{} Importing content {}...", + style("[2/2]").bold().dim(), + human::format_bytes(total_size) + ); + let pb = ProgressBar::new(total_size); pb.set_style(ProgressStyle::with_template( "[{elapsed_precise}] {wide_bar} {bytes}/{total_bytes} ({bytes_per_sec}) {msg}", @@ -178,16 +197,20 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res pb.inc(0); let mut progress = api.add_stream(path, !no_wrap).await?; + let mut root = None; while let Some(add_event) = progress.next().await { match add_event? { - AddEvent::ProgressDelta(size) => { - pb.inc(size); - } - AddEvent::Done(cid) => { - pb.finish_and_clear(); - println!("/ipfs/{}", cid); + AddEvent::ProgressDelta { cid, size } => { + root = Some(cid); + if let Some(size) = size { + pb.inc(size); + } } } } + pb.finish_and_clear(); + ensure!(root.is_some(), "File processing failed"); + println!("/ipfs/{}", root.unwrap()); + Ok(()) } diff --git a/iroh/tests/cmd/add_directory.trycmd b/iroh/tests/cmd/add_directory.trycmd index a795a78f16..086daf0339 100644 --- a/iroh/tests/cmd/add_directory.trycmd +++ b/iroh/tests/cmd/add_directory.trycmd @@ -1,5 +1,7 @@ ``` $ iroh add -r mydir +[1/2] Calculating size... +[2/2] Importing content 5 B... /ipfs/QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR ``` \ No newline at end of file diff --git a/iroh/tests/cmd/add_file.trycmd b/iroh/tests/cmd/add_file.trycmd index 2bd8e25f07..6e0e8c3fa6 100644 --- a/iroh/tests/cmd/add_file.trycmd +++ b/iroh/tests/cmd/add_file.trycmd @@ -1,5 +1,7 @@ ``` $ iroh add file.txt +[1/2] Calculating size... +[2/2] Importing content 20 B... /ipfs/QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR ``` \ No newline at end of file