Skip to content

Commit

Permalink
feat: use the store to cache received data
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored May 19, 2022
1 parent 3f724a5 commit 9dac53a
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 28 deletions.
119 changes: 114 additions & 5 deletions iroh-resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use std::str::FromStr;

use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use cid::multihash::{Code, MultihashDigest};
use cid::Cid;
use iroh_rpc_client::Client;
use libipld::codec::Encode;
use libipld::codec::{Decode, Encode};
use libipld::prelude::Codec as _;
use libipld::{Ipld, IpldCodec};
use tracing::{debug, trace, warn};

use crate::codecs::Codec;
use crate::unixfs::{DataType, UnixfsNode};
Expand Down Expand Up @@ -288,10 +290,74 @@ impl Resolver {
/// Loads the actual content of a given cid.
///
/// Resolution order:
/// 1. the local store (fast path),
/// 2. bitswap from discovered providers, followed by hash verification.
///
/// A successfully fetched block is written back to the store in a detached
/// background task so subsequent loads hit the fast path; the caller does
/// not wait for (or observe failures of) that write-back.
#[tracing::instrument(skip(self))]
async fn load_cid(&self, cid: &Cid) -> Result<Bytes> {
    trace!("loading cid");
    // TODO: better strategy

    let cid = *cid;

    // Fast path: check the local store first. A store error is logged but
    // not fatal — we fall through to the network.
    match self.rpc.store.get(cid).await {
        Ok(Some(data)) => {
            trace!("retrieved from store");
            return Ok(data);
        }
        Ok(None) => {}
        Err(err) => {
            warn!("failed to fetch data from store {}: {:?}", cid, err);
        }
    }

    let providers = self.rpc.p2p.fetch_providers(&cid).await?;
    let bytes = self.rpc.p2p.fetch_bitswap(cid, Some(providers)).await?;

    // TODO: is this the right place?
    // Verify that the received bytes actually hash to the requested cid.
    // Hashing is CPU bound, so it runs on the blocking thread pool.
    match Code::try_from(cid.hash().code()) {
        Ok(code) => {
            let bytes = bytes.clone();
            tokio::task::spawn_blocking(move || {
                let calculated_hash = code.digest(&bytes);
                if &calculated_hash != cid.hash() {
                    bail!(
                        "invalid data returned {:?} != {:?}",
                        calculated_hash,
                        cid.hash()
                    );
                }
                Ok(())
            })
            .await??;
        }
        Err(_) => {
            // Unknown multihash code: we cannot verify, so only warn.
            warn!(
                "unable to verify hash, unknown hash function {} for {}",
                cid.hash().code(),
                cid
            );
        }
    }

    // Trigger storage in the background.
    let cloned = bytes.clone();
    let rpc = self.rpc.clone();
    tokio::spawn(async move {
        let clone2 = cloned.clone();
        // Link extraction decodes the whole block, which is CPU bound —
        // run it off the async executor. Decode failures are treated as
        // "no links" rather than aborting the write-back.
        let links =
            tokio::task::spawn_blocking(move || parse_links(&cid, &clone2).unwrap_or_default())
                .await
                .unwrap_or_default();

        let len = cloned.len();
        let links_len = links.len();
        match rpc.store.put(cid, cloned, links).await {
            Ok(_) => debug!("stored {} ({}bytes, {}links)", cid, len, links_len),
            Err(err) => {
                warn!("failed to store {}: {:?}", cid, err);
            }
        }
    });

    trace!("retrieved from p2p");

    Ok(bytes)
}

/// Resolves a dnslink at the given domain.
Expand All @@ -301,6 +367,22 @@ impl Resolver {
}
}

