Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: impl put_many and reuse column families #412

Merged
merged 5 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 71 additions & 8 deletions iroh-resolver/src/unixfs_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use async_recursion::async_recursion;
use async_trait::async_trait;
use bytes::Bytes;
use cid::Cid;
use futures::{stream::LocalBoxStream, Stream, StreamExt};
use futures::stream::TryStreamExt;
use futures::{future, stream::LocalBoxStream, Stream, StreamExt};
use iroh_rpc_client::Client;
use prost::Message;
use tokio::io::AsyncRead;
Expand All @@ -31,7 +32,7 @@ use crate::{
const DIRECTORY_LINK_LIMIT: usize = 6000;

/// How many chunks to buffer up when adding content.
const ADD_PAR: usize = 24;
const _ADD_PAR: usize = 24;

#[derive(Debug, PartialEq)]
enum DirectoryType {
Expand Down Expand Up @@ -496,6 +497,7 @@ pub(crate) fn encode_unixfs_pb(
pub trait Store: 'static + Send + Sync + Clone {
async fn has(&self, &cid: Cid) -> Result<bool>;
async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()>;
async fn put_many(&self, blocks: Vec<Block>) -> Result<()>;
}

#[async_trait]
Expand All @@ -507,6 +509,12 @@ impl Store for Client {
async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()> {
self.try_store()?.put(cid, blob, links).await
}

async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
self.try_store()?
.put_many(blocks.into_iter().map(|x| x.into_parts()).collect())
.await
}
}

#[derive(Debug, Clone)]
Expand All @@ -525,6 +533,13 @@ impl Store for StoreAndProvideClient {
// we provide after insertion is finished
// self.client.try_p2p()?.start_providing(&cid).await
}

async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
self.client
.try_store()?
.put_many(blocks.into_iter().map(|x| x.into_parts()).collect())
.await
}
}

#[async_trait]
Expand All @@ -536,6 +551,14 @@ impl Store for Arc<tokio::sync::Mutex<std::collections::HashMap<Cid, Bytes>>> {
self.lock().await.insert(cid, blob);
Ok(())
}

async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
let mut this = self.lock().await;
for block in blocks {
this.insert(*block.cid(), block.data().clone());
}
Ok(())
}
}

/// Adds a single file.
Expand Down Expand Up @@ -617,20 +640,53 @@ pub enum AddEvent {
},
}

