Skip to content

Commit

Permalink
feat: provide only root, and only at the end of add (#406)
Browse files Browse the repository at this point in the history
* wip feat: provide only root, and only at the end of add

We are still waiting for the add to complete

* Fix fixure based tests

* feat: add progress for DHT update

* require p2p service for add

* fix tests

* add all flag to iroh start

Co-authored-by: b5 <[email protected]>
  • Loading branch information
rklaehn and b5 authored Oct 25, 2022
1 parent 7b0a631 commit 96c6148
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 22 deletions.
7 changes: 7 additions & 0 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::p2p::MockP2p;
use crate::p2p::{ClientP2p, P2p};
use crate::{AddEvent, IpfsPath};
use anyhow::Result;
use cid::Cid;
use futures::future::{BoxFuture, LocalBoxFuture};
use futures::stream::LocalBoxStream;
use futures::FutureExt;
Expand Down Expand Up @@ -40,6 +41,8 @@ pub trait Api {

fn p2p(&self) -> Result<Self::P>;

fn provide(&self, cid: Cid) -> LocalBoxFuture<'_, Result<()>>;

/// Produces a asynchronous stream of file descriptions
/// Each description is a tuple of a relative path, and either a `Directory` or a `Reader`
/// with the file contents.
Expand Down Expand Up @@ -100,6 +103,10 @@ impl Iroh {
impl Api for Iroh {
type P = ClientP2p;

fn provide(&self, cid: Cid) -> LocalBoxFuture<'_, Result<()>> {
async move { self.client.try_p2p()?.start_providing(&cid).await }.boxed_local()
}

fn p2p(&self) -> Result<ClientP2p> {
let p2p_client = self.client.try_p2p()?;
Ok(ClientP2p::new(p2p_client))
Expand Down
3 changes: 1 addition & 2 deletions iroh-api/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::{anyhow, Error};
use std::io;
use thiserror::Error as ThisError;
// use std::error::Error;
use anyhow::{anyhow, Error};

/// LockError is the set of known program lock errors
#[derive(ThisError, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions iroh-resolver/src/unixfs_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,9 @@ impl Store for StoreAndProvideClient {
}

async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()> {
self.client.try_store()?.put(cid, blob, links).await?;
self.client.try_p2p()?.start_providing(&cid).await
self.client.try_store()?.put(cid, blob, links).await
// we provide after insertion is finished
// self.client.try_p2p()?.start_providing(&cid).await
}
}

Expand Down
19 changes: 19 additions & 0 deletions iroh/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::str::FromStr;

use futures::StreamExt;
use iroh_api::{AddEvent, Cid, Lookup, MockApi, MockP2p, OutType, PeerId};
use iroh_api::{ServiceStatus, StatusRow, StatusTable};
use relative_path::RelativePathBuf;

type GetFixture = fn() -> MockApi;
Expand Down Expand Up @@ -59,6 +60,13 @@ fn fixture_get() -> MockApi {

fn fixture_add_file() -> MockApi {
let mut api = MockApi::default();
api.expect_check().returning(|| {
Box::pin(future::ready(StatusTable::new(
Some(StatusRow::new("gateway", 1, ServiceStatus::Serving)),
Some(StatusRow::new("p2p", 1, ServiceStatus::Serving)),
Some(StatusRow::new("store", 1, ServiceStatus::Serving)),
)))
});
api.expect_add_file().returning(|_ipfs_path, _| {
let cid = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR").unwrap();
let add_event = AddEvent::ProgressDelta { cid, size: Some(0) };
Expand All @@ -68,11 +76,20 @@ fn fixture_add_file() -> MockApi {
)])
.boxed_local())))
});
api.expect_provide()
.returning(|_| Box::pin(future::ready(Ok(()))));
api
}

fn fixture_add_directory() -> MockApi {
let mut api = MockApi::default();
api.expect_check().returning(|| {
Box::pin(future::ready(StatusTable::new(
Some(StatusRow::new("gateway", 1, ServiceStatus::Serving)),
Some(StatusRow::new("p2p", 1, ServiceStatus::Serving)),
Some(StatusRow::new("store", 1, ServiceStatus::Serving)),
)))
});
api.expect_add_dir().returning(|_ipfs_path, _| {
let cid = Cid::from_str("QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR").unwrap();
let add_event = AddEvent::ProgressDelta { cid, size: Some(0) };
Expand All @@ -82,6 +99,8 @@ fn fixture_add_directory() -> MockApi {
)])
.boxed_local())))
});
api.expect_provide()
.returning(|_| Box::pin(future::ready(Ok(()))));
api
}

Expand Down
62 changes: 52 additions & 10 deletions iroh/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};

use anyhow::{ensure, Result};
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use console::style;
use futures::StreamExt;
Expand All @@ -14,6 +14,7 @@ use crate::doc;
#[cfg(feature = "testing")]
use crate::fixture::get_fixture_api;
use crate::p2p::{run_command as run_p2p_command, P2p};
use crate::services::require_services;
use crate::size::size_stream;

