Skip to content

Commit

Permalink
feat: add iroh-car
Browse files Browse the repository at this point in the history
Implements v1 car file support for iroh.

Closes #9
  • Loading branch information
dignifiedquire committed May 3, 2022
1 parent 8b9c172 commit 52cfa39
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"iroh",
"iroh-bitswap",
"iroh-car",
"iroh-rpc",
"iroh-gateway",
"iroh-metrics",
Expand Down
24 changes: 24 additions & 0 deletions iroh-car/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "iroh-car"
version = "0.1.0"
edition = "2021"
authors = ["Friedel Ziegelmayer <[email protected]>"]
license = "Apache-2.0/MIT"
repository = "https://github.com/n0-computer/iroh"
description = "Implementation the car files for iroh"

[dependencies]
cid = "0.8"
ipld = { package = "libipld", version = "0.13"}
ipld-cbor = { package = "libipld-cbor", version = "0.13" }
thiserror = "1.0"
futures = "0.3.5"
integer-encoding = { version = "3.0", features = ["tokio_async"] }
multihash = "0.16"
tokio = { version = "1.0.1", features = ["io-util"] }

[dev-dependencies]
tokio = { version = "1.0.1", features = ["macros", "sync", "rt", "fs", "io-util"] }
async-channel = "1.6.1"

[features]
21 changes: 21 additions & 0 deletions iroh-car/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# iroh-car

