From d57f57bc43b367cf7f5c23ab228d292f4edf494d Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 25 Oct 2022 09:49:52 +0300 Subject: [PATCH 1/4] perf: impl put_many and reuse column families This requires getting rid of a lot of async in the store. Async only in the public interface --- iroh-resolver/src/unixfs_builder.rs | 80 +++++++- iroh-rpc-client/src/store.rs | 18 +- iroh-rpc-types/proto/store.proto | 7 + iroh-rpc-types/src/store.rs | 2 + iroh-store/src/rpc.rs | 17 +- iroh-store/src/store.rs | 273 +++++++++++++++++----------- 6 files changed, 283 insertions(+), 114 deletions(-) diff --git a/iroh-resolver/src/unixfs_builder.rs b/iroh-resolver/src/unixfs_builder.rs index 3e14bd99d1..da8a18949b 100644 --- a/iroh-resolver/src/unixfs_builder.rs +++ b/iroh-resolver/src/unixfs_builder.rs @@ -10,7 +10,8 @@ use async_recursion::async_recursion; use async_trait::async_trait; use bytes::Bytes; use cid::Cid; -use futures::{stream::LocalBoxStream, Stream, StreamExt}; +use futures::stream::TryStreamExt; +use futures::{future, stream::LocalBoxStream, Stream, StreamExt}; use iroh_rpc_client::Client; use prost::Message; use tokio::io::AsyncRead; @@ -31,7 +32,7 @@ use crate::{ const DIRECTORY_LINK_LIMIT: usize = 6000; /// How many chunks to buffer up when adding content. -const ADD_PAR: usize = 24; +const _ADD_PAR: usize = 24; #[derive(Debug, PartialEq)] enum DirectoryType { @@ -496,6 +497,7 @@ pub(crate) fn encode_unixfs_pb( pub trait Store: 'static + Send + Sync + Clone { async fn has(&self, &cid: Cid) -> Result; async fn put(&self, cid: Cid, blob: Bytes, links: Vec) -> Result<()>; + async fn put_many(&self, blocks: Vec) -> Result<()>; } #[async_trait] @@ -507,6 +509,12 @@ impl Store for Client { async fn put(&self, cid: Cid, blob: Bytes, links: Vec) -> Result<()> { self.try_store()?.put(cid, blob, links).await } + + async fn put_many(&self, blocks: Vec) -> Result<()> { + self.try_store()? + .put_many(blocks.into_iter().map(|x| x.into_parts()).collect()) + .await + } } #[derive(Debug, Clone)] @@ -525,6 +533,13 @@ impl Store for StoreAndProvideClient { // we provide after insertion is finished // self.client.try_p2p()?.start_providing(&cid).await } + + async fn put_many(&self, blocks: Vec) -> Result<()> { + self.client + .try_store()? + .put_many(blocks.into_iter().map(|x| x.into_parts()).collect()) + .await + } } #[async_trait] @@ -536,6 +551,14 @@ impl Store for Arc>> { self.lock().await.insert(cid, blob); Ok(()) } + + async fn put_many(&self, blocks: Vec) -> Result<()> { + let mut this = self.lock().await; + for block in blocks { + this.insert(*block.cid(), block.data().clone()); + } + Ok(()) + } } /// Adds a single file. @@ -617,20 +640,54 @@ pub enum AddEvent { }, } -pub async fn add_blocks_to_store( +use async_stream::stream; + +fn add_blocks_to_store_chunked( + store: S, + mut blocks: Pin>>>, +) -> impl Stream> { + let mut chunk = Vec::new(); + let mut chunk_size = 0u64; + const MAX_CHUNK_SIZE: u64 = 1024 * 1024 * 16; + let t = stream! { + while let Some(block) = blocks.next().await { + let block = block?; + let block_size = block.data().len() as u64; + let cid = *block.cid(); + if chunk_size + block_size > MAX_CHUNK_SIZE { + tracing::info!("adding chunk of {} bytes", chunk_size); + store.put_many(chunk.clone()).await?; + chunk.clear(); + chunk_size = 0; + } else { + chunk_size += block_size; + } + yield Ok(AddEvent::ProgressDelta { + cid, + size: block.raw_data_size(), + }); + } + // make sure to also send the last chunk! 
+ store.put_many(chunk).await?; + }; + t +} + +fn _add_blocks_to_store_single( store: Option, blocks: Pin>>>, ) -> impl Stream> { blocks - .map(move |block| { + .and_then(|x| future::ok(vec![x])) + .map(move |blocks| { let store = store.clone(); async move { - let block = block?; + let block = blocks?[0].clone(); let raw_data_size = block.raw_data_size(); - let (cid, bytes, links) = block.into_parts(); + let cid = *block.cid(); if let Some(store) = store { if !store.has(cid).await? { - store.put(cid, bytes, links).await?; + store.put_many(vec![block]).await?; } } @@ -640,7 +697,14 @@ pub async fn add_blocks_to_store( }) } }) - .buffered(ADD_PAR) + .buffered(_ADD_PAR) +} + +pub async fn add_blocks_to_store( + store: Option, + blocks: Pin>>>, +) -> impl Stream> { + add_blocks_to_store_chunked(store.unwrap(), blocks) } #[async_recursion(?Send)] diff --git a/iroh-rpc-client/src/store.rs b/iroh-rpc-client/src/store.rs index 407234bfe1..0a4881f5cd 100644 --- a/iroh-rpc-client/src/store.rs +++ b/iroh-rpc-client/src/store.rs @@ -8,8 +8,8 @@ use futures::Stream; #[cfg(feature = "grpc")] use iroh_rpc_types::store::store_client::StoreClient as GrpcStoreClient; use iroh_rpc_types::store::{ - GetLinksRequest, GetRequest, GetSizeRequest, HasRequest, PutRequest, Store, StoreClientAddr, - StoreClientBackend, + GetLinksRequest, GetRequest, GetSizeRequest, HasRequest, PutManyRequest, PutRequest, Store, + StoreClientAddr, StoreClientBackend, }; use iroh_rpc_types::Addr; #[cfg(feature = "grpc")] @@ -40,6 +40,20 @@ impl StoreClient { Ok(()) } + #[tracing::instrument(skip(self, blocks))] + pub async fn put_many(&self, blocks: Vec<(Cid, Bytes, Vec)>) -> Result<()> { + let blocks = blocks + .into_iter() + .map(|(cid, blob, links)| PutRequest { + cid: cid.to_bytes(), + blob, + links: links.iter().map(|l| l.to_bytes()).collect(), + }) + .collect(); + self.backend.put_many(PutManyRequest { blocks }).await?; + Ok(()) + } + #[tracing::instrument(skip(self))] pub async fn get(&self, cid: Cid) -> Result> { let req = GetRequest { diff --git a/iroh-rpc-types/proto/store.proto b/iroh-rpc-types/proto/store.proto index 7d20057285..578ffc40f5 100644 --- a/iroh-rpc-types/proto/store.proto +++ b/iroh-rpc-types/proto/store.proto @@ -7,6 +7,7 @@ import "google/protobuf/empty.proto"; service Store { rpc Version(google.protobuf.Empty) returns (VersionResponse) {} rpc Put(PutRequest) returns (google.protobuf.Empty) {} + rpc PutMany(PutManyRequest) returns (google.protobuf.Empty) {} rpc Get(GetRequest) returns (GetResponse) {} rpc Has(HasRequest) returns (HasResponse) {} rpc GetLinks(GetLinksRequest) returns(GetLinksResponse) {} @@ -17,6 +18,12 @@ message VersionResponse { string version = 1; } +// this should really be a stream +// but that would require a rewrite of the proxy! macro +message PutManyRequest { + repeated PutRequest blocks = 1; +} + message PutRequest { // Serialized CID of the given block. 
bytes cid = 1; diff --git a/iroh-rpc-types/src/store.rs b/iroh-rpc-types/src/store.rs index 2602079b56..8036af62ef 100644 --- a/iroh-rpc-types/src/store.rs +++ b/iroh-rpc-types/src/store.rs @@ -3,7 +3,9 @@ include_proto!("store"); proxy!( Store, version: () => VersionResponse => VersionResponse, + put: PutRequest => () => (), + put_many: PutManyRequest => () => (), get: GetRequest => GetResponse => GetResponse, has: HasRequest => HasResponse => HasResponse, get_links: GetLinksRequest => GetLinksResponse => GetLinksResponse, diff --git a/iroh-store/src/rpc.rs b/iroh-store/src/rpc.rs index d3a247b6ff..154d5fd807 100644 --- a/iroh-store/src/rpc.rs +++ b/iroh-store/src/rpc.rs @@ -6,7 +6,8 @@ use bytes::BytesMut; use cid::Cid; use iroh_rpc_types::store::{ GetLinksRequest, GetLinksResponse, GetRequest, GetResponse, GetSizeRequest, GetSizeResponse, - HasRequest, HasResponse, PutRequest, Store as RpcStore, StoreServerAddr, VersionResponse, + HasRequest, HasResponse, PutManyRequest, PutRequest, Store as RpcStore, StoreServerAddr, + VersionResponse, }; use tracing::info; @@ -35,6 +36,20 @@ impl RpcStore for Store { Ok(res) } + #[tracing::instrument(skip(self, req))] + async fn put_many(&self, req: PutManyRequest) -> Result<()> { + let req = req + .blocks + .into_iter() + .map(|req| { + let cid = cid_from_bytes(req.cid)?; + let links = links_from_bytes(req.links)?; + Ok((cid, req.blob, links)) + }) + .collect::>>()?; + self.put_many(req) + } + #[tracing::instrument(skip(self))] async fn get(&self, req: GetRequest) -> Result { let cid = cid_from_bytes(req.cid)?; diff --git a/iroh-store/src/store.rs b/iroh-store/src/store.rs index 97f380a052..f25e53c935 100644 --- a/iroh-store/src/store.rs +++ b/iroh-store/src/store.rs @@ -7,6 +7,7 @@ use std::{ }; use anyhow::{anyhow, bail, Context, Result}; +use bytes::Bytes; use cid::Cid; use iroh_metrics::{ core::{MObserver, MRecorder}, @@ -92,6 +93,13 @@ struct CodeAndId { id: u64, } +struct ColumnFamilies<'a> { + id: &'a ColumnFamily, + metadata: &'a ColumnFamily, + graph: &'a ColumnFamily, + blobs: &'a ColumnFamily, +} + impl Store { /// Creates a new database. #[tracing::instrument] @@ -191,12 +199,76 @@ impl Store { #[tracing::instrument(skip(self, links, blob))] pub async fn put, L>(&self, cid: Cid, blob: T, links: L) -> Result<()> + where + L: IntoIterator, + { + self.put0(cid, blob, links, &self.cfs()?) + } + + #[tracing::instrument(skip(self, blocks))] + pub fn put_many(&self, blocks: impl IntoIterator)>) -> Result<()> { + self.put_many0(blocks, &self.cfs()?) + } + + #[tracing::instrument(skip(self))] + pub async fn get_blob_by_hash(&self, hash: &Multihash) -> Result>> { + let cf = self.cfs()?; + for elem in self.get_ids_for_hash(hash)? { + let id = elem?.id; + let id_bytes = id.to_be_bytes(); + if let Some(blob) = self.inner.content.get_pinned_cf(cf.blobs, &id_bytes)? { + return Ok(Some(blob)); + } + } + Ok(None) + } + + #[tracing::instrument(skip(self))] + pub async fn has_blob_for_hash(&self, hash: &Multihash) -> Result { + let cf = self.cfs()?; + for elem in self.get_ids_for_hash(hash)? { + let id = elem?.id; + let id_bytes = id.to_be_bytes(); + if let Some(_blob) = self.inner.content.get_pinned_cf(cf.blobs, &id_bytes)? { + return Ok(true); + } + } + Ok(false) + } + + #[tracing::instrument(skip(self))] + pub async fn get(&self, cid: &Cid) -> Result>> { + self.get0(cid, &self.cfs()?) + } + + #[tracing::instrument(skip(self))] + pub async fn get_size(&self, cid: &Cid) -> Result> { + self.get_size0(cid, &self.cfs()?) 
+ } + + #[tracing::instrument(skip(self))] + pub async fn has(&self, cid: &Cid) -> Result { + self.has0(cid, &self.cfs()?) + } + + #[tracing::instrument(skip(self))] + pub async fn get_links(&self, cid: &Cid) -> Result>> { + self.get_links0(cid, &self.cfs()?) + } + + fn put0, L>( + &self, + cid: Cid, + blob: T, + links: L, + cf: &ColumnFamilies, + ) -> Result<()> where L: IntoIterator, { inc!(StoreMetrics::PutRequests); - if self.has(&cid).await? { + if self.has0(&cid, cf)? { return Ok(()); } @@ -215,22 +287,17 @@ impl Store { let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is this the right amount of scratch space? let id_key = id_key(&cid); - let children = self.ensure_id_many(links.into_iter()).await?; + let children = self.ensure_id_many(links.into_iter(), cf)?; let graph = GraphV0 { children }; let graph_bytes = rkyv::to_bytes::<_, 1024>(&graph)?; // TODO: is this the right amount of scratch space? - - let cf_id = self.cf_id()?; - let cf_meta = self.cf_metadata()?; - let cf_graph = self.cf_graph()?; - let cf_blobs = self.cf_blobs()?; let blob_size = blob.as_ref().len(); let mut batch = WriteBatch::default(); - batch.put_cf(cf_id, id_key, &id_bytes); - batch.put_cf(cf_blobs, &id_bytes, blob); - batch.put_cf(cf_meta, &id_bytes, metadata_bytes); - batch.put_cf(cf_graph, &id_bytes, graph_bytes); + batch.put_cf(cf.id, id_key, &id_bytes); + batch.put_cf(cf.blobs, &id_bytes, blob); + batch.put_cf(cf.metadata, &id_bytes, metadata_bytes); + batch.put_cf(cf.graph, &id_bytes, graph_bytes); self.db().write(batch)?; observe!(StoreHistograms::PutRequests, start.elapsed().as_secs_f64()); record!(StoreMetrics::PutBytes, blob_size as u64); @@ -238,47 +305,61 @@ impl Store { Ok(()) } - #[tracing::instrument(skip(self))] - pub async fn get_blob_by_hash(&self, hash: &Multihash) -> Result>> { - let cf_blobs = self - .inner - .content - .cf_handle(CF_BLOBS_V0) - .ok_or_else(|| anyhow!("missing column family: blobs"))?; - for elem in self.get_ids_for_hash(hash)? { - let id = elem?.id; - let id_bytes = id.to_be_bytes(); - if let Some(blob) = self.inner.content.get_pinned_cf(&cf_blobs, &id_bytes)? { - return Ok(Some(blob)); + fn put_many0( + &self, + blocks: impl IntoIterator)>, + cf: &ColumnFamilies, + ) -> Result<()> { + inc!(StoreMetrics::PutRequests); + let start = std::time::Instant::now(); + let mut total_blob_size = 0; + + let mut batch = WriteBatch::default(); + for (cid, blob, links) in blocks.into_iter() { + if self.has0(&cid, cf)? { + return Ok(()); } - } - Ok(None) - } - #[tracing::instrument(skip(self))] - pub async fn has_blob_for_hash(&self, hash: &Multihash) -> Result { - let cf_blobs = self - .inner - .content - .cf_handle(CF_BLOBS_V0) - .ok_or_else(|| anyhow!("missing column family: blobs"))?; - for elem in self.get_ids_for_hash(hash)? { - let id = elem?.id; + let id = self.next_id(); + let id_bytes = id.to_be_bytes(); - if let Some(_blob) = self.inner.content.get_pinned_cf(&cf_blobs, &id_bytes)? { - return Ok(true); - } + + // guranteed that the key does not exists, so we want to store it + + let metadata = MetadataV0 { + codec: cid.codec(), + multihash: cid.hash().to_bytes(), + }; + let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is this the right amount of scratch space? + let id_key = id_key(&cid); + + let children = self.ensure_id_many(links.into_iter(), cf)?; + + let graph = GraphV0 { children }; + let graph_bytes = rkyv::to_bytes::<_, 1024>(&graph)?; // TODO: is this the right amount of scratch space? 
+ + let blob_size = blob.as_ref().len(); + total_blob_size += blob_size as u64; + + batch.put_cf(cf.id, id_key, &id_bytes); + batch.put_cf(cf.blobs, &id_bytes, blob); + batch.put_cf(cf.metadata, &id_bytes, metadata_bytes); + batch.put_cf(cf.graph, &id_bytes, graph_bytes); } - Ok(false) + + self.db().write(batch)?; + observe!(StoreHistograms::PutRequests, start.elapsed().as_secs_f64()); + record!(StoreMetrics::PutBytes, total_blob_size); + + Ok(()) } - #[tracing::instrument(skip(self))] - pub async fn get(&self, cid: &Cid) -> Result>> { + fn get0(&self, cid: &Cid, cf: &ColumnFamilies) -> Result>> { inc!(StoreMetrics::GetRequests); let start = std::time::Instant::now(); - let res = match self.get_id(cid).await? { + let res = match self.get_id(cid, cf)? { Some(id) => { - let maybe_blob = self.get_by_id(id).await?; + let maybe_blob = self.get_by_id(id, cf)?; inc!(StoreMetrics::StoreHit); record!( StoreMetrics::GetBytes, @@ -295,12 +376,11 @@ impl Store { res } - #[tracing::instrument(skip(self))] - pub async fn get_size(&self, cid: &Cid) -> Result> { - match self.get_id(cid).await? { + fn get_size0(&self, cid: &Cid, cf: &ColumnFamilies) -> Result> { + match self.get_id(cid, cf)? { Some(id) => { inc!(StoreMetrics::StoreHit); - let maybe_size = self.get_size_by_id(id).await?; + let maybe_size = self.get_size_by_id(id)?; Ok(maybe_size) } None => { @@ -310,14 +390,12 @@ impl Store { } } - #[tracing::instrument(skip(self))] - pub async fn has(&self, cid: &Cid) -> Result { - match self.get_id(cid).await? { + fn has0(&self, cid: &Cid, cf: &ColumnFamilies) -> Result { + match self.get_id(cid, cf)? { Some(id) => { - let cf_blobs = self.cf_blobs()?; let exists = self .db() - .get_pinned_cf(cf_blobs, id.to_be_bytes())? + .get_pinned_cf(cf.blobs, id.to_be_bytes())? .is_some(); Ok(exists) } @@ -325,13 +403,12 @@ impl Store { } } - #[tracing::instrument(skip(self))] - pub async fn get_links(&self, cid: &Cid) -> Result>> { + fn get_links0(&self, cid: &Cid, cf: &ColumnFamilies) -> Result>> { inc!(StoreMetrics::GetLinksRequests); let start = std::time::Instant::now(); - let res = match self.get_id(cid).await? { + let res = match self.get_id(cid, cf)? 
{ Some(id) => { - let maybe_links = self.get_links_by_id(id).await?; + let maybe_links = self.get_links_by_id(id, cf)?; inc!(StoreMetrics::GetLinksHit); Ok(maybe_links) } @@ -347,11 +424,10 @@ impl Store { res } - #[tracing::instrument(skip(self))] - async fn get_id(&self, cid: &Cid) -> Result> { - let cf_id = self.cf_id()?; + #[tracing::instrument(skip(self, cf))] + fn get_id(&self, cid: &Cid, cf: &ColumnFamilies) -> Result> { let id_key = id_key(cid); - let maybe_id_bytes = self.db().get_pinned_cf(cf_id, id_key)?; + let maybe_id_bytes = self.db().get_pinned_cf(cf.id, id_key)?; match maybe_id_bytes { Some(bytes) => { let arr = bytes[..8].try_into().map_err(|e| anyhow!("{:?}", e))?; @@ -393,16 +469,15 @@ impl Store { })) } - #[tracing::instrument(skip(self))] - async fn get_by_id(&self, id: u64) -> Result>> { - let cf_blobs = self.cf_blobs()?; - let maybe_blob = self.db().get_pinned_cf(cf_blobs, id.to_be_bytes())?; + #[tracing::instrument(skip(self, cf))] + fn get_by_id(&self, id: u64, cf: &ColumnFamilies) -> Result>> { + let maybe_blob = self.db().get_pinned_cf(cf.blobs, id.to_be_bytes())?; Ok(maybe_blob) } #[tracing::instrument(skip(self))] - async fn get_size_by_id(&self, id: u64) -> Result> { + fn get_size_by_id(&self, id: u64) -> Result> { let cf_blobs = self .inner .content @@ -416,17 +491,18 @@ impl Store { Ok(maybe_size) } - #[tracing::instrument(skip(self))] - async fn get_links_by_id(&self, id: u64) -> Result>> { - let cf_graph = self.cf_graph()?; + #[tracing::instrument(skip(self, cf))] + fn get_links_by_id(&self, id: u64, cf: &ColumnFamilies) -> Result>> { let id_bytes = id.to_be_bytes(); // FIXME: can't use pinned because otherwise this can trigger alignment issues :/ - match self.db().get_cf(cf_graph, &id_bytes)? { + match self.db().get_cf(cf.graph, &id_bytes)? { Some(links_id) => { - let cf_meta = self.cf_metadata()?; let graph = rkyv::check_archived_root::(&links_id) .map_err(|e| anyhow!("{:?}", e))?; - let keys = graph.children.iter().map(|id| (&cf_meta, id.to_be_bytes())); + let keys = graph + .children + .iter() + .map(|id| (&cf.metadata, id.to_be_bytes())); let meta = self.db().multi_get_cf(keys); let mut links = Vec::with_capacity(meta.len()); for (i, meta) in meta.into_iter().enumerate() { @@ -450,19 +526,16 @@ impl Store { } /// Takes a list of cids and gives them ids, which are boths stored and then returned. - #[tracing::instrument(skip(self, cids))] - async fn ensure_id_many(&self, cids: I) -> Result> + #[tracing::instrument(skip(self, cids, cf))] + fn ensure_id_many(&self, cids: I, cf: &ColumnFamilies) -> Result> where I: IntoIterator, { - let cf_id = self.cf_id()?; - let cf_meta = self.cf_metadata()?; - let mut ids = Vec::new(); let mut batch = WriteBatch::default(); for cid in cids { let id_key = id_key(&cid); - let id = if let Some(id) = self.db().get_pinned_cf(cf_id, &id_key)? { + let id = if let Some(id) = self.db().get_pinned_cf(cf.id, &id_key)? { u64::from_be_bytes(id.as_ref().try_into()?) } else { let id = self.next_id(); @@ -473,8 +546,8 @@ impl Store { multihash: cid.hash().to_bytes(), }; let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is this the right amount of scratch space? 
- batch.put_cf(&cf_id, id_key, &id_bytes); - batch.put_cf(&cf_meta, &id_bytes, metadata_bytes); + batch.put_cf(&cf.id, id_key, &id_bytes); + batch.put_cf(&cf.metadata, &id_bytes, metadata_bytes); id }; ids.push(id); @@ -492,32 +565,26 @@ impl Store { id } - fn db(&self) -> &RocksDb { - &self.inner.content - } - - fn cf_id(&self) -> Result<&ColumnFamily> { - self.db() - .cf_handle(CF_ID_V0) - .context("missing column family: id") - } - - fn cf_metadata(&self) -> Result<&ColumnFamily> { - self.db() - .cf_handle(CF_METADATA_V0) - .context("missing column family: metadata") - } - - fn cf_blobs(&self) -> Result<&ColumnFamily> { - self.db() - .cf_handle(CF_BLOBS_V0) - .context("missing column family: blobs") + fn cfs(&self) -> Result { + let db = self.db(); + Ok(ColumnFamilies { + id: db + .cf_handle(CF_ID_V0) + .context("missing column family: id")?, + metadata: db + .cf_handle(CF_METADATA_V0) + .context("missing column family: metadata")?, + graph: db + .cf_handle(CF_GRAPH_V0) + .context("missing column family: graph")?, + blobs: db + .cf_handle(CF_BLOBS_V0) + .context("missing column family: blobs")?, + }) } - fn cf_graph(&self) -> Result<&ColumnFamily> { - self.db() - .cf_handle(CF_GRAPH_V0) - .context("missing column family: graph") + fn db(&self) -> &RocksDb { + &self.inner.content } } From 5143f02a5b490000fabfbfaa1450d0fabb8afbac Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 25 Oct 2022 14:05:15 +0300 Subject: [PATCH 2/4] fix: actually store the data --- iroh-resolver/src/unixfs_builder.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/iroh-resolver/src/unixfs_builder.rs b/iroh-resolver/src/unixfs_builder.rs index da8a18949b..9b3e80e4ee 100644 --- a/iroh-resolver/src/unixfs_builder.rs +++ b/iroh-resolver/src/unixfs_builder.rs @@ -654,17 +654,19 @@ fn add_blocks_to_store_chunked( let block = block?; let block_size = block.data().len() as u64; let cid = *block.cid(); + let raw_data_size = block.raw_data_size(); if chunk_size + block_size > MAX_CHUNK_SIZE { tracing::info!("adding chunk of {} bytes", chunk_size); store.put_many(chunk.clone()).await?; chunk.clear(); chunk_size = 0; } else { + chunk.push(block); chunk_size += block_size; } yield Ok(AddEvent::ProgressDelta { cid, - size: block.raw_data_size(), + size: raw_data_size, }); } // make sure to also send the last chunk! From c3390042c203405699e1b940dee3f344a5a26e1d Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 25 Oct 2022 17:44:43 +0300 Subject: [PATCH 3/4] fix: store chunk in any case --- iroh-resolver/src/unixfs_builder.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/iroh-resolver/src/unixfs_builder.rs b/iroh-resolver/src/unixfs_builder.rs index 9b3e80e4ee..b1a9cb9a03 100644 --- a/iroh-resolver/src/unixfs_builder.rs +++ b/iroh-resolver/src/unixfs_builder.rs @@ -649,21 +649,20 @@ fn add_blocks_to_store_chunked( let mut chunk = Vec::new(); let mut chunk_size = 0u64; const MAX_CHUNK_SIZE: u64 = 1024 * 1024 * 16; - let t = stream! { + stream! 
{ while let Some(block) = blocks.next().await { let block = block?; let block_size = block.data().len() as u64; let cid = *block.cid(); let raw_data_size = block.raw_data_size(); + tracing::info!("adding chunk of {} bytes", chunk_size); if chunk_size + block_size > MAX_CHUNK_SIZE { - tracing::info!("adding chunk of {} bytes", chunk_size); store.put_many(chunk.clone()).await?; chunk.clear(); chunk_size = 0; - } else { - chunk.push(block); - chunk_size += block_size; } + chunk.push(block); + chunk_size += block_size; yield Ok(AddEvent::ProgressDelta { cid, size: raw_data_size, @@ -671,8 +670,7 @@ fn add_blocks_to_store_chunked( } // make sure to also send the last chunk! store.put_many(chunk).await?; - }; - t + } } fn _add_blocks_to_store_single( From a193ab45d0ff75f5552c49645e28fea9b759bb61 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 25 Oct 2022 20:22:39 +0300 Subject: [PATCH 4/4] refactor: use std::mem::take instead of clone/clear --- iroh-resolver/src/unixfs_builder.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iroh-resolver/src/unixfs_builder.rs b/iroh-resolver/src/unixfs_builder.rs index b1a9cb9a03..408394774b 100644 --- a/iroh-resolver/src/unixfs_builder.rs +++ b/iroh-resolver/src/unixfs_builder.rs @@ -657,8 +657,7 @@ fn add_blocks_to_store_chunked( let raw_data_size = block.raw_data_size(); tracing::info!("adding chunk of {} bytes", chunk_size); if chunk_size + block_size > MAX_CHUNK_SIZE { - store.put_many(chunk.clone()).await?; - chunk.clear(); + store.put_many(std::mem::take(&mut chunk)).await?; chunk_size = 0; } chunk.push(block);
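
Taken together, the four patches converge on a single batching discipline in add_blocks_to_store_chunked: accumulate decoded blocks until the next block would push the running batch past the 16 MiB MAX_CHUNK_SIZE, flush the batch through put_many as one store write, and flush whatever remains once the input stream ends (the cases patches 2 and 3 repair). The standalone Rust sketch below illustrates that discipline only and is not the iroh code: Block, MemStore, and store_chunked are simplified, hypothetical stand-ins, the anyhow, async-stream, futures, and tokio crates are assumed as dependencies, and async_stream::try_stream! is used in place of the builder's stream! block so the ? operator is unambiguously supported.

// Sketch only. Assumed deps: anyhow = "1", async-stream = "0.3",
// futures = "0.3", tokio = { version = "1", features = ["macros", "rt"] }.
use std::pin::Pin;
use std::sync::{Arc, Mutex};

use anyhow::Result;
use async_stream::try_stream;
use futures::{Stream, StreamExt};

/// Simplified stand-in for an IPLD block: an id plus its raw bytes.
struct Block {
    cid: u64, // placeholder for a real Cid
    data: Vec<u8>,
}

/// Simplified stand-in for the store client; records one entry per write batch.
#[derive(Default)]
struct MemStore {
    batches: Mutex<Vec<Vec<Block>>>,
}

impl MemStore {
    /// One write batch per call, mirroring the intent of put_many in the patch.
    async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
        self.batches.lock().unwrap().push(blocks);
        Ok(())
    }
}

/// Streams back the cid of every stored block while writing blocks in batches
/// of at most `max_chunk_size` bytes. The partially filled final batch is
/// flushed after the input stream ends.
fn store_chunked(
    store: Arc<MemStore>,
    mut blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
    max_chunk_size: u64,
) -> impl Stream<Item = Result<u64>> {
    try_stream! {
        let mut chunk: Vec<Block> = Vec::new();
        let mut chunk_size = 0u64;
        while let Some(block) = blocks.next().await {
            let block = block?;
            let block_size = block.data.len() as u64;
            let cid = block.cid;
            // Flush before the batch would grow past the limit.
            if chunk_size + block_size > max_chunk_size {
                store.put_many(std::mem::take(&mut chunk)).await?;
                chunk_size = 0;
            }
            chunk.push(block);
            chunk_size += block_size;
            yield cid;
        }
        // Also write the last, partially filled batch.
        store.put_many(chunk).await?;
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    let store = Arc::new(MemStore::default());
    let blocks = futures::stream::iter(
        (0..10u64).map(|cid| Ok::<_, anyhow::Error>(Block { cid, data: vec![0u8; 400] })),
    );
    let mut progress = Box::pin(store_chunked(store.clone(), Box::pin(blocks), 1024));
    while let Some(cid) = progress.next().await {
        println!("stored block {}", cid?);
    }
    // Ten 400-byte blocks against a 1024-byte limit: four full batches of two
    // blocks plus the flushed remainder of two.
    assert_eq!(store.batches.lock().unwrap().len(), 5);
    Ok(())
}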