/// Decodes `bytes` according to the codec recorded in `cid` and collects
/// every IPLD link (child cid) referenced by the decoded node.
///
/// Fails for codecs that are unknown or that we cannot decode to `Ipld`
/// (anything other than dag-pb, dag-cbor, dag-json).
fn parse_links(cid: &Cid, bytes: &[u8]) -> Result<Vec<Cid>> {
    let ipld_codec = match Codec::try_from(cid.codec()).context("unknown codec")? {
        Codec::DagPb => IpldCodec::DagPb,
        Codec::DagCbor => IpldCodec::DagCbor,
        Codec::DagJson => IpldCodec::DagJson,
        codec => bail!("unsupported codec {:?}", codec),
    };

    let mut cursor = std::io::Cursor::new(bytes);
    let node: Ipld = Ipld::decode(ipld_codec, &mut cursor)?;

    let mut links = Vec::new();
    node.references(&mut links);
    Ok(links)
}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
Expand Down Expand Up @@ -353,10 +435,37 @@ mod tests {
let mut map = BTreeMap::new();
map.insert("name".to_string(), Ipld::String("Foo".to_string()));
map.insert("details".to_string(), Ipld::List(vec![Ipld::Integer(1)]));
map.insert(
"my-link".to_string(),
Ipld::Link(
"bafkreigh2akiscaildcqabsyg3dfr6chu3fgpregiymsck7e7aqa4s52zy"
.parse()
.unwrap(),
),
);

Ipld::Map(map)
}

/// Round-trips `make_ipld()` through each supported self-describing codec
/// and checks that `parse_links` recovers exactly the one embedded link.
#[test]
fn test_parse_links() {
    let expected = "bafkreigh2akiscaildcqabsyg3dfr6chu3fgpregiymsck7e7aqa4s52zy";

    for codec in [IpldCodec::DagCbor, IpldCodec::DagJson] {
        let ipld = make_ipld();

        let mut encoded = Vec::new();
        ipld.encode(codec, &mut encoded).unwrap();

        let digest = Code::Blake3_256.digest(&encoded);
        let cid = Cid::new_v1(codec.into(), digest);

        let links = parse_links(&cid, &encoded).unwrap();
        assert_eq!(links.len(), 1);
        assert_eq!(links[0].to_string(), expected);
    }
}

