diff --git a/iroh-store/benches/store.rs b/iroh-store/benches/store.rs index de67357707..54aebb7eda 100644 --- a/iroh-store/benches/store.rs +++ b/iroh-store/benches/store.rs @@ -31,9 +31,8 @@ pub fn put_benchmark(c: &mut Criterion) { }; let store = executor.block_on(async { Store::create(config).await.unwrap() }); let store_ref = &store; - b.to_async(&executor).iter(|| async move { - store_ref.put(*key, black_box(value), []).await.unwrap() - }); + b.to_async(&executor) + .iter(|| async move { store_ref.put(*key, black_box(value), []).unwrap() }); }, ); } @@ -65,7 +64,7 @@ pub fn get_benchmark(c: &mut Criterion) { let hash = Code::Sha2_256.digest(&value); let key = cid::Cid::new_v1(RAW, hash); keys.push(key); - store_ref.put(key, &value, []).await.unwrap(); + store_ref.put(key, &value, []).unwrap(); } keys }); @@ -77,7 +76,7 @@ pub fn get_benchmark(c: &mut Criterion) { let start = Instant::now(); for i in 0..iters { let key = &keys_ref[(i as usize) % l]; - let res = store_ref.get(key).await.unwrap().unwrap(); + let res = store_ref.get(key).unwrap().unwrap(); black_box(res); } start.elapsed() diff --git a/iroh-store/src/rpc.rs b/iroh-store/src/rpc.rs index 154d5fd807..9e4bf02e25 100644 --- a/iroh-store/src/rpc.rs +++ b/iroh-store/src/rpc.rs @@ -30,7 +30,7 @@ impl RpcStore for Store { async fn put(&self, req: PutRequest) -> Result<()> { let cid = cid_from_bytes(req.cid)?; let links = links_from_bytes(req.links)?; - let res = self.put(cid, req.blob, links).await?; + let res = self.put(cid, req.blob, links)?; info!("store rpc call: put cid {}", cid); Ok(res) @@ -53,7 +53,7 @@ impl RpcStore for Store { #[tracing::instrument(skip(self))] async fn get(&self, req: GetRequest) -> Result { let cid = cid_from_bytes(req.cid)?; - if let Some(res) = self.get(&cid).await? { + if let Some(res) = self.get(&cid)? 
{ Ok(GetResponse { data: Some(BytesMut::from(&res[..]).freeze()), }) @@ -65,7 +65,7 @@ impl RpcStore for Store { #[tracing::instrument(skip(self))] async fn has(&self, req: HasRequest) -> Result { let cid = cid_from_bytes(req.cid)?; - let has = self.has(&cid).await?; + let has = self.has(&cid)?; Ok(HasResponse { has }) } @@ -73,7 +73,7 @@ impl RpcStore for Store { #[tracing::instrument(skip(self))] async fn get_links(&self, req: GetLinksRequest) -> Result { let cid = cid_from_bytes(req.cid)?; - if let Some(res) = self.get_links(&cid).await? { + if let Some(res) = self.get_links(&cid)? { let links = res.into_iter().map(|cid| cid.to_bytes()).collect(); Ok(GetLinksResponse { links }) } else { diff --git a/iroh-store/src/store.rs b/iroh-store/src/store.rs index f25e53c935..bbbaa1b987 100644 --- a/iroh-store/src/store.rs +++ b/iroh-store/src/store.rs @@ -93,13 +93,6 @@ struct CodeAndId { id: u64, } -struct ColumnFamilies<'a> { - id: &'a ColumnFamily, - metadata: &'a ColumnFamily, - graph: &'a ColumnFamily, - blobs: &'a ColumnFamily, -} - impl Store { /// Creates a new database. #[tracing::instrument] @@ -198,79 +191,102 @@ impl Store { } #[tracing::instrument(skip(self, links, blob))] - pub async fn put, L>(&self, cid: Cid, blob: T, links: L) -> Result<()> + pub fn put, L>(&self, cid: Cid, blob: T, links: L) -> Result<()> where L: IntoIterator, { - self.put0(cid, blob, links, &self.cfs()?) + self.local_store()?.put(cid, blob, links) } #[tracing::instrument(skip(self, blocks))] pub fn put_many(&self, blocks: impl IntoIterator)>) -> Result<()> { - self.put_many0(blocks, &self.cfs()?) + self.local_store()?.put_many(blocks) } #[tracing::instrument(skip(self))] - pub async fn get_blob_by_hash(&self, hash: &Multihash) -> Result>> { - let cf = self.cfs()?; - for elem in self.get_ids_for_hash(hash)? { - let id = elem?.id; - let id_bytes = id.to_be_bytes(); - if let Some(blob) = self.inner.content.get_pinned_cf(cf.blobs, &id_bytes)? 
{ - return Ok(Some(blob)); - } - } - Ok(None) + pub fn get_blob_by_hash(&self, hash: &Multihash) -> Result>> { + self.local_store()?.get_blob_by_hash(hash) } #[tracing::instrument(skip(self))] - pub async fn has_blob_for_hash(&self, hash: &Multihash) -> Result { - let cf = self.cfs()?; - for elem in self.get_ids_for_hash(hash)? { - let id = elem?.id; - let id_bytes = id.to_be_bytes(); - if let Some(_blob) = self.inner.content.get_pinned_cf(cf.blobs, &id_bytes)? { - return Ok(true); - } - } - Ok(false) + pub fn has_blob_for_hash(&self, hash: &Multihash) -> Result { + self.local_store()?.has_blob_for_hash(hash) } #[tracing::instrument(skip(self))] - pub async fn get(&self, cid: &Cid) -> Result>> { - self.get0(cid, &self.cfs()?) + pub fn get(&self, cid: &Cid) -> Result>> { + self.local_store()?.get(cid) } #[tracing::instrument(skip(self))] pub async fn get_size(&self, cid: &Cid) -> Result> { - self.get_size0(cid, &self.cfs()?) + self.local_store()?.get_size(cid) } #[tracing::instrument(skip(self))] - pub async fn has(&self, cid: &Cid) -> Result { - self.has0(cid, &self.cfs()?) + pub fn has(&self, cid: &Cid) -> Result { + self.local_store()?.has(cid) } #[tracing::instrument(skip(self))] - pub async fn get_links(&self, cid: &Cid) -> Result>> { - self.get_links0(cid, &self.cfs()?) 
+ pub fn get_links(&self, cid: &Cid) -> Result>> { + self.local_store()?.get_links(cid) } - fn put0, L>( + #[cfg(test)] + fn get_ids_for_hash( &self, - cid: Cid, - blob: T, - links: L, - cf: &ColumnFamilies, - ) -> Result<()> + hash: &Multihash, + ) -> Result> + '_> { + self.local_store()?.get_ids_for_hash(hash) + } + + fn local_store(&self) -> Result { + let db = &self.inner.content; + Ok(LocalStore { + db, + id: db + .cf_handle(CF_ID_V0) + .context("missing column family: id")?, + metadata: db + .cf_handle(CF_METADATA_V0) + .context("missing column family: metadata")?, + graph: db + .cf_handle(CF_GRAPH_V0) + .context("missing column family: graph")?, + blobs: db + .cf_handle(CF_BLOBS_V0) + .context("missing column family: blobs")?, + next_id: &self.inner.next_id, + }) + } +} + +/// The local store is fully synchronous and is not Send. +/// +/// Due to this, it can store column family handles. +/// +/// All interaction with the database is done through this struct. +struct LocalStore<'a> { + db: &'a RocksDb, + id: &'a ColumnFamily, + metadata: &'a ColumnFamily, + graph: &'a ColumnFamily, + blobs: &'a ColumnFamily, + next_id: &'a AtomicU64, +} + +impl<'a> LocalStore<'a> { + fn put, L>(&self, cid: Cid, blob: T, links: L) -> Result<()> where L: IntoIterator, { inc!(StoreMetrics::PutRequests); - if self.has0(&cid, cf)? { + if self.has(&cid)? 
{ return Ok(()); } + let cf = self; let id = self.next_id(); @@ -298,25 +314,22 @@ impl Store { batch.put_cf(cf.blobs, &id_bytes, blob); batch.put_cf(cf.metadata, &id_bytes, metadata_bytes); batch.put_cf(cf.graph, &id_bytes, graph_bytes); - self.db().write(batch)?; + self.db.write(batch)?; observe!(StoreHistograms::PutRequests, start.elapsed().as_secs_f64()); record!(StoreMetrics::PutBytes, blob_size as u64); Ok(()) } - fn put_many0( - &self, - blocks: impl IntoIterator)>, - cf: &ColumnFamilies, - ) -> Result<()> { + fn put_many(&self, blocks: impl IntoIterator)>) -> Result<()> { inc!(StoreMetrics::PutRequests); let start = std::time::Instant::now(); let mut total_blob_size = 0; + let cf = self; let mut batch = WriteBatch::default(); for (cid, blob, links) in blocks.into_iter() { - if self.has0(&cid, cf)? { + if self.has(&cid)? { return Ok(()); } @@ -347,19 +360,19 @@ impl Store { batch.put_cf(cf.graph, &id_bytes, graph_bytes); } - self.db().write(batch)?; + self.db.write(batch)?; observe!(StoreHistograms::PutRequests, start.elapsed().as_secs_f64()); record!(StoreMetrics::PutBytes, total_blob_size); Ok(()) } - fn get0(&self, cid: &Cid, cf: &ColumnFamilies) -> Result>> { + fn get(&self, cid: &Cid) -> Result>> { inc!(StoreMetrics::GetRequests); let start = std::time::Instant::now(); - let res = match self.get_id(cid, cf)? { + let res = match self.get_id(cid)? { Some(id) => { - let maybe_blob = self.get_by_id(id, cf)?; + let maybe_blob = self.get_by_id(id)?; inc!(StoreMetrics::StoreHit); record!( StoreMetrics::GetBytes, @@ -376,8 +389,8 @@ impl Store { res } - fn get_size0(&self, cid: &Cid, cf: &ColumnFamilies) -> Result> { - match self.get_id(cid, cf)? { + fn get_size(&self, cid: &Cid) -> Result> { + match self.get_id(cid)? { Some(id) => { inc!(StoreMetrics::StoreHit); let maybe_size = self.get_size_by_id(id)?; @@ -390,12 +403,12 @@ impl Store { } } - fn has0(&self, cid: &Cid, cf: &ColumnFamilies) -> Result { - match self.get_id(cid, cf)? 
{ + fn has(&self, cid: &Cid) -> Result { + match self.get_id(cid)? { Some(id) => { let exists = self - .db() - .get_pinned_cf(cf.blobs, id.to_be_bytes())? + .db + .get_pinned_cf(self.blobs, id.to_be_bytes())? .is_some(); Ok(exists) } @@ -403,12 +416,12 @@ impl Store { } } - fn get_links0(&self, cid: &Cid, cf: &ColumnFamilies) -> Result>> { + fn get_links(&self, cid: &Cid) -> Result>> { inc!(StoreMetrics::GetLinksRequests); let start = std::time::Instant::now(); - let res = match self.get_id(cid, cf)? { + let res = match self.get_id(cid)? { Some(id) => { - let maybe_links = self.get_links_by_id(id, cf)?; + let maybe_links = self.get_links_by_id(id)?; inc!(StoreMetrics::GetLinksHit); Ok(maybe_links) } @@ -424,10 +437,10 @@ impl Store { res } - #[tracing::instrument(skip(self, cf))] - fn get_id(&self, cid: &Cid, cf: &ColumnFamilies) -> Result> { + #[tracing::instrument(skip(self))] + fn get_id(&self, cid: &Cid) -> Result> { let id_key = id_key(cid); - let maybe_id_bytes = self.db().get_pinned_cf(cf.id, id_key)?; + let maybe_id_bytes = self.db.get_pinned_cf(self.id, id_key)?; match maybe_id_bytes { Some(bytes) => { let arr = bytes[..8].try_into().map_err(|e| anyhow!("{:?}", e))?; @@ -440,17 +453,11 @@ impl Store { fn get_ids_for_hash( &self, hash: &Multihash, - ) -> Result> + '_> { + ) -> Result> + 'a> { let hash = hash.to_bytes(); - let cf_id = self - .inner - .content - .cf_handle(CF_ID_V0) - .ok_or_else(|| anyhow!("missing column family: id"))?; let iter = self - .inner - .content - .iterator_cf(cf_id, IteratorMode::From(&hash, Direction::Forward)); + .db + .iterator_cf(self.id, IteratorMode::From(&hash, Direction::Forward)); let hash_len = hash.len(); Ok(iter .take_while(move |elem| { @@ -469,33 +476,49 @@ impl Store { })) } - #[tracing::instrument(skip(self, cf))] - fn get_by_id(&self, id: u64, cf: &ColumnFamilies) -> Result>> { - let maybe_blob = self.db().get_pinned_cf(cf.blobs, id.to_be_bytes())?; + fn get_blob_by_hash(&self, hash: &Multihash) -> Result>> { + 
for elem in self.get_ids_for_hash(hash)? { + let id = elem?.id; + let id_bytes = id.to_be_bytes(); + if let Some(blob) = self.db.get_pinned_cf(self.blobs, &id_bytes)? { + return Ok(Some(blob)); + } + } + Ok(None) + } + + #[tracing::instrument(skip(self))] + fn has_blob_for_hash(&self, hash: &Multihash) -> Result { + for elem in self.get_ids_for_hash(hash)? { + let id = elem?.id; + let id_bytes = id.to_be_bytes(); + if let Some(_blob) = self.db.get_pinned_cf(self.blobs, &id_bytes)? { + return Ok(true); + } + } + Ok(false) + } + + #[tracing::instrument(skip(self))] + fn get_by_id(&self, id: u64) -> Result>> { + let maybe_blob = self.db.get_pinned_cf(self.blobs, id.to_be_bytes())?; Ok(maybe_blob) } #[tracing::instrument(skip(self))] fn get_size_by_id(&self, id: u64) -> Result> { - let cf_blobs = self - .inner - .content - .cf_handle(CF_BLOBS_V0) - .ok_or_else(|| anyhow!("missing column family: blobs"))?; - let maybe_blob = self - .inner - .content - .get_pinned_cf(cf_blobs, id.to_be_bytes())?; + let maybe_blob = self.db.get_pinned_cf(self.blobs, id.to_be_bytes())?; let maybe_size = maybe_blob.map(|b| b.len()); Ok(maybe_size) } - #[tracing::instrument(skip(self, cf))] - fn get_links_by_id(&self, id: u64, cf: &ColumnFamilies) -> Result>> { + #[tracing::instrument(skip(self))] + fn get_links_by_id(&self, id: u64) -> Result>> { let id_bytes = id.to_be_bytes(); // FIXME: can't use pinned because otherwise this can trigger alignment issues :/ - match self.db().get_cf(cf.graph, &id_bytes)? { + let cf = self; + match self.db.get_cf(cf.graph, &id_bytes)? { Some(links_id) => { let graph = rkyv::check_archived_root::(&links_id) .map_err(|e| anyhow!("{:?}", e))?; @@ -503,7 +526,7 @@ impl Store { .children .iter() .map(|id| (&cf.metadata, id.to_be_bytes())); - let meta = self.db().multi_get_cf(keys); + let meta = self.db.multi_get_cf(keys); let mut links = Vec::with_capacity(meta.len()); for (i, meta) in meta.into_iter().enumerate() { match meta? 
{ @@ -527,7 +550,7 @@ impl Store { /// Takes a list of cids and gives them ids, which are boths stored and then returned. #[tracing::instrument(skip(self, cids, cf))] - fn ensure_id_many(&self, cids: I, cf: &ColumnFamilies) -> Result> + fn ensure_id_many(&self, cids: I, cf: &LocalStore) -> Result> where I: IntoIterator, { @@ -535,7 +558,7 @@ impl Store { let mut batch = WriteBatch::default(); for cid in cids { let id_key = id_key(&cid); - let id = if let Some(id) = self.db().get_pinned_cf(cf.id, &id_key)? { + let id = if let Some(id) = self.db.get_pinned_cf(cf.id, &id_key)? { u64::from_be_bytes(id.as_ref().try_into()?) } else { let id = self.next_id(); @@ -552,40 +575,18 @@ impl Store { }; ids.push(id); } - self.db().write(batch)?; + self.db.write(batch)?; Ok(ids) } #[tracing::instrument(skip(self))] fn next_id(&self) -> u64 { - let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); + let id = self.next_id.fetch_add(1, Ordering::SeqCst); // TODO: better handling assert!(id > 0, "this store is full"); id } - - fn cfs(&self) -> Result { - let db = self.db(); - Ok(ColumnFamilies { - id: db - .cf_handle(CF_ID_V0) - .context("missing column family: id")?, - metadata: db - .cf_handle(CF_METADATA_V0) - .context("missing column family: metadata")?, - graph: db - .cf_handle(CF_GRAPH_V0) - .context("missing column family: graph")?, - blobs: db - .cf_handle(CF_BLOBS_V0) - .context("missing column family: blobs")?, - }) - } - - fn db(&self) -> &RocksDb { - &self.inner.content - } } #[cfg(test)] @@ -626,17 +627,17 @@ mod tests { let links = [link]; - store.put(c, &data, links).await.unwrap(); + store.put(c, &data, links).unwrap(); values.push((c, data, links)); } for (i, (c, expected_data, expected_links)) in values.iter().enumerate() { dbg!(i); - assert!(store.has(c).await.unwrap()); - let data = store.get(c).await.unwrap().unwrap(); + assert!(store.has(c).unwrap()); + let data = store.get(c).unwrap().unwrap(); assert_eq!(expected_data, &data[..]); - let links = 
store.get_links(c).await.unwrap().unwrap(); + let links = store.get_links(c).unwrap().unwrap(); assert_eq!(expected_links, &links[..]); } } @@ -665,15 +666,15 @@ mod tests { let links = [link]; - store.put(c, &data, links).await.unwrap(); + store.put(c, &data, links).unwrap(); values.push((c, data, links)); } for (c, expected_data, expected_links) in values.iter() { - let data = store.get(c).await.unwrap().unwrap(); + let data = store.get(c).unwrap().unwrap(); assert_eq!(expected_data, &data[..]); - let links = store.get_links(c).await.unwrap().unwrap(); + let links = store.get_links(c).unwrap().unwrap(); assert_eq!(expected_links, &links[..]); } @@ -681,10 +682,10 @@ mod tests { let store = Store::open(config).await.unwrap(); for (c, expected_data, expected_links) in values.iter() { - let data = store.get(c).await.unwrap().unwrap(); + let data = store.get(c).unwrap().unwrap(); assert_eq!(expected_data, &data[..]); - let links = store.get_links(c).await.unwrap().unwrap(); + let links = store.get_links(c).unwrap().unwrap(); assert_eq!(expected_links, &links[..]); } @@ -698,15 +699,15 @@ mod tests { let links = [link]; - store.put(c, &data, links).await.unwrap(); + store.put(c, &data, links).unwrap(); values.push((c, data, links)); } for (c, expected_data, expected_links) in values.iter() { - let data = store.get(c).await.unwrap().unwrap(); + let data = store.get(c).unwrap().unwrap(); assert_eq!(expected_data, &data[..]); - let links = store.get_links(c).await.unwrap().unwrap(); + let links = store.get_links(c).unwrap().unwrap(); assert_eq!(expected_links, &links[..]); } } @@ -740,10 +741,10 @@ mod tests { let cbor_cid = Cid::new_v1(IpldCodec::DagCbor.into(), hash); let (store, _dir) = test_store().await?; - store.put(raw_cid, &blob, vec![]).await?; - store.put(cbor_cid, &blob, vec![link1, link2]).await?; - assert_eq!(store.get_links(&raw_cid).await?.unwrap().len(), 0); - assert_eq!(store.get_links(&cbor_cid).await?.unwrap().len(), 2); + store.put(raw_cid, &blob, 
vec![])?; + store.put(cbor_cid, &blob, vec![link1, link2])?; + assert_eq!(store.get_links(&raw_cid)?.unwrap().len(), 0); + assert_eq!(store.get_links(&cbor_cid)?.unwrap().len(), 2); let ids = store.get_ids_for_hash(&hash)?; assert_eq!(ids.count(), 2); @@ -767,18 +768,18 @@ mod tests { let (store, _dir) = test_store().await?; // we don't have it yet - assert!(!store.has_blob_for_hash(&hash).await?); - let actual = store.get_blob_by_hash(&hash).await?.map(|x| x.to_vec()); + assert!(!store.has_blob_for_hash(&hash)?); + let actual = store.get_blob_by_hash(&hash)?.map(|x| x.to_vec()); assert_eq!(actual, None); - store.put(raw_cid, &expected, vec![]).await?; - assert!(store.has_blob_for_hash(&hash).await?); - let actual = store.get_blob_by_hash(&hash).await?.map(|x| x.to_vec()); + store.put(raw_cid, &expected, vec![])?; + assert!(store.has_blob_for_hash(&hash)?); + let actual = store.get_blob_by_hash(&hash)?.map(|x| x.to_vec()); assert_eq!(actual, Some(expected.clone())); - store.put(cbor_cid, &expected, vec![link1, link2]).await?; - assert!(store.has_blob_for_hash(&hash).await?); - let actual = store.get_blob_by_hash(&hash).await?.map(|x| x.to_vec()); + store.put(cbor_cid, &expected, vec![link1, link2])?; + assert!(store.has_blob_for_hash(&hash)?); + let actual = store.get_blob_by_hash(&hash)?.map(|x| x.to_vec()); assert_eq!(actual, Some(expected)); Ok(()) }