Skip to content

Commit

Permalink
feat: Send all the things (#617)
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn authored Dec 19, 2022
1 parent 22d68df commit 214d9ce
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 43 deletions.
16 changes: 8 additions & 8 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::IpfsPath;
use crate::P2pApi;
use anyhow::{ensure, Context, Result};
use cid::Cid;
use futures::stream::LocalBoxStream;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use iroh_resolver::resolver::Resolver;
use iroh_rpc_client::Client;
Expand Down Expand Up @@ -40,7 +40,7 @@ pub struct Api {

pub enum OutType {
Dir,
Reader(Box<dyn AsyncRead + Unpin>),
Reader(Box<dyn AsyncRead + Unpin + Send>),
Symlink(PathBuf),
}

Expand Down Expand Up @@ -126,7 +126,7 @@ impl Api {
pub fn get(
&self,
ipfs_path: &IpfsPath,
) -> Result<LocalBoxStream<'static, Result<(RelativePathBuf, OutType)>>> {
) -> Result<BoxStream<'static, Result<(RelativePathBuf, OutType)>>> {
ensure!(
ipfs_path.cid().is_some(),
"IPFS path does not refer to a CID"
Expand Down Expand Up @@ -165,15 +165,15 @@ impl Api {
}
};

Ok(stream.boxed_local())
Ok(stream.boxed())
}

pub async fn check(&self) -> StatusTable {
self.client.check().await
}

pub async fn watch(&self) -> LocalBoxStream<'static, StatusTable> {
self.client.clone().watch().await.boxed_local()
pub async fn watch(&self) -> BoxStream<'static, StatusTable> {
self.client.clone().watch().await.boxed()
}

