Skip to content

Commit

Permalink
feat: Map id to cid instead of multihash (#336)
Browse files Browse the repository at this point in the history
* test: ensure that different cids with the same hash work

Currently failing because of #335

* fix: use code and hash (so basically cid) as id key

That way you can have 2 cids with the same hash but different links.
Downside is that you might store the same data twice in the very unlikely
case where you have the same data as both raw and dag-cb. ¯\_(ツ)_/¯

* perf: avoid allocation when creating an id key

* feat: Add way to get data by hash

This is one of the two ways of doing this:

- downside: in the rare case where there are 2 cids with the same hash, the data gets stored twice
- upside: storing the graph can be done using just u64 ids instead of (code, id) tuples

* docs: update comment to match new db layout

* refactor: address PR review comments
  • Loading branch information
rklaehn authored Oct 17, 2022
1 parent fdf2170 commit 092bdf7
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 13 deletions.
3 changes: 3 additions & 0 deletions iroh-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ git-version = "0.3.5"
serde = { version = "1.0", features = ["derive"] }
config = "0.13.1"
async-trait = "0.1.56"
smallvec = { version = "1.10.0", features = ["write"] }
multihash = "0.16.3"

[dev-dependencies]
criterion = { version = "0.4.0", features = ["async_tokio"] }
tempfile = "3.3.0"
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
libipld = "0.14.0"

[features]
default = ["rpc-grpc", "rpc-mem"]
Expand Down
4 changes: 3 additions & 1 deletion iroh-store/src/cf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ pub const CF_METADATA_V0: &str = "metadata-v0";
/// Column familty that stores the graph for a blob
/// - indexed by id (u64)
pub const CF_GRAPH_V0: &str = "graph-v0";
/// Column family that stores the mapping multihash to id.
/// Column family that stores the mapping (multihash, code) to id.
///
/// By storing multihash first we can search for ids either by cid = (multihash, code) or by multihash.
pub const CF_ID_V0: &str = "id-v0";

// This wrapper type serializes the contained value out-of-line so that newer
Expand Down
187 changes: 175 additions & 12 deletions iroh-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use iroh_metrics::{
store::{StoreHistograms, StoreMetrics},
};
use iroh_rpc_client::Client as RpcClient;
use multihash::Multihash;
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamily, DBPinnableSlice, IteratorMode, Options, WriteBatch,
DB as RocksDb,
BlockBasedOptions, Cache, ColumnFamily, DBPinnableSlice, Direction, IteratorMode, Options,
WriteBatch, DB as RocksDb,
};
use smallvec::SmallVec;
use tokio::task;