#[tokio::test]
async fn test_resolve_ipld() {
for codec in [IpldCodec::DagCbor, IpldCodec::DagJson] {
Expand Down
6 changes: 4 additions & 2 deletions iroh-rpc-client/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ impl P2pClient {
}

#[tracing::instrument(skip(self))]
pub async fn fetch_provider(&self, key: &[u8]) -> Result<HashSet<PeerId>> {
let req = iroh_metrics::req::trace_tonic_req(Key { key: key.into() });
pub async fn fetch_providers(&self, key: &Cid) -> Result<HashSet<PeerId>> {
let req = iroh_metrics::req::trace_tonic_req(Key {
key: key.to_bytes(),
});
let res = self.0.lock().await.fetch_provider(req).await?;
let mut providers = HashSet::new();
for provider in res.into_inner().providers.into_iter() {
Expand Down
40 changes: 19 additions & 21 deletions iroh-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::{
Arc,
};

use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use cid::Cid;
use iroh_rpc_client::Client as RpcClient;
use rocksdb::{DBPinnableSlice, IteratorMode, Options, WriteBatch, DB as RocksDb};
Expand Down Expand Up @@ -97,7 +97,7 @@ impl Store {
let next_id = {
let cf_meta = db
.cf_handle(CF_METADATA_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: metadata"))?;
.ok_or_else(|| anyhow!("missing column family: metadata"))?;

let mut iter = db.full_iterator_cf(&cf_meta, IteratorMode::End);
let last_id = iter
Expand All @@ -117,7 +117,7 @@ impl Store {
.await
// TODO: first conflict between `anyhow` & `anyhow`
// .map_err(|e| e.context("Error creating rpc client for store"))?;
.map_err(|e| anyhow::anyhow!("Error creating rpc client for store: {:?}", e))?;
.map_err(|e| anyhow!("Error creating rpc client for store: {:?}", e))?;

Ok(Store {
inner: Arc::new(InnerStore {
Expand All @@ -141,34 +141,34 @@ impl Store {
codec: cid.codec(),
multihash: cid.hash().to_bytes(),
});
let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is 64 bytes the write amount of scratch space?
let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is this the right amount of scratch space?
let multihash = &metadata.0.multihash;

let children = self.ensure_id_many(links.into_iter()).await?;

let graph = Versioned(GraphV0 { children });
let graph_bytes = rkyv::to_bytes::<_, 1024>(&graph)?; // TODO: is 64 bytes the write amount of scratch space?
let graph_bytes = rkyv::to_bytes::<_, 1024>(&graph)?; // TODO: is this the right amount of scratch space?

let cf_id = self
.inner
.content
.cf_handle(CF_ID_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: id"))?;
.ok_or_else(|| anyhow!("missing column family: id"))?;
let cf_blobs = self
.inner
.content
.cf_handle(CF_BLOBS_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: blobs"))?;
.ok_or_else(|| anyhow!("missing column family: blobs"))?;
let cf_meta = self
.inner
.content
.cf_handle(CF_METADATA_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: metadata"))?;
.ok_or_else(|| anyhow!("missing column family: metadata"))?;
let cf_graph = self
.inner
.content
.cf_handle(CF_GRAPH_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: metadata"))?;
.ok_or_else(|| anyhow!("missing column family: metadata"))?;

let mut batch = WriteBatch::default();
batch.put_cf(cf_id, multihash, &id_bytes);
Expand Down Expand Up @@ -207,14 +207,12 @@ impl Store {
.inner
.content
.cf_handle(CF_ID_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: id"))?;
.ok_or_else(|| anyhow!("missing column family: id"))?;
let multihash = cid.hash().to_bytes();
let maybe_id_bytes = self.inner.content.get_pinned_cf(cf_id, multihash)?;
match maybe_id_bytes {
Some(bytes) => {
let arr = bytes[..8]
.try_into()
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
let arr = bytes[..8].try_into().map_err(|e| anyhow!("{:?}", e))?;
Ok(Some(u64::from_be_bytes(arr)))
}
None => Ok(None),
Expand All @@ -226,7 +224,7 @@ impl Store {
.inner
.content
.cf_handle(CF_BLOBS_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: blobs"))?;
.ok_or_else(|| anyhow!("missing column family: blobs"))?;
let maybe_blob = self
.inner
.content
Expand All @@ -240,7 +238,7 @@ impl Store {
.inner
.content
.cf_handle(CF_GRAPH_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: graph"))?;
.ok_or_else(|| anyhow!("missing column family: graph"))?;
let id_bytes = id.to_be_bytes();
// FIXME: can't use pinned because otherwise this can trigger alignment issues :/
match self.inner.content.get_cf(cf_graph, &id_bytes)? {
Expand All @@ -249,10 +247,10 @@ impl Store {
.inner
.content
.cf_handle(CF_METADATA_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: metadata"))?;
.ok_or_else(|| anyhow!("missing column family: metadata"))?;

let graph = rkyv::check_archived_root::<Versioned<GraphV0>>(&links_id)
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
.map_err(|e| anyhow!("{:?}", e))?;
let keys = graph
.0
.children
Expand All @@ -264,7 +262,7 @@ impl Store {
match meta? {
Some(meta) => {
let meta = rkyv::check_archived_root::<Versioned<MetadataV0>>(&meta)
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
.map_err(|e| anyhow!("{:?}", e))?;
let multihash =
cid::multihash::Multihash::from_bytes(&meta.0.multihash)?;
let c = cid::Cid::new_v1(meta.0.codec, multihash);
Expand All @@ -290,13 +288,13 @@ impl Store {
.inner
.content
.cf_handle(CF_ID_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: id"))?;
.ok_or_else(|| anyhow!("missing column family: id"))?;

let cf_meta = self
.inner
.content
.cf_handle(CF_METADATA_V0)
.ok_or_else(|| anyhow::anyhow!("missing column family: metadata"))?;
.ok_or_else(|| anyhow!("missing column family: metadata"))?;

let mut ids = Vec::new();
let mut batch = WriteBatch::default();
Expand All @@ -308,7 +306,7 @@ impl Store {
codec: cid.codec(),
multihash: cid.hash().to_bytes(),
});
let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is 64 bytes the write amount of scratch space?
let metadata_bytes = rkyv::to_bytes::<_, 1024>(&metadata)?; // TODO: is this the right amount of scratch space?

let multihash = &metadata.0.multihash;
// TODO: is it worth to check for existence instead of just writing?
Expand Down

0 comments on commit 9dac53a

Please sign in to comment.