From a1b9586a3b9829c0989232c9e910c4d6945b7802 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 2 May 2022 19:38:15 +0200 Subject: [PATCH] feat: implement basic iroh-store --- Cargo.toml | 1 + iroh-store/Cargo.toml | 29 +++ iroh-store/README.md | 27 +++ iroh-store/benches/store.rs | 84 ++++++++ iroh-store/src/cf.rs | 37 ++++ iroh-store/src/config.rs | 8 + iroh-store/src/lib.rs | 6 + iroh-store/src/main.rs | 1 + iroh-store/src/store.rs | 418 ++++++++++++++++++++++++++++++++++++ 9 files changed, 611 insertions(+) create mode 100644 iroh-store/Cargo.toml create mode 100644 iroh-store/README.md create mode 100644 iroh-store/benches/store.rs create mode 100644 iroh-store/src/cf.rs create mode 100644 iroh-store/src/config.rs create mode 100644 iroh-store/src/lib.rs create mode 100644 iroh-store/src/main.rs create mode 100644 iroh-store/src/store.rs diff --git a/Cargo.toml b/Cargo.toml index 26995be581..1bfe57f7b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,5 +6,6 @@ members = [ "iroh-gateway", "iroh-metrics", "iroh-p2p", + "iroh-store", "stores/*" ] diff --git a/iroh-store/Cargo.toml b/iroh-store/Cargo.toml new file mode 100644 index 0000000000..05a6899a6d --- /dev/null +++ b/iroh-store/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "iroh-store" +version = "0.1.0" +edition = "2021" +authors = ["Friedel Ziegelmayer "] +license = "Apache-2.0/MIT" +repository = "https://github.com/n0-computer/iroh" +description = "Implementation of the storage part of iroh" + +[dependencies] +rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", branch = "master" } +eyre = "0.6.6" +tokio = { version = "1.18.0", features = ["rt"] } +cid = "0.8.4" +rkyv = { version = "0.7.37", features = ["validation"] } +bytecheck = "0.6.7" + +[dev-dependencies] +criterion = { version = "0.3.5", features = ["async_tokio"] } +tempfile = "3.3.0" +tokio = { version = "1.18.0", features = ["rt", "macros", "rt-multi-thread"] } + +[features] +default = [] + + +[[bench]] +name = "store" +harness = false \ No newline at end of file diff --git a/iroh-store/README.md b/iroh-store/README.md new file mode 100644 index 0000000000..d1fb164059 --- /dev/null +++ b/iroh-store/README.md @@ -0,0 +1,27 @@ +# iroh-store + +> Storage for iroh. + + +## How to run + +```sh +# From the root of the workspace +> cargo run --release -p iroh-store +``` + +## License + + +Licensed under either of Apache License, Version +2.0 or MIT license at your option. + + +
+ + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in this crate by you, as defined in the Apache-2.0 license, shall +be dual licensed as above, without any additional terms or conditions. + + diff --git a/iroh-store/benches/store.rs b/iroh-store/benches/store.rs new file mode 100644 index 0000000000..5b31100235 --- /dev/null +++ b/iroh-store/benches/store.rs @@ -0,0 +1,84 @@ +use std::time::Instant; + +use cid::multihash::{Code, MultihashDigest}; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use iroh_store::{Config, Store}; +use tokio::runtime::Runtime; + +const RAW: u64 = 0x55; + +pub fn put_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("store_put"); + for value_size in [32, 128, 512, 1024].iter() { + let value = vec![8u8; *value_size]; + let hash = Code::Sha2_256.digest(&value); + let key = cid::Cid::new_v1(RAW, hash); + + group.throughput(criterion::Throughput::Bytes(*value_size as u64)); + group.bench_with_input( + BenchmarkId::new("value_size", *value_size as u64), + &(key, value), + |b, (key, value)| { + let executor = Runtime::new().unwrap(); + let dir = tempfile::tempdir().unwrap(); + let config = Config { + path: dir.path().into(), + }; + let store = executor.block_on(async { Store::create(config).await.unwrap() }); + let store_ref = &store; + b.to_async(&executor).iter(|| async move { + store_ref.put(*key, black_box(value), []).await.unwrap() + }); + }, + ); + } + group.finish(); +} + +pub fn get_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("store_get"); + for value_size in [32, 128, 512, 1024].iter() { + group.throughput(criterion::Throughput::Bytes(*value_size as u64)); + group.bench_with_input( + BenchmarkId::new("value_size", *value_size as u64), + &(), + |b, _| { + let executor = Runtime::new().unwrap(); + let dir = tempfile::tempdir().unwrap(); + let config = Config { + path: dir.path().into(), + }; + let store = executor.block_on(async { Store::create(config).await.unwrap() }); + let store_ref = &store; + let keys = executor.block_on(async { + let mut keys = Vec::new(); + for i in 0..1000 { + let value = vec![i as u8; *value_size]; + let hash = Code::Sha2_256.digest(&value); + let key = cid::Cid::new_v1(RAW, hash); + keys.push(key); + store_ref.put(key, &value, []).await.unwrap(); + } + keys + }); + + let keys_ref = &keys[..]; + b.to_async(&executor).iter_custom(|iters| async move { + let l = keys_ref.len(); + + let start = Instant::now(); + for i in 0..iters { + let key = &keys_ref[(i as usize) % l]; + let res = store_ref.get(key).await.unwrap().unwrap(); + black_box(res); + } + start.elapsed() + }); + }, + ); + } + group.finish(); +} + +criterion_group!(benches, put_benchmark, get_benchmark); +criterion_main!(benches); diff --git a/iroh-store/src/cf.rs b/iroh-store/src/cf.rs new file mode 100644 index 0000000000..e63e4450ae --- /dev/null +++ b/iroh-store/src/cf.rs @@ -0,0 +1,37 @@ +use bytecheck::CheckBytes; +use rkyv::{with::AsBox, Archive, Deserialize, Serialize}; + +/// Column family to store actual data. +/// - Maps id (u64) to bytes +pub const CF_BLOBS_V0: &str = "blobs-v0"; +/// Column family that stores metdata about a given blob. +/// - indexed by id (u64) +pub const CF_METADATA_V0: &str = "metadata-v0"; +/// Column familty that stores the graph for a blob +/// - indexed by id (u64) +pub const CF_GRAPH_V0: &str = "graph-v0"; +/// Column family that stores the mapping multihash to id +pub const CF_ID_V0: &str = "id-v0"; + +// This wrapper type serializes the contained value out-of-line so that newer +// versions can be viewed as the older version. +#[derive(Debug, Archive, Deserialize, Serialize)] +#[repr(transparent)] +#[archive_attr(repr(transparent), derive(CheckBytes))] +pub struct Versioned(#[with(AsBox)] pub T); + +#[derive(Debug, Archive, Deserialize, Serialize)] +#[repr(C)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct MetadataV0 { + pub codec: u64, + pub multihash: Vec, +} + +#[derive(Debug, Archive, Deserialize, Serialize)] +#[repr(C)] +#[archive_attr(repr(C), derive(CheckBytes))] +pub struct GraphV0 { + /// The codec of the original CID. + pub children: Vec, +} diff --git a/iroh-store/src/config.rs b/iroh-store/src/config.rs new file mode 100644 index 0000000000..f2d477a54c --- /dev/null +++ b/iroh-store/src/config.rs @@ -0,0 +1,8 @@ +use std::path::PathBuf; + +/// The configuration for the store. +#[derive(Debug, Clone)] +pub struct Config { + /// The location of the content database. + pub path: PathBuf, +} diff --git a/iroh-store/src/lib.rs b/iroh-store/src/lib.rs new file mode 100644 index 0000000000..8757db30f5 --- /dev/null +++ b/iroh-store/src/lib.rs @@ -0,0 +1,6 @@ +mod cf; +mod config; +mod store; + +pub use crate::config::Config; +pub use crate::store::Store; diff --git a/iroh-store/src/main.rs b/iroh-store/src/main.rs new file mode 100644 index 0000000000..f328e4d9d0 --- /dev/null +++ b/iroh-store/src/main.rs @@ -0,0 +1 @@ +fn main() {} diff --git a/iroh-store/src/store.rs b/iroh-store/src/store.rs new file mode 100644 index 0000000000..fd40e3ad6e --- /dev/null +++ b/iroh-store/src/store.rs @@ -0,0 +1,418 @@ +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +use cid::Cid; +use eyre::{bail, Result}; +use rocksdb::{DBPinnableSlice, IteratorMode, Options, WriteBatch, DB as RocksDb}; +use tokio::task; + +use crate::cf::{ + GraphV0, MetadataV0, Versioned, CF_BLOBS_V0, CF_GRAPH_V0, CF_ID_V0, CF_METADATA_V0, +}; +use crate::Config; + +#[derive(Debug, Clone)] +pub struct Store { + inner: Arc, +} +#[derive(Debug)] +struct InnerStore { + content: RocksDb, + #[allow(dead_code)] + config: Config, + next_id: AtomicU64, +} + +impl Store { + /// Creates a new database. + pub async fn create(config: Config) -> Result { + let mut options = Options::default(); + options.create_if_missing(true); + // TODO: more options + + let path = config.path.clone(); + let db = task::spawn_blocking(move || -> Result<_> { + let mut db = RocksDb::open(&options, path)?; + { + let mut opts = Options::default(); + opts.set_enable_blob_files(true); + opts.set_blob_file_size(1024); + db.create_cf(CF_BLOBS_V0, &opts)?; + } + { + let opts = Options::default(); + db.create_cf(CF_METADATA_V0, &opts)?; + } + { + let opts = Options::default(); + db.create_cf(CF_GRAPH_V0, &opts)?; + } + { + let opts = Options::default(); + db.create_cf(CF_ID_V0, &opts)?; + } + + Ok(db) + }) + .await??; + + Ok(Store { + inner: Arc::new(InnerStore { + content: db, + config, + next_id: 1.into(), + }), + }) + } + + /// Opens an existing database. + pub async fn open(config: Config) -> Result { + let mut options = Options::default(); + options.create_if_missing(false); + // TODO: more options + + // TODO: find a way to read existing options + + let path = config.path.clone(); + let (db, next_id) = task::spawn_blocking(move || -> Result<_> { + let db = RocksDb::open_cf( + &options, + path, + [CF_BLOBS_V0, CF_METADATA_V0, CF_GRAPH_V0, CF_ID_V0], + )?; + + // read last inserted id + let next_id = { + let cf_meta = db + .cf_handle(CF_METADATA_V0) + .ok_or_else(|| eyre::eyre!("missing column family: metadata"))?; + + let mut iter = db.full_iterator_cf(&cf_meta, IteratorMode::End); + let last_id = iter + .next() + .and_then(|(key, _)| key[..8].try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + + last_id + 1 + }; + + Ok((db, next_id)) + }) + .await??; + + Ok(Store { + inner: Arc::new(InnerStore { + content: db, + config, + next_id: next_id.into(), + }), + }) + } + + pub async fn put, L>(&self, cid: Cid, blob: T, links: L) -> Result<()> + where + L: IntoIterator, + { + let id = self.next_id(); + + let id_bytes = id.to_be_bytes(); + let metadata = Versioned(MetadataV0 { + codec: cid.codec(), + multihash: cid.hash().to_bytes(), + }); + let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is 64 bytes the write amount of scratch space? + let multihash = &metadata.0.multihash; + + let children = self.ensure_id_many(links.into_iter()).await?; + + let graph = Versioned(GraphV0 { children }); + let graph_bytes = rkyv::to_bytes::<_, 1024>(&graph)?; // TODO: is 64 bytes the write amount of scratch space? + + let cf_id = self + .inner + .content + .cf_handle(CF_ID_V0) + .ok_or_else(|| eyre::eyre!("missing column family: id"))?; + let cf_blobs = self + .inner + .content + .cf_handle(CF_BLOBS_V0) + .ok_or_else(|| eyre::eyre!("missing column family: blobs"))?; + let cf_meta = self + .inner + .content + .cf_handle(CF_METADATA_V0) + .ok_or_else(|| eyre::eyre!("missing column family: metadata"))?; + let cf_graph = self + .inner + .content + .cf_handle(CF_GRAPH_V0) + .ok_or_else(|| eyre::eyre!("missing column family: metadata"))?; + + let mut batch = WriteBatch::default(); + batch.put_cf(cf_id, multihash, &id_bytes); + batch.put_cf(cf_blobs, &id_bytes, blob); + batch.put_cf(cf_meta, &id_bytes, metadata_bytes); + batch.put_cf(cf_graph, &id_bytes, graph_bytes); + self.inner.content.write(batch)?; + + Ok(()) + } + + pub async fn get(&self, cid: &Cid) -> Result>> { + match self.get_id(cid).await? { + Some(id) => { + let maybe_blob = self.get_by_id(id).await?; + Ok(maybe_blob) + } + None => Ok(None), + } + } + + pub async fn get_links(&self, cid: &Cid) -> Result>> { + match self.get_id(cid).await? { + Some(id) => { + let maybe_links = self.get_links_by_id(id).await?; + Ok(maybe_links) + } + None => Ok(None), + } + } + + async fn get_id(&self, cid: &Cid) -> Result> { + let cf_id = self + .inner + .content + .cf_handle(CF_ID_V0) + .ok_or_else(|| eyre::eyre!("missing column family: id"))?; + let multihash = cid.hash().to_bytes(); + let maybe_id_bytes = self.inner.content.get_pinned_cf(cf_id, multihash)?; + match maybe_id_bytes { + Some(bytes) => { + let arr = bytes[..8].try_into().map_err(|e| eyre::eyre!("{:?}", e))?; + Ok(Some(u64::from_be_bytes(arr))) + } + None => Ok(None), + } + } + + async fn get_by_id(&self, id: u64) -> Result>> { + let cf_blobs = self + .inner + .content + .cf_handle(CF_BLOBS_V0) + .ok_or_else(|| eyre::eyre!("missing column family: blobs"))?; + let maybe_blob = self + .inner + .content + .get_pinned_cf(cf_blobs, id.to_be_bytes())?; + + Ok(maybe_blob) + } + + async fn get_links_by_id(&self, id: u64) -> Result>> { + let cf_graph = self + .inner + .content + .cf_handle(CF_GRAPH_V0) + .ok_or_else(|| eyre::eyre!("missing column family: graph"))?; + let id_bytes = id.to_be_bytes(); + // FIXME: can't use pinned because otherwise this can trigger alignment issues :/ + match self.inner.content.get_cf(cf_graph, &id_bytes)? { + Some(links_id) => { + let cf_meta = self + .inner + .content + .cf_handle(CF_METADATA_V0) + .ok_or_else(|| eyre::eyre!("missing column family: metadata"))?; + + let graph = rkyv::check_archived_root::>(&links_id) + .map_err(|e| eyre::eyre!("{:?}", e))?; + let keys = graph + .0 + .children + .iter() + .map(|id| (&cf_meta, id.to_be_bytes())); + let meta = self.inner.content.multi_get_cf(keys); + let mut links = Vec::with_capacity(meta.len()); + for (i, meta) in meta.into_iter().enumerate() { + match meta? { + Some(meta) => { + let meta = rkyv::check_archived_root::>(&meta) + .map_err(|e| eyre::eyre!("{:?}", e))?; + let multihash = + cid::multihash::Multihash::from_bytes(&meta.0.multihash)?; + let c = cid::Cid::new_v1(meta.0.codec, multihash); + links.push(c); + } + None => { + bail!("invalid link: {}", graph.0.children[i]); + } + } + } + Ok(Some(links)) + } + None => Ok(None), + } + } + + /// Takes a list of cids and gives them ids, which are boths stored and then returned. + async fn ensure_id_many(&self, cids: I) -> Result> + where + I: IntoIterator, + { + let cf_id = self + .inner + .content + .cf_handle(CF_ID_V0) + .ok_or_else(|| eyre::eyre!("missing column family: id"))?; + + let cf_meta = self + .inner + .content + .cf_handle(CF_METADATA_V0) + .ok_or_else(|| eyre::eyre!("missing column family: metadata"))?; + + let mut ids = Vec::new(); + let mut batch = WriteBatch::default(); + for cid in cids { + let id = self.next_id(); + let id_bytes = id.to_be_bytes(); + + let metadata = Versioned(MetadataV0 { + codec: cid.codec(), + multihash: cid.hash().to_bytes(), + }); + let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is 64 bytes the write amount of scratch space? + + let multihash = &metadata.0.multihash; + // TODO: is it worth to check for existence instead of just writing? + batch.put_cf(&cf_id, multihash, &id_bytes); + batch.put_cf(&cf_meta, &id_bytes, metadata_bytes); + ids.push(id); + } + self.inner.content.write(batch)?; + + Ok(ids) + } + + fn next_id(&self) -> u64 { + let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); + // TODO: better handling + assert!(id > 0, "this store is full"); + id + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use cid::multihash::{Code, MultihashDigest}; + const RAW: u64 = 0x55; + + #[tokio::test] + async fn test_basics() { + let dir = tempfile::tempdir().unwrap(); + let config = Config { + path: dir.path().into(), + }; + + let store = Store::create(config).await.unwrap(); + + let mut values = Vec::new(); + + for i in 0..100 { + let data = vec![i as u8; i * 16]; + let hash = Code::Sha2_256.digest(&data); + let c = cid::Cid::new_v1(RAW, hash); + + let link_hash = Code::Sha2_256.digest(&[(i + 1) as u8; 64]); + let link = cid::Cid::new_v1(RAW, link_hash); + + let links = [link]; + + store.put(c, &data, links).await.unwrap(); + values.push((c, data, links)); + } + + for (i, (c, expected_data, expected_links)) in values.iter().enumerate() { + dbg!(i); + let data = store.get(c).await.unwrap().unwrap(); + assert_eq!(expected_data, &data[..]); + + let links = store.get_links(c).await.unwrap().unwrap(); + assert_eq!(expected_links, &links[..]); + } + } + + #[tokio::test] + async fn test_reopen() { + let dir = tempfile::tempdir().unwrap(); + let config = Config { + path: dir.path().into(), + }; + + let store = Store::create(config.clone()).await.unwrap(); + + let mut values = Vec::new(); + + for i in 0..100 { + let data = vec![i as u8; i * 16]; + let hash = Code::Sha2_256.digest(&data); + let c = cid::Cid::new_v1(RAW, hash); + + let link_hash = Code::Sha2_256.digest(&[(i + 1) as u8; 64]); + let link = cid::Cid::new_v1(RAW, link_hash); + + let links = [link]; + + store.put(c, &data, links).await.unwrap(); + values.push((c, data, links)); + } + + for (c, expected_data, expected_links) in values.iter() { + let data = store.get(c).await.unwrap().unwrap(); + assert_eq!(expected_data, &data[..]); + + let links = store.get_links(c).await.unwrap().unwrap(); + assert_eq!(expected_links, &links[..]); + } + + drop(store); + + let store = Store::open(config).await.unwrap(); + for (c, expected_data, expected_links) in values.iter() { + let data = store.get(c).await.unwrap().unwrap(); + assert_eq!(expected_data, &data[..]); + + let links = store.get_links(c).await.unwrap().unwrap(); + assert_eq!(expected_links, &links[..]); + } + + for i in 100..200 { + let data = vec![i as u8; i * 16]; + let hash = Code::Sha2_256.digest(&data); + let c = cid::Cid::new_v1(RAW, hash); + + let link_hash = Code::Sha2_256.digest(&[(i + 1) as u8; 64]); + let link = cid::Cid::new_v1(RAW, link_hash); + + let links = [link]; + + store.put(c, &data, links).await.unwrap(); + values.push((c, data, links)); + } + + for (c, expected_data, expected_links) in values.iter() { + let data = store.get(c).await.unwrap().unwrap(); + assert_eq!(expected_data, &data[..]); + + let links = store.get_links(c).await.unwrap().unwrap(); + assert_eq!(expected_links, &links[..]); + } + } +}