> [CAR file](https://ipld.io/specs/transport/car/) support for iroh.
Currently supports only [v1](https://ipld.io/specs/transport/car/carv1/).

## License

<sup>
Licensed under either of <a href="LICENSE-APACHE">Apache License, Version
2.0</a> or <a href="LICENSE-MIT">MIT license</a> at your option.
</sup>

<br/>

<sub>
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in this crate by you, as defined in the Apache-2.0 license, shall
be dual licensed as above, without any additional terms or conditions.
</sub>

26 changes: 26 additions & 0 deletions iroh-car/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use thiserror::Error;

/// Car utility error
#[derive(Debug, Error)]
pub enum Error {
#[error("Failed to parse CAR file: {0}")]
Parsing(String),
#[error("Invalid CAR file: {0}")]
InvalidFile(String),
#[error("Io error: {0}")]
Io(#[from] std::io::Error),
#[error("Cbor encoding error: {0}")]
Cbor(#[from] ipld::error::Error),
}

impl From<cid::Error> for Error {
fn from(err: cid::Error) -> Error {
Error::Parsing(err.to_string())
}
}

impl From<cid::multihash::Error> for Error {
fn from(err: cid::multihash::Error) -> Error {
Error::Parsing(err.to_string())
}
}
100 changes: 100 additions & 0 deletions iroh-car/src/header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use cid::Cid;
use ipld::codec::Codec;
use ipld_cbor::DagCborCodec;

use crate::error::Error;

/// A car header.
#[derive(Debug)]
#[non_exhaustive]
pub enum CarHeader {
V1(CarHeaderV1),
}

impl CarHeader {
pub fn decode(buffer: &[u8]) -> Result<Self, Error> {
let header: CarHeaderV1 = DagCborCodec
.decode(buffer)
.map_err(|e| Error::Parsing(e.to_string()))?;

if header.roots.is_empty() {
return Err(Error::Parsing("empty CAR file".to_owned()));
}

if header.version != 1 {
return Err(Error::InvalidFile(
"Only CAR file version 1 is supported".to_string(),
));
}

Ok(CarHeader::V1(header))
}

pub fn encode(&self) -> Result<Vec<u8>, Error> {
match self {
CarHeader::V1(ref header) => {
let res = DagCborCodec.encode(header)?;
Ok(res)
}
}
}

pub fn roots(&self) -> &[Cid] {
match self {
CarHeader::V1(header) => &header.roots,
}
}

pub fn version(&self) -> u64 {
match self {
CarHeader::V1(_) => 1,
}
}
}

/// CAR file header version 1.
#[derive(Debug, Default, ipld::DagCbor, PartialEq)]
pub struct CarHeaderV1 {
#[ipld]
pub roots: Vec<Cid>,
#[ipld]
pub version: u64,
}

impl CarHeaderV1 {
/// Creates a new CAR file header
pub fn new(roots: Vec<Cid>, version: u64) -> Self {
Self { roots, version }
}
}

impl From<Vec<Cid>> for CarHeaderV1 {
fn from(roots: Vec<Cid>) -> Self {
Self { roots, version: 1 }
}
}

#[cfg(test)]
mod tests {
use ipld::codec::{Decode, Encode};
use ipld_cbor::DagCborCodec;
use multihash::MultihashDigest;

use super::*;

#[test]
fn symmetric_header_v1() {
let digest = multihash::Code::Blake2b256.digest(b"test");
let cid = Cid::new_v1(DagCborCodec.into(), digest);

let header = CarHeaderV1::from(vec![cid]);

let mut bytes = Vec::new();
header.encode(DagCborCodec, &mut bytes).unwrap();

assert_eq!(
CarHeaderV1::decode(DagCborCodec, &mut std::io::Cursor::new(&bytes)).unwrap(),
header
);
}
}
19 changes: 19 additions & 0 deletions iroh-car/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//! Implementation of the [car](https://ipld.io/specs/transport/car/) format.
mod error;
mod header;
mod reader;
mod util;
mod writer;

pub use crate::header::CarHeader;
pub use crate::reader::CarReader;
pub use crate::writer::CarWriter;

/// IPLD Block
// TODO: find a better home for this
#[derive(Clone, Debug)]
pub struct Block {
pub cid: cid::Cid,
pub data: Vec<u8>,
}
98 changes: 98 additions & 0 deletions iroh-car/src/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use futures::Stream;
use tokio::io::AsyncRead;

use crate::{
error::Error,
header::CarHeader,
util::{ld_read, read_node},
Block,
};

/// Reads CAR files that are in a BufReader
pub struct CarReader<R> {
reader: R,
header: CarHeader,
buffer: Vec<u8>,
}

impl<R> CarReader<R>
where
R: AsyncRead + Send + Unpin,
{
/// Creates a new CarReader and parses the CarHeader
pub async fn new(mut reader: R) -> Result<Self, Error> {
let mut buffer = Vec::new();

if !ld_read(&mut reader, &mut buffer).await? {
return Err(Error::Parsing(
"failed to parse uvarint for header".to_string(),
));
}

let header = CarHeader::decode(&buffer)?;

Ok(CarReader {
reader,
header,
buffer,
})
}

/// Returns the header of this car file.
pub fn header(&self) -> &CarHeader {
&self.header
}

/// Returns the next IPLD Block in the buffer
pub async fn next_block(&mut self) -> Result<Option<Block>, Error> {
read_node(&mut self.reader, &mut self.buffer).await
}

pub fn stream(self) -> impl Stream<Item = Result<Block, Error>> {
futures::stream::try_unfold(self, |mut this| async move {
let maybe_block = read_node(&mut this.reader, &mut this.buffer).await?;
Ok(maybe_block.map(|b| (b, this)))
})
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;

use cid::Cid;
use futures::TryStreamExt;
use ipld_cbor::DagCborCodec;
use multihash::MultihashDigest;

use crate::{header::CarHeaderV1, writer::CarWriter};

use super::*;

#[tokio::test]
async fn car_write_read() {
let digest_test = multihash::Code::Blake2b256.digest(b"test");
let cid_test = Cid::new_v1(DagCborCodec.into(), digest_test);

let digest_foo = multihash::Code::Blake2b256.digest(b"foo");
let cid_foo = Cid::new_v1(DagCborCodec.into(), digest_foo);

let header = CarHeader::V1(CarHeaderV1::from(vec![cid_foo]));

let mut buffer = Vec::new();
let mut writer = CarWriter::new(header, &mut buffer);
writer.write(cid_test, b"test").await.unwrap();
writer.write(cid_foo, b"foo").await.unwrap();
writer.finish().await.unwrap();

let reader = Cursor::new(&buffer);
let car_reader = CarReader::new(reader).await.unwrap();
let files: Vec<Block> = car_reader.stream().try_collect().await.unwrap();

assert_eq!(files.len(), 2);
assert_eq!(files[0].cid, cid_test);
assert_eq!(files[0].data, b"test");
assert_eq!(files[1].cid, cid_foo);
assert_eq!(files[1].data, b"foo");
}
}
78 changes: 78 additions & 0 deletions iroh-car/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use cid::Cid;
use integer_encoding::VarIntAsyncReader;
use tokio::io::{AsyncRead, AsyncReadExt};

use super::error::Error;
use crate::Block;

pub(crate) async fn ld_read<R>(mut reader: R, buf: &mut Vec<u8>) -> Result<bool, Error>
where
R: AsyncRead + Send + Unpin,
{
let l: usize = match VarIntAsyncReader::read_varint_async(&mut reader).await {
Ok(len) => len,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return Ok(false);
}
return Err(Error::Parsing(e.to_string()));
}
};

buf.clear();
reader
.take(l as u64)
.read_to_end(buf)
.await
.map_err(|e| Error::Parsing(e.to_string()))?;
Ok(true)
}

pub(crate) async fn read_node<R>(
buf_reader: &mut R,
buf: &mut Vec<u8>,
) -> Result<Option<Block>, Error>
where
R: AsyncRead + Send + Unpin,
{
if ld_read(buf_reader, buf).await? {
let mut cursor = std::io::Cursor::new(&buf);
let c = Cid::read_bytes(&mut cursor)?;
let pos = cursor.position() as usize;

return Ok(Some(Block {
cid: c,
data: buf[pos..].to_vec(),
}));
}
Ok(None)
}

#[cfg(test)]
mod tests {
use integer_encoding::VarIntAsyncWriter;
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::*;

async fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error>
where
W: AsyncWrite + Send + Unpin,
{
writer.write_varint_async(bytes.len()).await?;
writer.write_all(bytes).await?;
writer.flush().await?;
Ok(())
}

#[tokio::test]
async fn ld_read_write() {
let mut buffer = Vec::<u8>::new();
ld_write(&mut buffer, b"test bytes").await.unwrap();
let reader = std::io::Cursor::new(buffer);
let mut buffer = Vec::new();
let read = ld_read(reader, &mut buffer).await.unwrap();
assert!(read);
assert_eq!(&buffer, b"test bytes");
}
}
Loading

0 comments on commit 52cfa39

Please sign in to comment.