pub async fn add_blocks_to_store<S: Store>(
use async_stream::stream;

fn add_blocks_to_store_chunked<S: Store>(
store: S,
mut blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
) -> impl Stream<Item = Result<AddEvent>> {
let mut chunk = Vec::new();
let mut chunk_size = 0u64;
const MAX_CHUNK_SIZE: u64 = 1024 * 1024 * 16;
stream! {
while let Some(block) = blocks.next().await {
let block = block?;
let block_size = block.data().len() as u64;
let cid = *block.cid();
let raw_data_size = block.raw_data_size();
tracing::info!("adding chunk of {} bytes", chunk_size);
if chunk_size + block_size > MAX_CHUNK_SIZE {
store.put_many(std::mem::take(&mut chunk)).await?;
chunk_size = 0;
}
chunk.push(block);
chunk_size += block_size;
yield Ok(AddEvent::ProgressDelta {
cid,
size: raw_data_size,
});
}
// make sure to also send the last chunk!
store.put_many(chunk).await?;
}
}

fn _add_blocks_to_store_single<S: Store>(
store: Option<S>,
blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
) -> impl Stream<Item = Result<AddEvent>> {
blocks
.map(move |block| {
.and_then(|x| future::ok(vec![x]))
.map(move |blocks| {
let store = store.clone();
async move {
let block = block?;
let block = blocks?[0].clone();
let raw_data_size = block.raw_data_size();
let (cid, bytes, links) = block.into_parts();
let cid = *block.cid();
if let Some(store) = store {
if !store.has(cid).await? {
store.put(cid, bytes, links).await?;
store.put_many(vec![block]).await?;
}
}

Expand All @@ -640,7 +696,14 @@ pub async fn add_blocks_to_store<S: Store>(
})
}
})
.buffered(ADD_PAR)
.buffered(_ADD_PAR)
}

pub async fn add_blocks_to_store<S: Store>(
store: Option<S>,
blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
) -> impl Stream<Item = Result<AddEvent>> {
add_blocks_to_store_chunked(store.unwrap(), blocks)
}

#[async_recursion(?Send)]
Expand Down
18 changes: 16 additions & 2 deletions iroh-rpc-client/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use futures::Stream;
#[cfg(feature = "grpc")]
use iroh_rpc_types::store::store_client::StoreClient as GrpcStoreClient;
use iroh_rpc_types::store::{
GetLinksRequest, GetRequest, GetSizeRequest, HasRequest, PutRequest, Store, StoreClientAddr,
StoreClientBackend,
GetLinksRequest, GetRequest, GetSizeRequest, HasRequest, PutManyRequest, PutRequest, Store,
StoreClientAddr, StoreClientBackend,
};
use iroh_rpc_types::Addr;
#[cfg(feature = "grpc")]
Expand Down Expand Up @@ -40,6 +40,20 @@ impl StoreClient {
Ok(())
}

#[tracing::instrument(skip(self, blocks))]
pub async fn put_many(&self, blocks: Vec<(Cid, Bytes, Vec<Cid>)>) -> Result<()> {
let blocks = blocks
.into_iter()
.map(|(cid, blob, links)| PutRequest {
cid: cid.to_bytes(),
blob,
links: links.iter().map(|l| l.to_bytes()).collect(),
})
.collect();
self.backend.put_many(PutManyRequest { blocks }).await?;
Ok(())
}

#[tracing::instrument(skip(self))]
pub async fn get(&self, cid: Cid) -> Result<Option<Bytes>> {
let req = GetRequest {
Expand Down
7 changes: 7 additions & 0 deletions iroh-rpc-types/proto/store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "google/protobuf/empty.proto";
service Store {
rpc Version(google.protobuf.Empty) returns (VersionResponse) {}
rpc Put(PutRequest) returns (google.protobuf.Empty) {}
rpc PutMany(PutManyRequest) returns (google.protobuf.Empty) {}
rpc Get(GetRequest) returns (GetResponse) {}
rpc Has(HasRequest) returns (HasResponse) {}
rpc GetLinks(GetLinksRequest) returns(GetLinksResponse) {}
Expand All @@ -17,6 +18,12 @@ message VersionResponse {
string version = 1;
}

// this should really be a stream
// but that would require a rewrite of the proxy! macro
message PutManyRequest {
repeated PutRequest blocks = 1;
}

message PutRequest {
// Serialized CID of the given block.
bytes cid = 1;
Expand Down
2 changes: 2 additions & 0 deletions iroh-rpc-types/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ include_proto!("store");
proxy!(
Store,
version: () => VersionResponse => VersionResponse,

put: PutRequest => () => (),
put_many: PutManyRequest => () => (),
get: GetRequest => GetResponse => GetResponse,
has: HasRequest => HasResponse => HasResponse,
get_links: GetLinksRequest => GetLinksResponse => GetLinksResponse,
Expand Down
17 changes: 16 additions & 1 deletion iroh-store/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use bytes::BytesMut;
use cid::Cid;
use iroh_rpc_types::store::{
GetLinksRequest, GetLinksResponse, GetRequest, GetResponse, GetSizeRequest, GetSizeResponse,
HasRequest, HasResponse, PutRequest, Store as RpcStore, StoreServerAddr, VersionResponse,
HasRequest, HasResponse, PutManyRequest, PutRequest, Store as RpcStore, StoreServerAddr,
VersionResponse,
};
use tracing::info;

Expand Down Expand Up @@ -35,6 +36,20 @@ impl RpcStore for Store {
Ok(res)
}

#[tracing::instrument(skip(self, req))]
async fn put_many(&self, req: PutManyRequest) -> Result<()> {
let req = req
.blocks
.into_iter()
.map(|req| {
let cid = cid_from_bytes(req.cid)?;
let links = links_from_bytes(req.links)?;
Ok((cid, req.blob, links))
})
.collect::<Result<Vec<_>>>()?;
self.put_many(req)
}

#[tracing::instrument(skip(self))]
async fn get(&self, req: GetRequest) -> Result<GetResponse> {
let cid = cid_from_bytes(req.cid)?;
Expand Down
Loading