Skip to content

Commit

Permalink
refactor: handle nonce as incoming string/arraybuf
Browse files Browse the repository at this point in the history
Includes:
   * flexibly handling normcore-json or dag-json, but more flexible
     for the former.
   * cbor as input over the websocket listeners (vs just json).
   * Additional testing across the board, including with cbor.
  • Loading branch information
zeeshanlakhani committed Mar 11, 2024
1 parent 4118586 commit 0c0306a
Show file tree
Hide file tree
Showing 14 changed files with 543 additions and 52 deletions.
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ private
.DS_Store
homestar-functions/**/out/*.png
homestar-wasm/out
homestar-invocation/test_*
homestar-runtime/fixtures/test_*
homestar-invocation/test*
homestar-runtime/fixtures/test*
homestar-runtime/tests/fixtures/*.toml
homestar-workflow/fixtures/test_*
homestar-workflow/fixtures/test*
.zed
result-alejandra
report.json
Expand Down
3 changes: 3 additions & 0 deletions homestar-invocation/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum Error<T> {
/// `Display` methods through to an underlying error.
#[error("cannot convert from Ipld structure: {0}")]
FromIpld(#[from] libipld::error::SerdeError),
/// Error with a [libipld::multibase] encoding/decoding.
#[error("failed to decode/encode structure: {0}")]
FromMultibase(#[from] libipld::multibase::Error),
/// Invalid match discriminant or enumeration.
#[error("invalid discriminant {0:#?}")]
InvalidDiscriminant(T),
Expand Down
46 changes: 45 additions & 1 deletion homestar-invocation/src/ipld/dag_cbor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
use crate::{consts::DAG_CBOR, Error, Unit};
use libipld::{
cbor::DagCborCodec,
json::DagJsonCodec,
multihash::{Code, MultihashDigest},
prelude::Codec,
prelude::{Codec, Decode},
Cid, Ipld,
};
use std::{
fs,
io::{Cursor, Write},
};

/// Trait for DagCbor-related encode/decode.
pub trait DagCbor
Expand All @@ -21,6 +26,45 @@ where
let hash = Code::Sha3_256.digest(&bytes);
Ok(Cid::new_v1(DAG_CBOR, hash))
}

/// Serialize `Self` to JSON bytes.
fn to_dag_json(self) -> Result<Vec<u8>, Error<Unit>> {
let ipld: Ipld = self.into();
Ok(DagJsonCodec.encode(&ipld)?)
}

/// Serialize `Self` to JSON [String].
fn to_dagjson_string(self) -> Result<String, Error<Unit>> {
let encoded = self.to_dag_json()?;
// JSON spec requires UTF-8 support
let s = std::str::from_utf8(&encoded)?;
Ok(s.to_string())
}

/// Serialize `Self` to CBOR bytes.
fn to_cbor(self) -> Result<Vec<u8>, Error<Unit>> {
let ipld: Ipld = self.into();
Ok(DagCborCodec.encode(&ipld)?)
}

/// Deserialize `Self` from CBOR bytes.
fn from_cbor(data: &[u8]) -> Result<Self, Error<Unit>>
where
Self: TryFrom<Ipld>,
{
let ipld = Ipld::decode(DagCborCodec, &mut Cursor::new(data))?;
let from_ipld = Self::try_from(ipld).map_err(|_err| {
Error::<Unit>::UnexpectedIpldType(Ipld::String(
"Failed to convert Ipld to expected type".to_string(),
))
})?;
Ok(from_ipld)
}

/// Serialize `Self` to a CBOR file.
fn to_cbor_file(self, filename: String) -> Result<(), Error<Unit>> {
Ok(fs::File::create(filename)?.write_all(&self.to_cbor()?)?)
}
}

/// Trait for DagCbor-related encode/decode for references.
Expand Down
64 changes: 58 additions & 6 deletions homestar-invocation/src/task/instruction/nonce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use generic_array::{
use libipld::{multibase::Base::Base32HexLower, Ipld};
use schemars::{
gen::SchemaGenerator,
schema::{InstanceType, Metadata, Schema, SchemaObject, SingleOrVec},
schema::{InstanceType, Metadata, Schema, SchemaObject, SingleOrVec, StringValidation},
JsonSchema,
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -75,14 +75,22 @@ impl TryFrom<Ipld> for Nonce {
type Error = Error<Unit>;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
if let Ipld::Bytes(v) = ipld {
match v.len() {
match ipld {
Ipld::String(s) if s.is_empty() => Ok(Nonce::Empty),
Ipld::String(s) => {
let bytes = Base32HexLower.decode(s)?;
match bytes.len() {
12 => Ok(Nonce::Nonce96(*GenericArray::from_slice(&bytes))),
16 => Ok(Nonce::Nonce128(*GenericArray::from_slice(&bytes))),
other => Err(Error::unexpected_ipld(other.to_owned().into())),
}
}
Ipld::Bytes(v) => match v.len() {
12 => Ok(Nonce::Nonce96(*GenericArray::from_slice(&v))),
16 => Ok(Nonce::Nonce128(*GenericArray::from_slice(&v))),
other_ipld => Err(Error::unexpected_ipld(other_ipld.to_owned().into())),
}
} else {
Ok(Nonce::Empty)
},
_ => Ok(Nonce::Empty),
}
}
}
Expand Down Expand Up @@ -122,9 +130,23 @@ impl JsonSchema for Nonce {
..Default::default()
};

let non_empty_string = SchemaObject {
instance_type: Some(SingleOrVec::Single(InstanceType::String.into())),
metadata: Some(Box::new(Metadata {
description: Some("A 12-byte or 16-byte nonce encoded as a string, which expects to be decoded with Base32hex lower".to_string()),
..Default::default()
})),
string: Some(Box::new(StringValidation {
min_length: Some(1),
..Default::default()
})),
..Default::default()
};

schema.subschemas().one_of = Some(vec![
gen.subschema_for::<schema::IpldBytesStub>(),
Schema::Object(empty_string),
Schema::Object(non_empty_string),
]);

schema.into()
Expand Down Expand Up @@ -199,4 +221,34 @@ mod test {
let ipld = Ipld::from(nonce.clone());
assert_eq!(ipld, Ipld::Bytes(b.to_vec()));
}

#[test]
fn nonce_as_string_roundtrip() {
let nonce = Nonce::generate();
let string = nonce.to_string();
let from_string = Nonce::try_from(Ipld::String(string.clone())).unwrap();

assert_eq!(nonce, from_string);
assert_eq!(string, nonce.to_string());
}

#[test]
fn json_nonce_string_roundtrip() {
let in_nnc = "1sod60ml6g26mfhsrsa0";
let json = json!({
"nnc": in_nnc
});

let ipld: Ipld = DagJsonCodec.decode(json.to_string().as_bytes()).unwrap();
let Ipld::Map(map) = ipld.clone() else {
panic!("IPLD is not a map");
};
let nnc = map.get("nnc").unwrap();
let nnc: Nonce = Nonce::try_from(nnc.clone()).unwrap();
assert_eq!(nnc.to_string(), in_nnc);
let nonce = Nonce::Nonce96(*GenericArray::from_slice(
Base32HexLower.decode(in_nnc).unwrap().as_slice(),
));
assert_eq!(nnc, nonce);
}
}
89 changes: 84 additions & 5 deletions homestar-runtime/src/network/webserver/listener.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
//! Listener for incoming requests types.
use anyhow::anyhow;
use faststr::FastStr;
use homestar_invocation::ipld::DagJson;
use homestar_invocation::ipld::{DagCbor, DagJson};
use homestar_wasm::io::Arg;
use homestar_workflow::Workflow;
use libipld::{serde::from_ipld, Ipld};
use names::{Generator, Name};
use serde::{de, Deserialize, Deserializer, Serialize};
use serde_json::value::RawValue;
use std::collections::BTreeMap;

const NAME_KEY: &str = "name";
const WORKFLOW_KEY: &str = "workflow";

/// A [Workflow] run command via a WebSocket channel.
///
/// Note: We leverage the [RawValue] type in order to use our DagJson
/// implementation, which is not a direct [Deserialize] implementation.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub(crate) struct Run<'a> {
pub(crate) struct JsonRun<'a> {
#[serde(default = "default_name")]
pub(crate) name: FastStr,
#[serde(deserialize_with = "from_raw_value")]
Expand All @@ -36,6 +42,60 @@ where
Workflow::from_json(raw_value.get().as_bytes()).map_err(de::Error::custom)
}

#[derive(Debug, Clone, PartialEq, Serialize)]
pub(crate) struct CborRun<'a> {
pub(crate) name: FastStr,
pub(crate) workflow: Workflow<'a, Arg>,
}

impl<'a> From<CborRun<'a>> for Ipld {
fn from(run: CborRun<'a>) -> Self {
Ipld::Map(BTreeMap::from([
("name".into(), Ipld::String(run.name.as_str().to_string())),
("workflow".into(), run.workflow.into()),
]))
}
}

impl<'a> TryFrom<Ipld> for CborRun<'a> {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let map = from_ipld::<BTreeMap<String, Ipld>>(ipld)?;
let name: String = from_ipld(
map.get(NAME_KEY)
.ok_or_else(|| anyhow!("missing {NAME_KEY}"))?
.to_owned(),
)?;
let workflow = Workflow::try_from(
map.get(WORKFLOW_KEY)
.ok_or_else(|| anyhow!("missing {WORKFLOW_KEY}"))?
.to_owned(),
)?;
Ok(CborRun {
name: FastStr::from(name),
workflow,
})
}
}

impl DagCbor for CborRun<'_> {}
impl DagJson for CborRun<'_> {}

impl<'a, 'de> Deserialize<'de> for CborRun<'a> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
//let value: Ipld = Deserialize::deserialize(deserializer)?;
//let run = CborRun::try_from(value).map_err(de::Error::custom)?;
let value = Vec::<u8>::deserialize(deserializer)?;
let ipld: Ipld = serde_ipld_dagcbor::from_slice(&value).map_err(de::Error::custom)?;
let run = CborRun::try_from(ipld).map_err(de::Error::custom)?;
Ok(run)
}
}

/// Filter metrics by prefix.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub(crate) struct MetricsPrefix {
Expand All @@ -50,7 +110,7 @@ mod test {
task::{instruction::RunInstruction, Resources},
test_utils, Task,
};
use std::assert_eq;
use std::{fs, path::PathBuf};

#[test]
fn run_json() {
Expand All @@ -70,7 +130,7 @@ mod test {
);

let workflow = Workflow::new(vec![task1.clone(), task2.clone()]);
let run = Run {
let run = JsonRun {
name: "test".into(),
workflow: workflow.clone(),
};
Expand All @@ -81,7 +141,26 @@ mod test {
);

let post_run = serde_json::from_str(&run_str).unwrap();

assert_eq!(run, post_run);
}

#[test]
fn write_cbor_to_file_and_read() {
let workflow_str =
fs::read_to_string("tests/fixtures/test-workflow-image-pipeline.json").unwrap();
let json: serde_json::Value = serde_json::from_str(&workflow_str).unwrap();
let json_string = serde_json::to_string(&json).unwrap();
let run_str = format!(r#"{{"name": "test","workflow": {}}}"#, json_string);
let run1: CborRun<'_> = DagJson::from_json_string(run_str).unwrap();

let path = PathBuf::from("./fixtures/test.cbor");
assert!(run1
.clone()
.to_cbor_file(path.display().to_string())
.is_ok());

let cbor_file = fs::read(path).unwrap();
let run2: CborRun<'_> = DagCbor::from_cbor(&cbor_file).unwrap();
assert_eq!(run1, run2);
}
}
Loading

0 comments on commit 0c0306a

Please sign in to comment.