diff --git a/Cargo.toml b/Cargo.toml index 1bfe57f7b8..536019652c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "iroh", "iroh-bitswap", + "iroh-car", "iroh-rpc", "iroh-gateway", "iroh-metrics", diff --git a/iroh-car/Cargo.toml b/iroh-car/Cargo.toml new file mode 100644 index 0000000000..978d3074f2 --- /dev/null +++ b/iroh-car/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "iroh-car" +version = "0.1.0" +edition = "2021" +authors = ["Friedel Ziegelmayer "] +license = "Apache-2.0/MIT" +repository = "https://github.com/n0-computer/iroh" +description = "Implementation the car files for iroh" + +[dependencies] +cid = "0.8" +ipld = { package = "libipld", version = "0.13"} +ipld-cbor = { package = "libipld-cbor", version = "0.13" } +thiserror = "1.0" +futures = "0.3.5" +integer-encoding = { version = "3.0", features = ["tokio_async"] } +multihash = "0.16" +tokio = { version = "1.0.1", features = ["io-util"] } + +[dev-dependencies] +tokio = { version = "1.0.1", features = ["macros", "sync", "rt", "fs", "io-util"] } +async-channel = "1.6.1" + +[features] diff --git a/iroh-car/README.md b/iroh-car/README.md new file mode 100644 index 0000000000..a125fcf256 --- /dev/null +++ b/iroh-car/README.md @@ -0,0 +1,21 @@ +# iroh-car + +> [CAR file](https://ipld.io/specs/transport/car/) support for iroh. + +Currently supports only [v1](https://ipld.io/specs/transport/car/carv1/). + +## License + + +Licensed under either of Apache License, Version +2.0 or MIT license at your option. + + +
+ + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in this crate by you, as defined in the Apache-2.0 license, shall +be dual licensed as above, without any additional terms or conditions. + + diff --git a/iroh-car/src/error.rs b/iroh-car/src/error.rs new file mode 100644 index 0000000000..524bdd2924 --- /dev/null +++ b/iroh-car/src/error.rs @@ -0,0 +1,26 @@ +use thiserror::Error; + +/// Car utility error +#[derive(Debug, Error)] +pub enum Error { + #[error("Failed to parse CAR file: {0}")] + Parsing(String), + #[error("Invalid CAR file: {0}")] + InvalidFile(String), + #[error("Io error: {0}")] + Io(#[from] std::io::Error), + #[error("Cbor encoding error: {0}")] + Cbor(#[from] ipld::error::Error), +} + +impl From for Error { + fn from(err: cid::Error) -> Error { + Error::Parsing(err.to_string()) + } +} + +impl From for Error { + fn from(err: cid::multihash::Error) -> Error { + Error::Parsing(err.to_string()) + } +} diff --git a/iroh-car/src/header.rs b/iroh-car/src/header.rs new file mode 100644 index 0000000000..21a582783f --- /dev/null +++ b/iroh-car/src/header.rs @@ -0,0 +1,100 @@ +use cid::Cid; +use ipld::codec::Codec; +use ipld_cbor::DagCborCodec; + +use crate::error::Error; + +/// A car header. +#[derive(Debug)] +#[non_exhaustive] +pub enum CarHeader { + V1(CarHeaderV1), +} + +impl CarHeader { + pub fn decode(buffer: &[u8]) -> Result { + let header: CarHeaderV1 = DagCborCodec + .decode(buffer) + .map_err(|e| Error::Parsing(e.to_string()))?; + + if header.roots.is_empty() { + return Err(Error::Parsing("empty CAR file".to_owned())); + } + + if header.version != 1 { + return Err(Error::InvalidFile( + "Only CAR file version 1 is supported".to_string(), + )); + } + + Ok(CarHeader::V1(header)) + } + + pub fn encode(&self) -> Result, Error> { + match self { + CarHeader::V1(ref header) => { + let res = DagCborCodec.encode(header)?; + Ok(res) + } + } + } + + pub fn roots(&self) -> &[Cid] { + match self { + CarHeader::V1(header) => &header.roots, + } + } + + pub fn version(&self) -> u64 { + match self { + CarHeader::V1(_) => 1, + } + } +} + +/// CAR file header version 1. +#[derive(Debug, Default, ipld::DagCbor, PartialEq)] +pub struct CarHeaderV1 { + #[ipld] + pub roots: Vec, + #[ipld] + pub version: u64, +} + +impl CarHeaderV1 { + /// Creates a new CAR file header + pub fn new(roots: Vec, version: u64) -> Self { + Self { roots, version } + } +} + +impl From> for CarHeaderV1 { + fn from(roots: Vec) -> Self { + Self { roots, version: 1 } + } +} + +#[cfg(test)] +mod tests { + use ipld::codec::{Decode, Encode}; + use ipld_cbor::DagCborCodec; + use multihash::MultihashDigest; + + use super::*; + + #[test] + fn symmetric_header_v1() { + let digest = multihash::Code::Blake2b256.digest(b"test"); + let cid = Cid::new_v1(DagCborCodec.into(), digest); + + let header = CarHeaderV1::from(vec![cid]); + + let mut bytes = Vec::new(); + header.encode(DagCborCodec, &mut bytes).unwrap(); + + assert_eq!( + CarHeaderV1::decode(DagCborCodec, &mut std::io::Cursor::new(&bytes)).unwrap(), + header + ); + } +} diff --git a/iroh-car/src/lib.rs b/iroh-car/src/lib.rs new file mode 100644 index 0000000000..b28867480a --- /dev/null +++ b/iroh-car/src/lib.rs @@ -0,0 +1,19 @@ +//! Implementation of the [car](https://ipld.io/specs/transport/car/) format. + +mod error; +mod header; +mod reader; +mod util; +mod writer; + +pub use crate::header::CarHeader; +pub use crate::reader::CarReader; +pub use crate::writer::CarWriter; + +/// IPLD Block +// TODO: find a better home for this +#[derive(Clone, Debug)] +pub struct Block { + pub cid: cid::Cid, + pub data: Vec, +} diff --git a/iroh-car/src/reader.rs b/iroh-car/src/reader.rs new file mode 100644 index 0000000000..9ae6bccc0b --- /dev/null +++ b/iroh-car/src/reader.rs @@ -0,0 +1,98 @@ +use futures::Stream; +use tokio::io::AsyncRead; + +use crate::{ + error::Error, + header::CarHeader, + util::{ld_read, read_node}, + Block, +}; + +/// Reads CAR files that are in a BufReader +pub struct CarReader { + reader: R, + header: CarHeader, + buffer: Vec, +} + +impl CarReader +where + R: AsyncRead + Send + Unpin, +{ + /// Creates a new CarReader and parses the CarHeader + pub async fn new(mut reader: R) -> Result { + let mut buffer = Vec::new(); + + if !ld_read(&mut reader, &mut buffer).await? { + return Err(Error::Parsing( + "failed to parse uvarint for header".to_string(), + )); + } + + let header = CarHeader::decode(&buffer)?; + + Ok(CarReader { + reader, + header, + buffer, + }) + } + + /// Returns the header of this car file. + pub fn header(&self) -> &CarHeader { + &self.header + } + + /// Returns the next IPLD Block in the buffer + pub async fn next_block(&mut self) -> Result, Error> { + read_node(&mut self.reader, &mut self.buffer).await + } + + pub fn stream(self) -> impl Stream> { + futures::stream::try_unfold(self, |mut this| async move { + let maybe_block = read_node(&mut this.reader, &mut this.buffer).await?; + Ok(maybe_block.map(|b| (b, this))) + }) + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use cid::Cid; + use futures::TryStreamExt; + use ipld_cbor::DagCborCodec; + use multihash::MultihashDigest; + + use crate::{header::CarHeaderV1, writer::CarWriter}; + + use super::*; + + #[tokio::test] + async fn car_write_read() { + let digest_test = multihash::Code::Blake2b256.digest(b"test"); + let cid_test = Cid::new_v1(DagCborCodec.into(), digest_test); + + let digest_foo = multihash::Code::Blake2b256.digest(b"foo"); + let cid_foo = Cid::new_v1(DagCborCodec.into(), digest_foo); + + let header = CarHeader::V1(CarHeaderV1::from(vec![cid_foo])); + + let mut buffer = Vec::new(); + let mut writer = CarWriter::new(header, &mut buffer); + writer.write(cid_test, b"test").await.unwrap(); + writer.write(cid_foo, b"foo").await.unwrap(); + writer.finish().await.unwrap(); + + let reader = Cursor::new(&buffer); + let car_reader = CarReader::new(reader).await.unwrap(); + let files: Vec = car_reader.stream().try_collect().await.unwrap(); + + assert_eq!(files.len(), 2); + assert_eq!(files[0].cid, cid_test); + assert_eq!(files[0].data, b"test"); + assert_eq!(files[1].cid, cid_foo); + assert_eq!(files[1].data, b"foo"); + } +} diff --git a/iroh-car/src/util.rs b/iroh-car/src/util.rs new file mode 100644 index 0000000000..f2901832be --- /dev/null +++ b/iroh-car/src/util.rs @@ -0,0 +1,78 @@ +use cid::Cid; +use integer_encoding::VarIntAsyncReader; +use tokio::io::{AsyncRead, AsyncReadExt}; + +use super::error::Error; +use crate::Block; + +pub(crate) async fn ld_read(mut reader: R, buf: &mut Vec) -> Result +where + R: AsyncRead + Send + Unpin, +{ + let l: usize = match VarIntAsyncReader::read_varint_async(&mut reader).await { + Ok(len) => len, + Err(e) => { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + return Ok(false); + } + return Err(Error::Parsing(e.to_string())); + } + }; + + buf.clear(); + reader + .take(l as u64) + .read_to_end(buf) + .await + .map_err(|e| Error::Parsing(e.to_string()))?; + Ok(true) +} + +pub(crate) async fn read_node( + buf_reader: &mut R, + buf: &mut Vec, +) -> Result, Error> +where + R: AsyncRead + Send + Unpin, +{ + if ld_read(buf_reader, buf).await? { + let mut cursor = std::io::Cursor::new(&buf); + let c = Cid::read_bytes(&mut cursor)?; + let pos = cursor.position() as usize; + + return Ok(Some(Block { + cid: c, + data: buf[pos..].to_vec(), + })); + } + Ok(None) +} + +#[cfg(test)] +mod tests { + use integer_encoding::VarIntAsyncWriter; + use tokio::io::{AsyncWrite, AsyncWriteExt}; + + use super::*; + + async fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error> + where + W: AsyncWrite + Send + Unpin, + { + writer.write_varint_async(bytes.len()).await?; + writer.write_all(bytes).await?; + writer.flush().await?; + Ok(()) + } + + #[tokio::test] + async fn ld_read_write() { + let mut buffer = Vec::::new(); + ld_write(&mut buffer, b"test bytes").await.unwrap(); + let reader = std::io::Cursor::new(buffer); + let mut buffer = Vec::new(); + let read = ld_read(reader, &mut buffer).await.unwrap(); + assert!(read); + assert_eq!(&buffer, b"test bytes"); + } +} diff --git a/iroh-car/src/writer.rs b/iroh-car/src/writer.rs new file mode 100644 index 0000000000..9f17eb9e93 --- /dev/null +++ b/iroh-car/src/writer.rs @@ -0,0 +1,71 @@ +use cid::Cid; +use integer_encoding::VarIntAsyncWriter; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use crate::{error::Error, header::CarHeader}; + +#[derive(Debug)] +pub struct CarWriter { + header: CarHeader, + writer: W, + cid_buffer: Vec, + is_header_written: bool, +} + +impl CarWriter +where + W: AsyncWrite + Send + Unpin, +{ + pub fn new(header: CarHeader, writer: W) -> Self { + CarWriter { + header, + writer, + cid_buffer: Vec::new(), + is_header_written: false, + } + } + + /// Writes header and stream of data to writer in Car format. + pub async fn write(&mut self, cid: Cid, data: T) -> Result<(), Error> + where + T: AsRef<[u8]>, + { + if !self.is_header_written { + // Write header bytes + let header_bytes = self.header.encode()?; + self.writer.write_varint_async(header_bytes.len()).await?; + self.writer.write_all(&header_bytes).await?; + self.is_header_written = true; + } + + // Write the given block. + self.cid_buffer.clear(); + cid.write_bytes(&mut self.cid_buffer).expect("vec write"); + + let data = data.as_ref(); + let len = self.cid_buffer.len() + data.len(); + + self.writer.write_varint_async(len).await?; + self.writer.write_all(&self.cid_buffer).await?; + self.writer.write_all(data).await?; + + Ok(()) + } + + /// Finishes writing, including flushing and returns the writer. + pub async fn finish(mut self) -> Result { + self.flush().await?; + Ok(self.writer) + } + + /// Flushes the underlying writer. + pub async fn flush(&mut self) -> Result<(), Error> { + self.writer.flush().await?; + Ok(()) + } + + /// Consumes the [`CarWriter`] and returns the underlying writer. + pub fn into_inner(self) -> W { + self.writer + } +} diff --git a/iroh-car/tests/car_file_test.rs b/iroh-car/tests/car_file_test.rs new file mode 100644 index 0000000000..c32a65da2a --- /dev/null +++ b/iroh-car/tests/car_file_test.rs @@ -0,0 +1,52 @@ +use futures::TryStreamExt; +use iroh_car::*; +use tokio::fs::File; +use tokio::io::BufReader; + +#[tokio::test] +async fn read_carv1_test_file() { + let file = File::open("tests/testv1.car").await.unwrap(); + let buf_reader = BufReader::new(file); + + let car_reader = CarReader::new(buf_reader).await.unwrap(); + let files: Vec<_> = car_reader.stream().try_collect().await.unwrap(); + assert_eq!(files.len(), 35); +} + +#[tokio::test] +async fn read_carv1_basic_fixtures_file() { + let file = File::open("tests/carv1_basic.car").await.unwrap(); + let buf_reader = BufReader::new(file); + + let car_reader = CarReader::new(buf_reader).await.unwrap(); + + assert_eq!( + car_reader.header().roots(), + [ + "bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm" + .parse() + .unwrap(), + "bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm" + .parse() + .unwrap() + ] + ); + + let files: Vec<_> = car_reader.stream().try_collect().await.unwrap(); + assert_eq!(files.len(), 8); + + let cids = [ + "bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm", + "QmNX6Tffavsya4xgBi2VJQnSuqy9GsxongxZZ9uZBqp16d", + "bafkreifw7plhl6mofk6sfvhnfh64qmkq73oeqwl6sloru6rehaoujituke", + "QmWXZxVQ9yZfhQxLD35eDR8LiMRsYtHxYqTFCBbJoiJVys", + "bafkreiebzrnroamgos2adnbpgw5apo3z4iishhbdx77gldnbk57d4zdio4", + "QmdwjhxpxzcMsR3qUuj7vUL8pbA7MgR3GAxWi2GLHjsKCT", + "bafkreidbxzk2ryxwwtqxem4l3xyyjvw35yu4tcct4cqeqxwo47zhxgxqwq", + "bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm", + ]; + + for (cid, file) in cids.iter().zip(files) { + assert_eq!(file.cid, cid.parse().unwrap()); + } +} diff --git a/iroh-car/tests/carv1_basic.car b/iroh-car/tests/carv1_basic.car new file mode 100644 index 0000000000..48c67a3d8d Binary files /dev/null and b/iroh-car/tests/carv1_basic.car differ diff --git a/iroh-car/tests/testv1.car b/iroh-car/tests/testv1.car new file mode 100644 index 0000000000..cdc9a136c5 Binary files /dev/null and b/iroh-car/tests/testv1.car differ