diff --git a/iroh-api/src/api.rs b/iroh-api/src/api.rs index 92bfd5a882..488d4856be 100644 --- a/iroh-api/src/api.rs +++ b/iroh-api/src/api.rs @@ -7,7 +7,7 @@ use crate::IpfsPath; use crate::P2pApi; use anyhow::{ensure, Context, Result}; use cid::Cid; -use futures::stream::LocalBoxStream; +use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use iroh_resolver::resolver::Resolver; use iroh_rpc_client::Client; @@ -40,7 +40,7 @@ pub struct Api { pub enum OutType { Dir, - Reader(Box), + Reader(Box), Symlink(PathBuf), } @@ -126,7 +126,7 @@ impl Api { pub fn get( &self, ipfs_path: &IpfsPath, - ) -> Result>> { + ) -> Result>> { ensure!( ipfs_path.cid().is_some(), "IPFS path does not refer to a CID" @@ -165,15 +165,15 @@ impl Api { } }; - Ok(stream.boxed_local()) + Ok(stream.boxed()) } pub async fn check(&self) -> StatusTable { self.client.check().await } - pub async fn watch(&self) -> LocalBoxStream<'static, StatusTable> { - self.client.clone().watch().await.boxed_local() + pub async fn watch(&self) -> BoxStream<'static, StatusTable> { + self.client.clone().watch().await.boxed() } /// The `add_stream` method encodes the entry into a DAG and adds @@ -184,9 +184,9 @@ impl Api { pub async fn add_stream( &self, entry: UnixfsEntry, - ) -> Result>> { + ) -> Result>> { let blocks = match entry { - UnixfsEntry::File(f) => f.encode().await?.boxed_local(), + UnixfsEntry::File(f) => f.encode().await?.boxed(), UnixfsEntry::Directory(d) => d.encode(), UnixfsEntry::Symlink(s) => Box::pin(async_stream::try_stream! { yield s.encode()? diff --git a/iroh-api/src/store.rs b/iroh-api/src/store.rs index ae91629f02..921f39e1a8 100644 --- a/iroh-api/src/store.rs +++ b/iroh-api/src/store.rs @@ -57,7 +57,7 @@ impl Store for Arc>> { fn add_blocks_to_store_chunked( store: S, - mut blocks: Pin>>>, + mut blocks: Pin> + Send>>, ) -> impl Stream> { let mut chunk = Vec::new(); let mut chunk_size = 0u64; @@ -87,7 +87,7 @@ fn add_blocks_to_store_chunked( pub async fn add_blocks_to_store( store: Option, - blocks: Pin>>>, + blocks: Pin> + Send>>, ) -> impl Stream> { add_blocks_to_store_chunked(store.unwrap(), blocks) } diff --git a/iroh-unixfs/src/balanced_tree.rs b/iroh-unixfs/src/balanced_tree.rs index 4b05fa41ff..4d43591970 100644 --- a/iroh-unixfs/src/balanced_tree.rs +++ b/iroh-unixfs/src/balanced_tree.rs @@ -33,7 +33,7 @@ impl TreeBuilder { pub fn stream_tree( &self, - chunks: impl Stream>, + chunks: impl Stream> + Send, ) -> impl Stream> { match self { TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, *degree), @@ -48,7 +48,7 @@ struct LinkInfo { } fn stream_balanced_tree( - in_stream: impl Stream>, + in_stream: impl Stream> + Send, degree: usize, ) -> impl Stream> { try_stream! { diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index 47d4d8f7a7..7ef9ef005a 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -9,7 +9,7 @@ use anyhow::{ensure, Context, Result}; use async_recursion::async_recursion; use bytes::Bytes; use futures::{ - stream::{self, LocalBoxStream}, + stream::{self, BoxStream}, Stream, StreamExt, TryFutureExt, }; use prost::Message; @@ -104,7 +104,7 @@ impl Directory { current.expect("must not be empty") } - pub fn encode<'a>(self) -> LocalBoxStream<'a, Result> { + pub fn encode<'a>(self) -> BoxStream<'a, Result> { match self { Directory::Basic(basic) => basic.encode(), Directory::Hamt(hamt) => hamt.encode(), @@ -113,7 +113,7 @@ impl Directory { } impl BasicDirectory { - pub fn encode<'a>(self) -> LocalBoxStream<'a, Result> { + pub fn encode<'a>(self) -> BoxStream<'a, Result> { async_stream::try_stream! { let mut links = Vec::new(); for entry in self.entries { @@ -143,18 +143,18 @@ impl BasicDirectory { let node = UnixfsNode::Directory(Node { outer, inner }); yield node.encode()?; } - .boxed_local() + .boxed() } } impl HamtDirectory { - pub fn encode<'a>(self) -> LocalBoxStream<'a, Result> { + pub fn encode<'a>(self) -> BoxStream<'a, Result> { self.hamt.encode() } } enum Content { - Reader(Pin>), + Reader(Pin>), Path(PathBuf), } @@ -281,7 +281,7 @@ impl Symlink { pub struct FileBuilder { name: Option, path: Option, - reader: Option>>, + reader: Option>>, chunker: Chunker, degree: usize, } @@ -359,7 +359,7 @@ impl FileBuilder { self } - pub fn content_reader(mut self, content: T) -> Self { + pub fn content_reader(mut self, content: T) -> Self { self.reader = Some(Box::pin(content)); self } @@ -419,11 +419,11 @@ impl Entry { } } - pub async fn encode(self) -> Result>> { + pub async fn encode(self) -> Result>> { Ok(match self { - Entry::File(f) => f.encode().await?.boxed_local(), + Entry::File(f) => f.encode().await?.boxed(), Entry::Directory(d) => d.encode(), - Entry::Symlink(s) => stream::iter(Some(s.encode())).boxed_local(), + Entry::Symlink(s) => stream::iter(Some(s.encode())).boxed(), }) } @@ -637,7 +637,7 @@ impl HamtNode { } } - pub fn encode<'a>(self) -> LocalBoxStream<'a, Result> { + pub fn encode<'a>(self) -> BoxStream<'a, Result> { match self { Self::Branch(tree) => { async_stream::try_stream! { @@ -673,11 +673,11 @@ impl HamtNode { let node = UnixfsNode::Directory(crate::unixfs::Node { outer, inner }); yield node.encode()?; } - .boxed_local() + .boxed() } Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode().await } .try_flatten_stream() - .boxed_local(), + .boxed(), } } } diff --git a/iroh-unixfs/src/chunker.rs b/iroh-unixfs/src/chunker.rs index d15ff52696..1b7678a355 100644 --- a/iroh-unixfs/src/chunker.rs +++ b/iroh-unixfs/src/chunker.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{anyhow, Context}; use bytes::Bytes; -use futures::{stream::LocalBoxStream, Stream}; +use futures::{stream::BoxStream, Stream}; use tokio::io::AsyncRead; mod fixed; @@ -92,8 +92,8 @@ impl From for Chunker { } pub enum ChunkerStream<'a> { - Fixed(LocalBoxStream<'a, io::Result>), - Rabin(LocalBoxStream<'a, io::Result>), + Fixed(BoxStream<'a, io::Result>), + Rabin(BoxStream<'a, io::Result>), } impl<'a> Debug for ChunkerStream<'a> { @@ -127,7 +127,7 @@ impl<'a> Stream for ChunkerStream<'a> { } impl Chunker { - pub fn chunks<'a, R: AsyncRead + Unpin + 'a>(self, source: R) -> ChunkerStream<'a> { + pub fn chunks<'a, R: AsyncRead + Unpin + Send + 'a>(self, source: R) -> ChunkerStream<'a> { match self { Self::Fixed(chunker) => ChunkerStream::Fixed(chunker.chunks(source)), Self::Rabin(chunker) => ChunkerStream::Rabin(chunker.chunks(source)), diff --git a/iroh-unixfs/src/chunker/fixed.rs b/iroh-unixfs/src/chunker/fixed.rs index 9dded67900..0c71da94e2 100644 --- a/iroh-unixfs/src/chunker/fixed.rs +++ b/iroh-unixfs/src/chunker/fixed.rs @@ -1,7 +1,7 @@ use std::io; use bytes::{Bytes, BytesMut}; -use futures::{stream::LocalBoxStream, StreamExt}; +use futures::{stream::BoxStream, StreamExt}; use tokio::io::{AsyncRead, AsyncReadExt}; /// Default size for chunks. @@ -27,10 +27,10 @@ impl Fixed { Self { chunk_size } } - pub fn chunks<'a, R: AsyncRead + Unpin + 'a>( + pub fn chunks<'a, R: AsyncRead + Unpin + Send + 'a>( self, mut source: R, - ) -> LocalBoxStream<'a, io::Result> { + ) -> BoxStream<'a, io::Result> { let chunk_size = self.chunk_size; async_stream::stream! { let mut buffer = BytesMut::with_capacity(chunk_size); @@ -70,7 +70,7 @@ impl Fixed { } } } - .boxed_local() + .boxed() } } diff --git a/iroh-unixfs/src/chunker/rabin.rs b/iroh-unixfs/src/chunker/rabin.rs index 44ffc18c61..69fccdd526 100644 --- a/iroh-unixfs/src/chunker/rabin.rs +++ b/iroh-unixfs/src/chunker/rabin.rs @@ -3,7 +3,7 @@ use std::io; use bytes::{Bytes, BytesMut}; -use futures::{stream::LocalBoxStream, StreamExt}; +use futures::{stream::BoxStream, StreamExt}; use tokio::io::{AsyncRead, AsyncReadExt}; /// Rabin fingerprinting based chunker. @@ -46,10 +46,10 @@ impl Rabin { } } - pub fn chunks<'a, R: AsyncRead + Unpin + 'a>( + pub fn chunks<'a, R: AsyncRead + Unpin + Send + 'a>( self, mut source: R, - ) -> LocalBoxStream<'a, io::Result> { + ) -> BoxStream<'a, io::Result> { async_stream::stream! { let target_size = 3 * self.config.max_size; let mut buf = BytesMut::with_capacity(target_size); @@ -142,7 +142,7 @@ impl Rabin { let _ = buf.split_to(cur_idx); } } - .boxed_local() + .boxed() } } diff --git a/iroh/src/fixture.rs b/iroh/src/fixture.rs index 6a30c92138..a9c08e3926 100644 --- a/iroh/src/fixture.rs +++ b/iroh/src/fixture.rs @@ -51,7 +51,7 @@ fn fixture_get() -> Api { OutType::Reader(Box::new(std::io::Cursor::new("hello"))), )), ]) - .boxed_local()) + .boxed()) }); api } @@ -104,7 +104,7 @@ fn fixture_get_wrapped_file() -> Api { OutType::Reader(Box::new(std::io::Cursor::new("hello"))), )), ]) - .boxed_local()) + .boxed()) }); api } @@ -116,7 +116,7 @@ fn fixture_get_unwrapped_file() -> Api { RelativePathBuf::from_path("").unwrap(), OutType::Reader(Box::new(std::io::Cursor::new("hello"))), ))]) - .boxed_local()) + .boxed()) }); api } @@ -131,7 +131,7 @@ fn fixture_get_wrapped_symlink() -> Api { OutType::Symlink(PathBuf::from("target/path/foo.txt")), )), ]) - .boxed_local()) + .boxed()) }); api } @@ -143,7 +143,7 @@ fn fixture_get_unwrapped_symlink() -> Api { RelativePathBuf::from_path("").unwrap(), OutType::Symlink(PathBuf::from("target/path/foo.txt")), ))]) - .boxed_local()) + .boxed()) }); api }