Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve add experience #401

Merged
merged 3 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion iroh-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,3 @@ thiserror = "1.0"

[dev-dependencies]
tempfile = "3.3.0"
tempdir = "0.3.7"
10 changes: 4 additions & 6 deletions iroh-api/src/api_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,10 @@ pub trait ApiExt: Api {
let add_events = self.add_stream(path, wrap).await?;

add_events
.try_fold(None, |acc, add_event| async move {
Ok(if let AddEvent::Done(cid) = add_event {
Some(cid)
} else {
acc
})
.try_fold(None, |_acc, add_event| async move {
match add_event {
AddEvent::ProgressDelta { cid, .. } => Ok(Some(cid)),
}
})
.await?
.context("No cid found")
Expand Down
2 changes: 1 addition & 1 deletion iroh-bitswap/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl<S: Store> Server<S> {
}

pub async fn receive_message(&self, peer: &PeerId, message: &BitswapMessage) {
trace!("server:receive_message from {}", peer);
trace!("server:receive_message from {}: {:?}", peer, message);
inc!(BitswapMetrics::MessagesProcessingServer);
self.engine.message_received(peer, message).await;
// TODO: only track useful messages
Expand Down
38 changes: 11 additions & 27 deletions iroh-resolver/benches/unixfs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::Context;
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::TryStreamExt;
use iroh_metrics::config::Config as MetricsConfig;
Expand Down Expand Up @@ -51,32 +50,17 @@ pub fn add_benchmark(c: &mut Criterion) {
let rpc = Client::new(rpc_client).await.unwrap();
(task, rpc)
});
let rpc_ref = &rpc;
b.to_async(&executor).iter(|| async move {
let stream =
iroh_resolver::unixfs_builder::add_file(Some(rpc_ref), path, false)
.await
.unwrap();
// we have to consume the stream here, otherwise we are
// not actually benchmarking anything
// TODO(faassen) rewrite the benchmark in terms of the iroh-api which
// can consume the stream for you
let cid = stream
.try_fold(None, |acc, add_event| async move {
Ok(
if let iroh_resolver::unixfs_builder::AddEvent::Done(cid) =
add_event
{
Some(cid)
} else {
acc
},
)
})
.await
.unwrap()
.context("No cid found");
black_box(cid)
b.to_async(&executor).iter(|| {
let rpc = rpc.clone();
async move {
let stream =
iroh_resolver::unixfs_builder::add_file(Some(rpc), path, false)
.await
.unwrap();

let res: Vec<_> = stream.try_collect().await.unwrap();
black_box(res)
}
});
},
);
Expand Down
76 changes: 48 additions & 28 deletions iroh-resolver/src/unixfs_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{
fmt::Debug,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
};

use anyhow::{ensure, Result};
Expand Down Expand Up @@ -29,6 +30,9 @@ use crate::{
// adding a generous buffer, we are using 6k as our link limit
const DIRECTORY_LINK_LIMIT: usize = 6000;

/// How many chunks to buffer up when adding content.
const ADD_PAR: usize = 24;

#[derive(Debug, PartialEq)]
enum DirectoryType {
Basic,
Expand Down Expand Up @@ -489,32 +493,44 @@ pub(crate) fn encode_unixfs_pb(
}

#[async_trait]
pub trait Store {
pub trait Store: 'static + Send + Sync + Clone {
async fn has(&self, &cid: Cid) -> Result<bool>;
async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()>;
}

#[async_trait]
impl Store for &Client {
impl Store for Client {
async fn has(&self, cid: Cid) -> Result<bool> {
self.try_store()?.has(cid).await
}

async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()> {
self.try_store()?.put(cid, blob, links).await
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StoreAndProvideClient {
pub client: Client,
}

#[async_trait]
impl Store for StoreAndProvideClient {
async fn has(&self, cid: Cid) -> Result<bool> {
self.client.try_store()?.has(cid).await
}

async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()> {
self.client.try_store()?.put(cid, blob, links).await?;
self.client.try_p2p()?.start_providing(&cid).await
}
}

#[async_trait]
impl Store for &tokio::sync::Mutex<std::collections::HashMap<Cid, Bytes>> {
impl Store for Arc<tokio::sync::Mutex<std::collections::HashMap<Cid, Bytes>>> {
async fn has(&self, cid: Cid) -> Result<bool> {
Ok(self.lock().await.contains_key(&cid))
}
async fn put(&self, cid: Cid, blob: Bytes, _links: Vec<Cid>) -> Result<()> {
self.lock().await.insert(cid, blob);
Ok(())
Expand Down Expand Up @@ -592,34 +608,38 @@ pub async fn add_symlink<S: Store>(

/// An event on the add stream
pub enum AddEvent {
/// Delta of progress in bytes
ProgressDelta(u64),
/// The root Cid of the added file, produced once in the end
Done(Cid),
ProgressDelta {
/// The current cid. This is the root on the last event.
cid: Cid,
/// Delta of progress in bytes
size: Option<u64>,
},
}

pub async fn add_blocks_to_store<S: Store>(
store: Option<S>,
mut blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
) -> impl Stream<Item = Result<AddEvent>> {
async_stream::try_stream! {

let mut root = None;
while let Some(block) = blocks.next().await {
let block = block?;
let raw_data_size = block.raw_data_size();
let (cid, bytes, links) = block.into_parts();
if let Some(ref store) = store {
store.put(cid, bytes, links).await?;
}
if let Some(raw_data_size) = raw_data_size {
yield AddEvent::ProgressDelta(raw_data_size);
}
root = Some(cid);
}
blocks
.map(move |block| {
let store = store.clone();
async move {
let block = block?;
let raw_data_size = block.raw_data_size();
let (cid, bytes, links) = block.into_parts();
if let Some(store) = store {
if !store.has(cid).await? {
store.put(cid, bytes, links).await?;
}
}

yield AddEvent::Done(root.expect("missing root"))
}
Ok(AddEvent::ProgressDelta {
cid,
size: raw_data_size,
})
}
})
.buffered(ADD_PAR)
}

#[async_recursion(?Send)]
Expand All @@ -631,6 +651,7 @@ async fn make_dir_from_path<P: Into<PathBuf>>(path: P) -> Result<Directory> {
.and_then(|s| s.to_str())
.unwrap_or_default(),
);

let mut directory_reader = tokio::fs::read_dir(path.clone()).await?;
while let Some(entry) = directory_reader.next_entry().await? {
let path = entry.path();
Expand Down Expand Up @@ -662,7 +683,6 @@ mod tests {
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use std::{collections::BTreeMap, io::prelude::*, sync::Arc};
use tempfile;
use tokio::io::AsyncReadExt;

#[tokio::test]
Expand Down Expand Up @@ -953,7 +973,7 @@ mod tests {
#[cfg(not(windows))]
#[tokio::test]
async fn symlink_from_disk_test() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let temp_dir = ::tempfile::tempdir()?;
let expect_name = "path_to_symlink";
let expect_target = temp_dir.path().join("path_to_target");
let expect_path = temp_dir.path().join(expect_name);
Expand Down
3 changes: 1 addition & 2 deletions iroh-store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ impl Source for Config {
#[cfg(test)]
mod tests {
use super::*;
use config::Config as ConfigBuilder;

#[test]
#[cfg(all(feature = "rpc-grpc", unix))]
Expand Down Expand Up @@ -129,7 +128,7 @@ mod tests {
fn test_build_config_from_struct() {
let path = PathBuf::new().join("test");
let expect = Config::new_grpc(path);
let got: Config = ConfigBuilder::builder()
let got: Config = config::Config::builder()
.add_source(expect.clone())
.build()
.unwrap()
Expand Down
1 change: 1 addition & 0 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ iroh-util = { path = "../iroh-util"}
which = "4.3.0"
sysinfo = "0.26.4"
iroh-localops = { path = "../iroh-localops" }
console = { version = "0.15", default-features = false }

[dev-dependencies]
trycmd = "0.13.7"
Expand Down
24 changes: 12 additions & 12 deletions iroh/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,27 @@ fn fixture_get() -> MockApi {
fn fixture_add_file() -> MockApi {
let mut api = MockApi::default();
api.expect_add_file().returning(|_ipfs_path, _| {
let add_event = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR")
.map(AddEvent::Done)
.map_err(|e| e.into());
let cid = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR").unwrap();
let add_event = AddEvent::ProgressDelta { cid, size: Some(0) };

Box::pin(future::ready(Ok(
futures::stream::iter(vec![add_event]).boxed_local()
)))
Box::pin(future::ready(Ok(futures::stream::iter(vec![Ok(
add_event,
)])
.boxed_local())))
});
api
}

fn fixture_add_directory() -> MockApi {
let mut api = MockApi::default();
api.expect_add_dir().returning(|_ipfs_path, _| {
let add_event = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR")
.map(AddEvent::Done)
.map_err(|e| e.into());
let cid = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR").unwrap();
let add_event = AddEvent::ProgressDelta { cid, size: Some(0) };

Box::pin(future::ready(Ok(
futures::stream::iter(vec![add_event]).boxed_local()
)))
Box::pin(future::ready(Ok(futures::stream::iter(vec![Ok(
add_event,
)])
.boxed_local())))
});
api
}
Expand Down
49 changes: 36 additions & 13 deletions iroh/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};

use crate::doc;
#[cfg(feature = "testing")]
use crate::fixture::get_fixture_api;
use crate::p2p::{run_command as run_p2p_command, P2p};
use crate::size::size_stream;
use anyhow::Result;
use anyhow::{ensure, Result};
use clap::{Parser, Subcommand};
use console::style;
use futures::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use iroh_api::{AddEvent, Api, ApiExt, IpfsPath, Iroh};
use iroh_metrics::config::Config as MetricsConfig;
use iroh_util::human;

use crate::doc;
#[cfg(feature = "testing")]
use crate::fixture::get_fixture_api;
use crate::p2p::{run_command as run_p2p_command, P2p};
use crate::size::size_stream;

#[derive(Parser, Debug, Clone)]
#[clap(version, long_about = None, propagate_version = true)]
Expand Down Expand Up @@ -159,16 +162,32 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
path.display()
);
}
println!("{} Calculating size...", style("[1/2]").bold().dim());

let pb = ProgressBar::new_spinner();
pb.set_message("Calculating size...");
let mut total_size: u64 = 0;

pb.set_message(format!(
"Discovered size: {}",
human::format_bytes(total_size)
));
let mut stream = Box::pin(size_stream(path));
while let Some(size_info) = stream.next().await {
total_size += size_info.size;
pb.set_message(format!(
"Discovered size: {}",
human::format_bytes(total_size)
));
pb.inc(1);
}
pb.finish_and_clear();

println!(
"{} Importing content {}...",
style("[2/2]").bold().dim(),
human::format_bytes(total_size)
);

let pb = ProgressBar::new(total_size);
pb.set_style(ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {bytes}/{total_bytes} ({bytes_per_sec}) {msg}",
Expand All @@ -178,16 +197,20 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
pb.inc(0);

let mut progress = api.add_stream(path, !no_wrap).await?;
let mut root = None;
while let Some(add_event) = progress.next().await {
match add_event? {
AddEvent::ProgressDelta(size) => {
pb.inc(size);
}
AddEvent::Done(cid) => {
pb.finish_and_clear();
println!("/ipfs/{}", cid);
AddEvent::ProgressDelta { cid, size } => {
root = Some(cid);
if let Some(size) = size {
pb.inc(size);
}
}
}
}
pb.finish_and_clear();
ensure!(root.is_some(), "File processing failed");
println!("/ipfs/{}", root.unwrap());

Ok(())
}
2 changes: 2 additions & 0 deletions iroh/tests/cmd/add_directory.trycmd
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
```
$ iroh add -r mydir
[1/2] Calculating size...
[2/2] Importing content 5 B...
/ipfs/QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR

```
Loading