use crate::cf::{
Expand Down Expand Up @@ -71,6 +73,27 @@ fn default_blob_opts() -> Options {
opts
}

/// The key used in CF_ID_V0
///
/// The multihash followed by the be encoded code. This allows both looking up an id by multihash and code (aka Cid),
/// and looking up all codes and ids for a multihash, for the rare case that there are mulitple cids with the same
/// multihash but different codes.
fn id_key(cid: &Cid) -> SmallVec<[u8; 64]> {
let mut key = SmallVec::new();
cid.hash().write(&mut key).unwrap();
key.extend_from_slice(&cid.codec().to_be_bytes());
key
}

/// Struct used to iterate over all the ids for a multihash
struct CodeAndId {
// the ipld code of the id
#[allow(dead_code)]
code: u64,
// the id for the cid, used in most other column families
id: u64,
}

impl Store {
/// Creates a new database.
#[tracing::instrument]
Expand Down Expand Up @@ -192,7 +215,7 @@ impl Store {
multihash: cid.hash().to_bytes(),
});
let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is this the right amount of scratch space?
let multihash = &metadata.0.multihash;
let id_key = id_key(&cid);

let children = self.ensure_id_many(links.into_iter()).await?;

Expand All @@ -206,7 +229,7 @@ impl Store {
let blob_size = blob.as_ref().len();

let mut batch = WriteBatch::default();
batch.put_cf(cf_id, multihash, &id_bytes);
batch.put_cf(cf_id, id_key, &id_bytes);
batch.put_cf(cf_blobs, &id_bytes, blob);
batch.put_cf(cf_meta, &id_bytes, metadata_bytes);
batch.put_cf(cf_graph, &id_bytes, graph_bytes);
Expand All @@ -217,6 +240,40 @@ impl Store {
Ok(())
}

#[tracing::instrument(skip(self))]
pub async fn get_blob_by_hash(&self, hash: &Multihash) -> Result<Option<DBPinnableSlice<'_>>> {
let cf_blobs = self
.inner
.content
.cf_handle(CF_BLOBS_V0)
.ok_or_else(|| anyhow!("missing column family: blobs"))?;
for elem in self.get_ids_for_hash(hash)? {
let id = elem?.id;
let id_bytes = id.to_be_bytes();
if let Some(blob) = self.inner.content.get_pinned_cf(&cf_blobs, &id_bytes)? {
return Ok(Some(blob));
}
}
Ok(None)
}

#[tracing::instrument(skip(self))]
pub async fn has_blob_for_hash(&self, hash: &Multihash) -> Result<bool> {
let cf_blobs = self
.inner
.content
.cf_handle(CF_BLOBS_V0)
.ok_or_else(|| anyhow!("missing column family: blobs"))?;
for elem in self.get_ids_for_hash(hash)? {
let id = elem?.id;
let id_bytes = id.to_be_bytes();
if let Some(_blob) = self.inner.content.get_pinned_cf(&cf_blobs, &id_bytes)? {
return Ok(true);
}
}
Ok(false)
}

#[tracing::instrument(skip(self))]
pub async fn get(&self, cid: &Cid) -> Result<Option<DBPinnableSlice<'_>>> {
inc!(StoreMetrics::GetRequests);
Expand Down Expand Up @@ -280,8 +337,8 @@ impl Store {
#[tracing::instrument(skip(self))]
async fn get_id(&self, cid: &Cid) -> Result<Option<u64>> {
let cf_id = self.cf_id()?;
let multihash = cid.hash().to_bytes();
let maybe_id_bytes = self.db().get_pinned_cf(cf_id, multihash)?;
let id_key = id_key(cid);
let maybe_id_bytes = self.db().get_pinned_cf(cf_id, id_key)?;
match maybe_id_bytes {
Some(bytes) => {
let arr = bytes[..8].try_into().map_err(|e| anyhow!("{:?}", e))?;
Expand All @@ -291,6 +348,38 @@ impl Store {
}
}

fn get_ids_for_hash(
&self,
hash: &Multihash,
) -> Result<impl Iterator<Item = Result<CodeAndId>> + '_> {
let hash = hash.to_bytes();
let cf_id = self
.inner
.content
.cf_handle(CF_ID_V0)
.ok_or_else(|| anyhow!("missing column family: id"))?;
let iter = self
.inner
.content
.iterator_cf(cf_id, IteratorMode::From(&hash, Direction::Forward));
let hash_len = hash.len();
Ok(iter
.take_while(move |elem| {
if let Ok((k, _)) = elem {
k.len() == hash_len + 8 && k.starts_with(&hash)
} else {
// we don't want to swallow errors. An error is not the same as no result!
true
}
})
.map(move |elem| {
let (k, v) = elem?;
let code = u64::from_be_bytes(k[hash_len..].try_into()?);
let id = u64::from_be_bytes(v[..8].try_into()?);
Ok(CodeAndId { code, id })
}))
}

#[tracing::instrument(skip(self))]
async fn get_by_id(&self, id: u64) -> Result<Option<DBPinnableSlice<'_>>> {
let cf_blobs = self.cf_blobs()?;
Expand Down Expand Up @@ -349,21 +438,19 @@ impl Store {
let mut ids = Vec::new();
let mut batch = WriteBatch::default();
for cid in cids {
let multihash = cid.hash().to_bytes();
let id = if let Some(id) = self.db().get_pinned_cf(cf_id, &multihash)? {
let id_key = id_key(&cid);
let id = if let Some(id) = self.db().get_pinned_cf(cf_id, &id_key)? {
u64::from_be_bytes(id.as_ref().try_into()?)
} else {
let id = self.next_id();
let id_bytes = id.to_be_bytes();

let metadata = Versioned(MetadataV0 {
codec: cid.codec(),
multihash,
multihash: cid.hash().to_bytes(),
});
let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is this the right amount of scratch space?

let multihash = &metadata.0.multihash;
batch.put_cf(&cf_id, multihash, &id_bytes);
batch.put_cf(&cf_id, id_key, &id_bytes);
batch.put_cf(&cf_meta, &id_bytes, metadata_bytes);
id
};
Expand Down Expand Up @@ -413,12 +500,16 @@ impl Store {

#[cfg(test)]
mod tests {
use std::str::FromStr;

use super::*;

use iroh_metrics::config::Config as MetricsConfig;
use iroh_rpc_client::Config as RpcClientConfig;

use cid::multihash::{Code, MultihashDigest};
use libipld::{prelude::Encode, IpldCodec};
use tempfile::TempDir;
const RAW: u64 = 0x55;

#[tokio::test]
Expand Down Expand Up @@ -529,4 +620,76 @@ mod tests {
assert_eq!(expected_links, &links[..]);
}
}

async fn test_store() -> anyhow::Result<(Store, TempDir)> {
let dir = tempfile::tempdir()?;
let rpc_client = RpcClientConfig::default();
let config = Config {
path: dir.path().into(),
rpc_client,
metrics: MetricsConfig::default(),
};

let store = Store::create(config).await?;
Ok((store, dir))
}

#[tokio::test]
async fn test_multiple_cids_same_hash() -> anyhow::Result<()> {
let link1 = Cid::from_str("bafybeib4tddkl4oalrhe7q66rrz5dcpz4qwv5lmpstuqrls3djikw566y4")?;
let link2 = Cid::from_str("QmcBphfXUFUNLcfAm31WEqYjrjEh19G5x4iAQANSK151DD")?;
// some data with links
let data = libipld::ipld!({
"link1": link1,
"link2": link2,
});
let mut blob = Vec::new();
data.encode(IpldCodec::DagCbor, &mut blob)?;
let hash = Code::Sha2_256.digest(&blob);
let raw_cid = Cid::new_v1(IpldCodec::Raw.into(), hash);
let cbor_cid = Cid::new_v1(IpldCodec::DagCbor.into(), hash);

let (store, _dir) = test_store().await?;
store.put(raw_cid, &blob, vec![]).await?;
store.put(cbor_cid, &blob, vec![link1, link2]).await?;
assert_eq!(store.get_links(&raw_cid).await?.unwrap().len(), 0);
assert_eq!(store.get_links(&cbor_cid).await?.unwrap().len(), 2);

let ids = store.get_ids_for_hash(&hash)?;
assert_eq!(ids.count(), 2);
Ok(())
}

#[tokio::test]
async fn test_blob_by_hash() -> anyhow::Result<()> {
let link1 = Cid::from_str("bafybeib4tddkl4oalrhe7q66rrz5dcpz4qwv5lmpstuqrls3djikw566y4")?;
let link2 = Cid::from_str("QmcBphfXUFUNLcfAm31WEqYjrjEh19G5x4iAQANSK151DD")?;
// some data with links
let data = libipld::ipld!({
"link1": link1,
"link2": link2,
});
let mut expected = Vec::new();
data.encode(IpldCodec::DagCbor, &mut expected)?;
let hash = Code::Sha2_256.digest(&expected);
let raw_cid = Cid::new_v1(IpldCodec::Raw.into(), hash);
let cbor_cid = Cid::new_v1(IpldCodec::DagCbor.into(), hash);

let (store, _dir) = test_store().await?;
// we don't have it yet
assert!(!store.has_blob_for_hash(&hash).await?);
let actual = store.get_blob_by_hash(&hash).await?.map(|x| x.to_vec());
assert_eq!(actual, None);

store.put(raw_cid, &expected, vec![]).await?;
assert!(store.has_blob_for_hash(&hash).await?);
let actual = store.get_blob_by_hash(&hash).await?.map(|x| x.to_vec());
assert_eq!(actual, Some(expected.clone()));

store.put(cbor_cid, &expected, vec![link1, link2]).await?;
assert!(store.has_blob_for_hash(&hash).await?);
let actual = store.get_blob_by_hash(&hash).await?.map(|x| x.to_vec());
assert_eq!(actual, Some(expected));
Ok(())
}
}

0 comments on commit 092bdf7

Please sign in to comment.