/// The `add_stream` method encodes the entry into a DAG and adds
Expand All @@ -184,9 +184,9 @@ impl Api {
pub async fn add_stream(
&self,
entry: UnixfsEntry,
) -> Result<LocalBoxStream<'static, Result<(Cid, u64)>>> {
) -> Result<BoxStream<'static, Result<(Cid, u64)>>> {
let blocks = match entry {
UnixfsEntry::File(f) => f.encode().await?.boxed_local(),
UnixfsEntry::File(f) => f.encode().await?.boxed(),
UnixfsEntry::Directory(d) => d.encode(),
UnixfsEntry::Symlink(s) => Box::pin(async_stream::try_stream! {
yield s.encode()?
Expand Down
4 changes: 2 additions & 2 deletions iroh-api/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Store for Arc<tokio::sync::Mutex<std::collections::HashMap<Cid, Bytes>>> {

fn add_blocks_to_store_chunked<S: Store>(
store: S,
mut blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
mut blocks: Pin<Box<dyn Stream<Item = Result<Block>> + Send>>,
) -> impl Stream<Item = Result<(Cid, u64)>> {
let mut chunk = Vec::new();
let mut chunk_size = 0u64;
Expand Down Expand Up @@ -87,7 +87,7 @@ fn add_blocks_to_store_chunked<S: Store>(

pub async fn add_blocks_to_store<S: Store>(
store: Option<S>,
blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
blocks: Pin<Box<dyn Stream<Item = Result<Block>> + Send>>,
) -> impl Stream<Item = Result<(Cid, u64)>> {
add_blocks_to_store_chunked(store.unwrap(), blocks)
}
4 changes: 2 additions & 2 deletions iroh-unixfs/src/balanced_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl TreeBuilder {

pub fn stream_tree(
&self,
chunks: impl Stream<Item = std::io::Result<Bytes>>,
chunks: impl Stream<Item = std::io::Result<Bytes>> + Send,
) -> impl Stream<Item = Result<Block>> {
match self {
TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, *degree),
Expand All @@ -48,7 +48,7 @@ struct LinkInfo {
}

fn stream_balanced_tree(
in_stream: impl Stream<Item = std::io::Result<Bytes>>,
in_stream: impl Stream<Item = std::io::Result<Bytes>> + Send,
degree: usize,
) -> impl Stream<Item = Result<Block>> {
try_stream! {
Expand Down
28 changes: 14 additions & 14 deletions iroh-unixfs/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use anyhow::{ensure, Context, Result};
use async_recursion::async_recursion;
use bytes::Bytes;
use futures::{
stream::{self, LocalBoxStream},
stream::{self, BoxStream},
Stream, StreamExt, TryFutureExt,
};
use prost::Message;
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Directory {
current.expect("must not be empty")
}

pub fn encode<'a>(self) -> LocalBoxStream<'a, Result<Block>> {
pub fn encode<'a>(self) -> BoxStream<'a, Result<Block>> {
match self {
Directory::Basic(basic) => basic.encode(),
Directory::Hamt(hamt) => hamt.encode(),
Expand All @@ -113,7 +113,7 @@ impl Directory {
}

impl BasicDirectory {
pub fn encode<'a>(self) -> LocalBoxStream<'a, Result<Block>> {
pub fn encode<'a>(self) -> BoxStream<'a, Result<Block>> {
async_stream::try_stream! {
let mut links = Vec::new();
for entry in self.entries {
Expand Down Expand Up @@ -143,18 +143,18 @@ impl BasicDirectory {
let node = UnixfsNode::Directory(Node { outer, inner });
yield node.encode()?;
}
.boxed_local()
.boxed()
}
}

impl HamtDirectory {
pub fn encode<'a>(self) -> LocalBoxStream<'a, Result<Block>> {
pub fn encode<'a>(self) -> BoxStream<'a, Result<Block>> {
self.hamt.encode()
}
}

enum Content {
Reader(Pin<Box<dyn AsyncRead>>),
Reader(Pin<Box<dyn AsyncRead + Send>>),
Path(PathBuf),
}

Expand Down Expand Up @@ -281,7 +281,7 @@ impl Symlink {
pub struct FileBuilder {
name: Option<String>,
path: Option<PathBuf>,
reader: Option<Pin<Box<dyn AsyncRead>>>,
reader: Option<Pin<Box<dyn AsyncRead + Send>>>,
chunker: Chunker,
degree: usize,
}
Expand Down Expand Up @@ -359,7 +359,7 @@ impl FileBuilder {
self
}

pub fn content_reader<T: tokio::io::AsyncRead + 'static>(mut self, content: T) -> Self {
pub fn content_reader<T: tokio::io::AsyncRead + Send + 'static>(mut self, content: T) -> Self {
self.reader = Some(Box::pin(content));
self
}
Expand Down Expand Up @@ -419,11 +419,11 @@ impl Entry {
}
}

pub async fn encode(self) -> Result<LocalBoxStream<'static, Result<Block>>> {
pub async fn encode(self) -> Result<BoxStream<'static, Result<Block>>> {
Ok(match self {
Entry::File(f) => f.encode().await?.boxed_local(),
Entry::File(f) => f.encode().await?.boxed(),
Entry::Directory(d) => d.encode(),
Entry::Symlink(s) => stream::iter(Some(s.encode())).boxed_local(),
Entry::Symlink(s) => stream::iter(Some(s.encode())).boxed(),
})
}

Expand Down Expand Up @@ -637,7 +637,7 @@ impl HamtNode {
}
}

pub fn encode<'a>(self) -> LocalBoxStream<'a, Result<Block>> {
pub fn encode<'a>(self) -> BoxStream<'a, Result<Block>> {
match self {
Self::Branch(tree) => {
async_stream::try_stream! {
Expand Down Expand Up @@ -673,11 +673,11 @@ impl HamtNode {
let node = UnixfsNode::Directory(crate::unixfs::Node { outer, inner });
yield node.encode()?;
}
.boxed_local()
.boxed()
}
Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode().await }
.try_flatten_stream()
.boxed_local(),
.boxed(),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions iroh-unixfs/src/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use anyhow::{anyhow, Context};
use bytes::Bytes;
use futures::{stream::LocalBoxStream, Stream};
use futures::{stream::BoxStream, Stream};
use tokio::io::AsyncRead;

mod fixed;
Expand Down Expand Up @@ -92,8 +92,8 @@ impl From<ChunkerConfig> for Chunker {
}

pub enum ChunkerStream<'a> {
Fixed(LocalBoxStream<'a, io::Result<Bytes>>),
Rabin(LocalBoxStream<'a, io::Result<Bytes>>),
Fixed(BoxStream<'a, io::Result<Bytes>>),
Rabin(BoxStream<'a, io::Result<Bytes>>),
}

impl<'a> Debug for ChunkerStream<'a> {
Expand Down Expand Up @@ -127,7 +127,7 @@ impl<'a> Stream for ChunkerStream<'a> {
}

impl Chunker {
pub fn chunks<'a, R: AsyncRead + Unpin + 'a>(self, source: R) -> ChunkerStream<'a> {
pub fn chunks<'a, R: AsyncRead + Unpin + Send + 'a>(self, source: R) -> ChunkerStream<'a> {
match self {
Self::Fixed(chunker) => ChunkerStream::Fixed(chunker.chunks(source)),
Self::Rabin(chunker) => ChunkerStream::Rabin(chunker.chunks(source)),
Expand Down
8 changes: 4 additions & 4 deletions iroh-unixfs/src/chunker/fixed.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;

use bytes::{Bytes, BytesMut};
use futures::{stream::LocalBoxStream, StreamExt};
use futures::{stream::BoxStream, StreamExt};
use tokio::io::{AsyncRead, AsyncReadExt};

/// Default size for chunks.
Expand All @@ -27,10 +27,10 @@ impl Fixed {
Self { chunk_size }
}

pub fn chunks<'a, R: AsyncRead + Unpin + 'a>(
pub fn chunks<'a, R: AsyncRead + Unpin + Send + 'a>(
self,
mut source: R,
) -> LocalBoxStream<'a, io::Result<Bytes>> {
) -> BoxStream<'a, io::Result<Bytes>> {
let chunk_size = self.chunk_size;
async_stream::stream! {
let mut buffer = BytesMut::with_capacity(chunk_size);
Expand Down Expand Up @@ -70,7 +70,7 @@ impl Fixed {
}
}
}
.boxed_local()
.boxed()
}
}

Expand Down
8 changes: 4 additions & 4 deletions iroh-unixfs/src/chunker/rabin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::io;

use bytes::{Bytes, BytesMut};
use futures::{stream::LocalBoxStream, StreamExt};
use futures::{stream::BoxStream, StreamExt};
use tokio::io::{AsyncRead, AsyncReadExt};

/// Rabin fingerprinting based chunker.
Expand Down Expand Up @@ -46,10 +46,10 @@ impl Rabin {
}
}

pub fn chunks<'a, R: AsyncRead + Unpin + 'a>(
pub fn chunks<'a, R: AsyncRead + Unpin + Send + 'a>(
self,
mut source: R,
) -> LocalBoxStream<'a, io::Result<Bytes>> {
) -> BoxStream<'a, io::Result<Bytes>> {
async_stream::stream! {
let target_size = 3 * self.config.max_size;
let mut buf = BytesMut::with_capacity(target_size);
Expand Down Expand Up @@ -142,7 +142,7 @@ impl Rabin {
let _ = buf.split_to(cur_idx);
}
}
.boxed_local()
.boxed()
}
}

Expand Down
10 changes: 5 additions & 5 deletions iroh/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn fixture_get() -> Api {
OutType::Reader(Box::new(std::io::Cursor::new("hello"))),
)),
])
.boxed_local())
.boxed())
});
api
}
Expand Down Expand Up @@ -104,7 +104,7 @@ fn fixture_get_wrapped_file() -> Api {
OutType::Reader(Box::new(std::io::Cursor::new("hello"))),
)),
])
.boxed_local())
.boxed())
});
api
}
Expand All @@ -116,7 +116,7 @@ fn fixture_get_unwrapped_file() -> Api {
RelativePathBuf::from_path("").unwrap(),
OutType::Reader(Box::new(std::io::Cursor::new("hello"))),
))])
.boxed_local())
.boxed())
});
api
}
Expand All @@ -131,7 +131,7 @@ fn fixture_get_wrapped_symlink() -> Api {
OutType::Symlink(PathBuf::from("target/path/foo.txt")),
)),
])
.boxed_local())
.boxed())
});
api
}
Expand All @@ -143,7 +143,7 @@ fn fixture_get_unwrapped_symlink() -> Api {
RelativePathBuf::from_path("").unwrap(),
OutType::Symlink(PathBuf::from("target/path/foo.txt")),
))])
.boxed_local())
.boxed())
});
api
}
Expand Down

0 comments on commit 214d9ce

Please sign in to comment.