#[derive(Parser, Debug, Clone)]
Expand Down Expand Up @@ -64,6 +65,9 @@ enum Commands {
#[clap(after_help = doc::START_LONG_DESCRIPTION )]
Start {
service: Vec<String>,
/// Start all services
#[clap(short, long)]
all: bool,
},
/// status checks the health of the different processes
#[clap(about = "Check the health of the different iroh services")]
Expand Down Expand Up @@ -134,8 +138,16 @@ impl Cli {
println!("Saving file(s) to {}", root_path.to_str().unwrap());
}
Commands::P2p(p2p) => run_p2p_command(&api.p2p()?, p2p).await?,
Commands::Start { service } => {
crate::services::start(api, service).await?;
Commands::Start { service, all } => {
let mut svc = &vec![
String::from("store"),
String::from("p2p"),
String::from("gateway"),
];
if !*all {
svc = service;
};
crate::services::start(api, svc).await?;
}
Commands::Status { watch } => {
crate::services::status(api, *watch).await?;
Expand All @@ -162,7 +174,14 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
path.display()
);
}
println!("{} Calculating size...", style("[1/2]").bold().dim());

// we require p2p for adding right now because we don't have a mechanism for
// hydrating only the root CID to the p2p node for providing if a CID were
// ingested offline. Offline adding should happen, but this is the current
// path of least confusion
require_services(api, HashSet::from(["store", "p2p"])).await?;

println!("{} Calculating size...", style("[1/3]").bold().dim());

let pb = ProgressBar::new_spinner();
let mut total_size: u64 = 0;
Expand All @@ -184,7 +203,7 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res

println!(
"{} Importing content {}...",
style("[2/2]").bold().dim(),
style("[2/3]").bold().dim(),
human::format_bytes(total_size)
);

Expand All @@ -197,20 +216,43 @@ async fn add(api: &impl Api, path: &Path, no_wrap: bool, recursive: bool) -> Res
pb.inc(0);

let mut progress = api.add_stream(path, !no_wrap).await?;
let mut root = None;
let mut cids = Vec::new();
while let Some(add_event) = progress.next().await {
match add_event? {
AddEvent::ProgressDelta { cid, size } => {
root = Some(cid);
cids.push(cid);
if let Some(size) = size {
pb.inc(size);
}
}
}
}
pb.finish_and_clear();
ensure!(root.is_some(), "File processing failed");
println!("/ipfs/{}", root.unwrap());

let pb = ProgressBar::new(cids.len().try_into().unwrap());
let root = *cids.last().context("File processing failed")?;
// remove everything but the root
cids.splice(0..cids.len() - 1, []);
let rec_str = if cids.len() == 1 { "record" } else { "records" };
println!(
"{} Providing {} {} to the distributed hash table ...",
style("[3/3]").bold().dim(),
cids.len(),
rec_str,
);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {pos}/{len} ({per_sec}) {msg}",
)
.unwrap(),
);
pb.inc(0);
for cid in cids {
api.provide(cid).await?;
pb.inc(1);
}
pb.finish_and_clear();
println!("/ipfs/{}", root);

Ok(())
}
22 changes: 18 additions & 4 deletions iroh/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::SystemTime;
use sysinfo::PidExt;
use tracing::info;

use iroh_api::{Api, ServiceStatus, StatusRow, StatusTable};
use iroh_api::{Api, ApiError, ServiceStatus, StatusRow, StatusTable};
use iroh_util::lock::{LockError, ProgramLock};

const SERVICE_START_TIMEOUT_SECONDS: u64 = 15;
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn start_services(api: &impl Api, services: HashSet<&str>) -> Result<()> {

iroh_localops::process::daemonize(bin_path, log_path.clone())?;

let is_up = ensure_status(api, service, iroh_api::ServiceStatus::Serving).await?;
let is_up = poll_until_status(api, service, iroh_api::ServiceStatus::Serving).await?;
if is_up {
println!("{}", "success".green());
} else {
Expand Down Expand Up @@ -134,7 +134,7 @@ pub async fn stop_services(api: &impl Api, services: HashSet<&str>) -> Result<()
print!("stopping {}... ", &daemon_name);
match iroh_localops::process::stop(pid.as_u32()) {
Ok(_) => {
let is_down = ensure_status(
let is_down = poll_until_status(
api,
service,
iroh_api::ServiceStatus::Down(tonic::Status::unavailable(
Expand Down Expand Up @@ -252,9 +252,23 @@ where
Ok(())
}

/// require a set of services is up
pub async fn require_services(api: &impl Api, services: HashSet<&str>) -> Result<()> {
let table = api.check().await;
for service in table.iter() {
if services.contains(service.name()) && service.status() != iroh_api::ServiceStatus::Serving
{
return Err(anyhow!(ApiError::ConnectionRefused {
service: service.name()
}));
}
}
Ok(())
}

/// poll until a service matches the desired status. returns Ok(true) if status was matched,
/// and Ok(false) if desired status isn't reported before SERVICE_START_TIMEOUT_SECONDS
async fn ensure_status(
async fn poll_until_status(
api: &impl Api,
service: &str,
status: iroh_api::ServiceStatus,
Expand Down
5 changes: 3 additions & 2 deletions iroh/tests/cmd/add_directory.trycmd
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
```
$ iroh add -r mydir
[1/2] Calculating size...
[2/2] Importing content 5 B...
[1/3] Calculating size...
[2/3] Importing content 5 B...
[3/3] Providing 1 record to the distributed hash table ...
/ipfs/QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR

```
5 changes: 3 additions & 2 deletions iroh/tests/cmd/add_file.trycmd
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
```
$ iroh add file.txt
[1/2] Calculating size...
[2/2] Importing content 20 B...
[1/3] Calculating size...
[2/3] Importing content 20 B...
[3/3] Providing 1 record to the distributed hash table ...
/ipfs/QmYbcW4tXLXHWw753boCK8Y7uxLu5abXjyYizhLznq9PUR

```

0 comments on commit 96c6148

Please sign in to comment.