diff --git a/.gitignore b/.gitignore index 5cf5bbbb..1456dd0e 100644 --- a/.gitignore +++ b/.gitignore @@ -21,10 +21,10 @@ private .DS_Store homestar-functions/**/out/*.png homestar-wasm/out -homestar-invocation/test_* -homestar-runtime/fixtures/test_* +homestar-invocation/test* +homestar-runtime/fixtures/test* homestar-runtime/tests/fixtures/*.toml -homestar-workflow/fixtures/test_* +homestar-workflow/fixtures/test* .zed result-alejandra report.json diff --git a/homestar-invocation/src/error.rs b/homestar-invocation/src/error.rs index 2da82469..5f8a1eb6 100644 --- a/homestar-invocation/src/error.rs +++ b/homestar-invocation/src/error.rs @@ -33,6 +33,9 @@ pub enum Error { /// `Display` methods through to an underlying error. #[error("cannot convert from Ipld structure: {0}")] FromIpld(#[from] libipld::error::SerdeError), + /// Error with a [libipld::multibase] encoding/decoding. + #[error("failed to decode/encode structure: {0}")] + FromMultibase(#[from] libipld::multibase::Error), /// Invalid match discriminant or enumeration. #[error("invalid discriminant {0:#?}")] InvalidDiscriminant(T), diff --git a/homestar-invocation/src/ipld/dag_cbor.rs b/homestar-invocation/src/ipld/dag_cbor.rs index 086bcbbd..b87218b8 100644 --- a/homestar-invocation/src/ipld/dag_cbor.rs +++ b/homestar-invocation/src/ipld/dag_cbor.rs @@ -3,10 +3,15 @@ use crate::{consts::DAG_CBOR, Error, Unit}; use libipld::{ cbor::DagCborCodec, + json::DagJsonCodec, multihash::{Code, MultihashDigest}, - prelude::Codec, + prelude::{Codec, Decode}, Cid, Ipld, }; +use std::{ + fs, + io::{Cursor, Write}, +}; /// Trait for DagCbor-related encode/decode. pub trait DagCbor @@ -21,6 +26,45 @@ where let hash = Code::Sha3_256.digest(&bytes); Ok(Cid::new_v1(DAG_CBOR, hash)) } + + /// Serialize `Self` to JSON bytes. + fn to_dag_json(self) -> Result, Error> { + let ipld: Ipld = self.into(); + Ok(DagJsonCodec.encode(&ipld)?) + } + + /// Serialize `Self` to JSON [String]. + fn to_dagjson_string(self) -> Result> { + let encoded = self.to_dag_json()?; + // JSON spec requires UTF-8 support + let s = std::str::from_utf8(&encoded)?; + Ok(s.to_string()) + } + + /// Serialize `Self` to CBOR bytes. + fn to_cbor(self) -> Result, Error> { + let ipld: Ipld = self.into(); + Ok(DagCborCodec.encode(&ipld)?) + } + + /// Deserialize `Self` from CBOR bytes. + fn from_cbor(data: &[u8]) -> Result> + where + Self: TryFrom, + { + let ipld = Ipld::decode(DagCborCodec, &mut Cursor::new(data))?; + let from_ipld = Self::try_from(ipld).map_err(|_err| { + Error::::UnexpectedIpldType(Ipld::String( + "Failed to convert Ipld to expected type".to_string(), + )) + })?; + Ok(from_ipld) + } + + /// Serialize `Self` to a CBOR file. + fn to_cbor_file(self, filename: String) -> Result<(), Error> { + Ok(fs::File::create(filename)?.write_all(&self.to_cbor()?)?) + } } /// Trait for DagCbor-related encode/decode for references. diff --git a/homestar-invocation/src/task/instruction/nonce.rs b/homestar-invocation/src/task/instruction/nonce.rs index 46ffda07..fd5fc375 100644 --- a/homestar-invocation/src/task/instruction/nonce.rs +++ b/homestar-invocation/src/task/instruction/nonce.rs @@ -12,7 +12,7 @@ use generic_array::{ use libipld::{multibase::Base::Base32HexLower, Ipld}; use schemars::{ gen::SchemaGenerator, - schema::{InstanceType, Metadata, Schema, SchemaObject, SingleOrVec}, + schema::{InstanceType, Metadata, Schema, SchemaObject, SingleOrVec, StringValidation}, JsonSchema, }; use serde::{Deserialize, Serialize}; @@ -75,14 +75,22 @@ impl TryFrom for Nonce { type Error = Error; fn try_from(ipld: Ipld) -> Result { - if let Ipld::Bytes(v) = ipld { - match v.len() { + match ipld { + Ipld::String(s) if s.is_empty() => Ok(Nonce::Empty), + Ipld::String(s) => { + let bytes = Base32HexLower.decode(s)?; + match bytes.len() { + 12 => Ok(Nonce::Nonce96(*GenericArray::from_slice(&bytes))), + 16 => Ok(Nonce::Nonce128(*GenericArray::from_slice(&bytes))), + other => Err(Error::unexpected_ipld(other.to_owned().into())), + } + } + Ipld::Bytes(v) => match v.len() { 12 => Ok(Nonce::Nonce96(*GenericArray::from_slice(&v))), 16 => Ok(Nonce::Nonce128(*GenericArray::from_slice(&v))), other_ipld => Err(Error::unexpected_ipld(other_ipld.to_owned().into())), - } - } else { - Ok(Nonce::Empty) + }, + _ => Ok(Nonce::Empty), } } } @@ -122,9 +130,23 @@ impl JsonSchema for Nonce { ..Default::default() }; + let non_empty_string = SchemaObject { + instance_type: Some(SingleOrVec::Single(InstanceType::String.into())), + metadata: Some(Box::new(Metadata { + description: Some("A 12-byte or 16-byte nonce encoded as a string, which expects to be decoded with Base32hex lower".to_string()), + ..Default::default() + })), + string: Some(Box::new(StringValidation { + min_length: Some(1), + ..Default::default() + })), + ..Default::default() + }; + schema.subschemas().one_of = Some(vec![ gen.subschema_for::(), Schema::Object(empty_string), + Schema::Object(non_empty_string), ]); schema.into() @@ -199,4 +221,34 @@ mod test { let ipld = Ipld::from(nonce.clone()); assert_eq!(ipld, Ipld::Bytes(b.to_vec())); } + + #[test] + fn nonce_as_string_roundtrip() { + let nonce = Nonce::generate(); + let string = nonce.to_string(); + let from_string = Nonce::try_from(Ipld::String(string.clone())).unwrap(); + + assert_eq!(nonce, from_string); + assert_eq!(string, nonce.to_string()); + } + + #[test] + fn json_nonce_string_roundtrip() { + let in_nnc = "1sod60ml6g26mfhsrsa0"; + let json = json!({ + "nnc": in_nnc + }); + + let ipld: Ipld = DagJsonCodec.decode(json.to_string().as_bytes()).unwrap(); + let Ipld::Map(map) = ipld.clone() else { + panic!("IPLD is not a map"); + }; + let nnc = map.get("nnc").unwrap(); + let nnc: Nonce = Nonce::try_from(nnc.clone()).unwrap(); + assert_eq!(nnc.to_string(), in_nnc); + let nonce = Nonce::Nonce96(*GenericArray::from_slice( + Base32HexLower.decode(in_nnc).unwrap().as_slice(), + )); + assert_eq!(nnc, nonce); + } } diff --git a/homestar-runtime/src/network/webserver/listener.rs b/homestar-runtime/src/network/webserver/listener.rs index c49d2f6a..bfd457de 100644 --- a/homestar-runtime/src/network/webserver/listener.rs +++ b/homestar-runtime/src/network/webserver/listener.rs @@ -1,19 +1,25 @@ //! Listener for incoming requests types. +use anyhow::anyhow; use faststr::FastStr; -use homestar_invocation::ipld::DagJson; +use homestar_invocation::ipld::{DagCbor, DagJson}; use homestar_wasm::io::Arg; use homestar_workflow::Workflow; +use libipld::{serde::from_ipld, Ipld}; use names::{Generator, Name}; use serde::{de, Deserialize, Deserializer, Serialize}; use serde_json::value::RawValue; +use std::collections::BTreeMap; + +const NAME_KEY: &str = "name"; +const WORKFLOW_KEY: &str = "workflow"; /// A [Workflow] run command via a WebSocket channel. /// /// Note: We leverage the [RawValue] type in order to use our DagJson /// implementation, which is not a direct [Deserialize] implementation. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub(crate) struct Run<'a> { +pub(crate) struct JsonRun<'a> { #[serde(default = "default_name")] pub(crate) name: FastStr, #[serde(deserialize_with = "from_raw_value")] @@ -36,6 +42,60 @@ where Workflow::from_json(raw_value.get().as_bytes()).map_err(de::Error::custom) } +#[derive(Debug, Clone, PartialEq, Serialize)] +pub(crate) struct CborRun<'a> { + pub(crate) name: FastStr, + pub(crate) workflow: Workflow<'a, Arg>, +} + +impl<'a> From> for Ipld { + fn from(run: CborRun<'a>) -> Self { + Ipld::Map(BTreeMap::from([ + ("name".into(), Ipld::String(run.name.as_str().to_string())), + ("workflow".into(), run.workflow.into()), + ])) + } +} + +impl<'a> TryFrom for CborRun<'a> { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + let map = from_ipld::>(ipld)?; + let name: String = from_ipld( + map.get(NAME_KEY) + .ok_or_else(|| anyhow!("missing {NAME_KEY}"))? + .to_owned(), + )?; + let workflow = Workflow::try_from( + map.get(WORKFLOW_KEY) + .ok_or_else(|| anyhow!("missing {WORKFLOW_KEY}"))? + .to_owned(), + )?; + Ok(CborRun { + name: FastStr::from(name), + workflow, + }) + } +} + +impl DagCbor for CborRun<'_> {} +impl DagJson for CborRun<'_> {} + +impl<'a, 'de> Deserialize<'de> for CborRun<'a> { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + //let value: Ipld = Deserialize::deserialize(deserializer)?; + //let run = CborRun::try_from(value).map_err(de::Error::custom)?; + let value = Vec::::deserialize(deserializer)?; + let ipld: Ipld = serde_ipld_dagcbor::from_slice(&value).map_err(de::Error::custom)?; + let run = CborRun::try_from(ipld).map_err(de::Error::custom)?; + Ok(run) + } +} + /// Filter metrics by prefix. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub(crate) struct MetricsPrefix { @@ -50,7 +110,7 @@ mod test { task::{instruction::RunInstruction, Resources}, test_utils, Task, }; - use std::assert_eq; + use std::{fs, path::PathBuf}; #[test] fn run_json() { @@ -70,7 +130,7 @@ mod test { ); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); - let run = Run { + let run = JsonRun { name: "test".into(), workflow: workflow.clone(), }; @@ -81,7 +141,26 @@ mod test { ); let post_run = serde_json::from_str(&run_str).unwrap(); - assert_eq!(run, post_run); } + + #[test] + fn write_cbor_to_file_and_read() { + let workflow_str = + fs::read_to_string("tests/fixtures/test-workflow-image-pipeline.json").unwrap(); + let json: serde_json::Value = serde_json::from_str(&workflow_str).unwrap(); + let json_string = serde_json::to_string(&json).unwrap(); + let run_str = format!(r#"{{"name": "test","workflow": {}}}"#, json_string); + let run1: CborRun<'_> = DagJson::from_json_string(run_str).unwrap(); + + let path = PathBuf::from("./fixtures/test.cbor"); + assert!(run1 + .clone() + .to_cbor_file(path.display().to_string()) + .is_ok()); + + let cbor_file = fs::read(path).unwrap(); + let run2: CborRun<'_> = DagCbor::from_cbor(&cbor_file).unwrap(); + assert_eq!(run1, run2); + } } diff --git a/homestar-runtime/src/network/webserver/rpc.rs b/homestar-runtime/src/network/webserver/rpc.rs index 4cdc1512..7b1436e2 100644 --- a/homestar-runtime/src/network/webserver/rpc.rs +++ b/homestar-runtime/src/network/webserver/rpc.rs @@ -5,7 +5,7 @@ use super::notifier::{self, Header, Notifier, SubscriptionTyp}; #[allow(unused_imports)] use super::{listener, prom::PrometheusData, Message}; #[cfg(feature = "websocket-notify")] -use crate::channel::AsyncChannel; +use crate::channel::{AsyncChannel, AsyncChannelReceiver}; use crate::{ db::Database, runner::{NodeInfo, WsSender}, @@ -21,12 +21,19 @@ use faststr::FastStr; use futures::StreamExt; #[cfg(feature = "websocket-notify")] use homestar_invocation::ipld::DagCbor; +#[cfg(feature = "websocket-notify")] +use homestar_wasm::io::Arg; +#[cfg(feature = "websocket-notify")] +use homestar_workflow::Workflow; use jsonrpsee::{ server::RpcModule, types::error::{ErrorCode, ErrorObject}, }; #[cfg(feature = "websocket-notify")] -use jsonrpsee::{types::SubscriptionId, SendTimeoutError, SubscriptionMessage, SubscriptionSink}; +use jsonrpsee::{ + types::SubscriptionId, PendingSubscriptionSink, SendTimeoutError, SubscriptionMessage, + SubscriptionSink, +}; #[cfg(feature = "websocket-notify")] use libipld::Cid; use metrics_exporter_prometheus::PrometheusHandle; @@ -238,8 +245,8 @@ where SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, UNSUBSCRIBE_RUN_WORKFLOW_ENDPOINT, |params, pending, ctx| async move { - match params.one::>() { - Ok(listener::Run { name, workflow }) => { + match params.one::>() { + Ok(listener::JsonRun { name, workflow }) => { let (tx, rx) = AsyncChannel::oneshot(); ctx.runner_sender .send_async(( @@ -248,36 +255,29 @@ where )) .await?; - if let Ok(Message::AckWorkflow((cid, name))) = rx.recv_async().await { - let sink = pending.accept().await?; - ctx.workflow_listeners - .insert(sink.subscription_id(), (cid, name)); - let rx = ctx.workflow_msg_notifier.inner().subscribe(); - let stream = BroadcastStream::new(rx); - Self::handle_workflow_subscription(sink, stream, ctx).await?; - } else { - error!( - subject = "subscription.workflow.err", - category = "jsonrpc.subscription", - sub = SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, - workflow_name = name.to_string(), - "did not acknowledge message in time" - ); - let _ = pending - .reject(busy_err(format!( - "not able to run workflow {}", - workflow.to_cid()? - ))) - .await; - } + Self::handle_run_workflow(name, workflow, rx, ctx, pending).await?; } - Err(err) => { - warn!(subject = "subscription.workflow.err", + + Err(_err) => match params.one::>() { + Ok(listener::CborRun { name, workflow }) => { + let (tx, rx) = AsyncChannel::oneshot(); + ctx.runner_sender + .send_async(( + Message::RunWorkflow((name.clone(), workflow.clone())), + Some(tx), + )) + .await?; + + Self::handle_run_workflow(name, workflow, rx, ctx, pending).await?; + } + Err(err) => { + warn!(subject = "subscription.workflow.err", category = "jsonrpc.subscription", err=?err, "failed to parse run workflow params"); - let _ = pending.reject(err).await; - } + let _ = pending.reject(err).await; + } + }, } Ok(()) }, @@ -286,6 +286,40 @@ where Ok(module) } + #[cfg(feature = "websocket-notify")] + async fn handle_run_workflow( + name: FastStr, + workflow: Workflow<'_, Arg>, + rx: AsyncChannelReceiver, + ctx: Arc>, + pending: PendingSubscriptionSink, + ) -> Result<()> { + if let Ok(Message::AckWorkflow((cid, name))) = rx.recv_async().await { + let sink = pending.accept().await?; + ctx.workflow_listeners + .insert(sink.subscription_id(), (cid, name)); + let rx = ctx.workflow_msg_notifier.inner().subscribe(); + let stream = BroadcastStream::new(rx); + Self::handle_workflow_subscription(sink, stream, ctx).await?; + } else { + error!( + subject = "subscription.workflow.err", + category = "jsonrpc.subscription", + sub = SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + workflow_name = name.to_string(), + "did not acknowledge message in time" + ); + let _ = pending + .reject(busy_err(format!( + "not able to run workflow {}", + workflow.to_cid()? + ))) + .await; + } + + Ok(()) + } + #[cfg(feature = "websocket-notify")] async fn handle_event_subscription( sink: SubscriptionSink, diff --git a/homestar-runtime/src/tasks/fetch.rs b/homestar-runtime/src/tasks/fetch.rs index 5df0245c..c06915cb 100644 --- a/homestar-runtime/src/tasks/fetch.rs +++ b/homestar-runtime/src/tasks/fetch.rs @@ -11,6 +11,9 @@ use fnv::FnvHashSet; use indexmap::IndexMap; use std::sync::Arc; +/// Fetch module for gathering data over the network related to [Task]. +/// +/// [Task]: homestar_invocation::Task pub(crate) struct Fetch; #[cfg(any(test, feature = "test-utils"))] diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index 7cbd53fc..fe7b4653 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -235,6 +235,23 @@ impl Info { } } + /// Get workflow progress as a vector of Cids. + pub fn progress(&self) -> &Vec { + &self.progress + } + + /// Get workflow progress as a number of receipts completed. + pub fn progress_count(&self) -> u32 { + self.progress_count + } + + /// Get the number of tasks in the [Workflow]. + /// + /// [Workflow]: homestar_workflow::Workflow + pub fn num_tasks(&self) -> u32 { + self.num_tasks + } + /// Get unique identifier, Cid, of [Workflow]. /// /// [Workflow]: homestar_workflow::Workflow diff --git a/homestar-runtime/tests/cli.rs b/homestar-runtime/tests/cli.rs index 55fd5b7d..345a9f00 100644 --- a/homestar-runtime/tests/cli.rs +++ b/homestar-runtime/tests/cli.rs @@ -1,5 +1,7 @@ #[cfg(not(windows))] use crate::utils::kill_homestar_daemon; +#[cfg(feature = "test-utils")] +use crate::utils::wait_for_asserts; use crate::{ make_config, utils::{ @@ -9,8 +11,14 @@ use crate::{ }; use anyhow::Result; use assert_cmd::prelude::*; +#[cfg(feature = "test-utils")] +use homestar_runtime::{db::Database, Db, Settings}; +#[cfg(feature = "test-utils")] +use libipld::Cid; use once_cell::sync::Lazy; use predicates::prelude::*; +#[cfg(feature = "test-utils")] +use std::str::FromStr; use std::{ path::PathBuf, process::{Command, Stdio}, @@ -254,6 +262,72 @@ fn test_workflow_run_integration() -> Result<()> { Ok(()) } +#[test] +#[serial_test::parallel] +#[cfg(feature = "test-utils")] +fn test_workflow_run_integration_nonced() -> Result<()> { + let proc_info = ProcInfo::new().unwrap(); + let rpc_port = proc_info.rpc_port; + let metrics_port = proc_info.metrics_port; + let ws_port = proc_info.ws_port; + let workflow_cid = "bafyrmicbtl7g4zrbarazdbjnk2gxxbbzin3iaf6y2zs6va5auqiyhu5m2e"; + let toml = format!( + r#" + [node] + [node.network.libp2p.mdns] + enable = false + [node.network.metrics] + port = {metrics_port} + [node.network.rpc] + port = {rpc_port} + [node.network.webserver] + port = {ws_port} + "# + ); + let config = make_config!(toml); + + let homestar_proc = Command::new(BIN.as_os_str()) + .arg("start") + .arg("-c") + .arg(config.filename()) + .arg("--db") + .arg(&proc_info.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let _proc_guard = ChildGuard::new(homestar_proc); + + if wait_for_socket_connection_v6(rpc_port, 1000).is_err() { + panic!("Homestar server/runtime failed to start in time"); + } + + Command::new(BIN.as_os_str()) + .arg("run") + .arg("-p") + .arg(rpc_port.to_string()) + .arg("tests/fixtures/test-workflow-add-one-nonced.json") + .assert() + .success(); + + let settings = Settings::load_from_file(PathBuf::from(config.filename())).unwrap(); + let cid = Cid::from_str(workflow_cid).unwrap(); + let db = Db::setup_connection_pool( + settings.node(), + Some(proc_info.db_path.display().to_string()), + ) + .expect("Failed to connect to node two database"); + + wait_for_asserts(500, || { + let (name, info) = Db::get_workflow_info(cid, &mut db.conn().unwrap()).unwrap(); + name.unwrap().as_str() == workflow_cid + && info.progress().len() == 2 + && info.progress_count() == 2 + }) + .unwrap(); + + Ok(()) +} + #[test] #[serial_test::parallel] #[cfg(not(windows))] diff --git a/homestar-runtime/tests/fixtures/test-workflow-add-one-nonced.json b/homestar-runtime/tests/fixtures/test-workflow-add-one-nonced.json new file mode 100644 index 00000000..90e392b9 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test-workflow-add-one-nonced.json @@ -0,0 +1,46 @@ +{ + "tasks": [ + { + "cause": null, + "meta": { + "fuel": 18446744073709552000, + "memory": 4294967296, + "time": 100000 + }, + "prf": [], + "run": { + "input": { + "args": [1], + "func": "add_one" + }, + "nnc": "1sod60ml6g26mfhsrsa0", + "op": "wasm/run", + "rsc": "ipfs://bafybeia32q3oy6u47x624rmsmgrrlpn7ulruissmz5z2ap6alv7goe7h3q" + } + }, + { + "cause": null, + "meta": { + "fuel": 18446744073709552000, + "memory": 4294967296, + "time": 100000 + }, + "prf": [], + "run": { + "input": { + "args": [ + { + "await/ok": { + "/": "bafyrmie3zzu3mhtyjghgqvskoc7maiiablp2htexigs2xr3lvgt6csslly" + } + } + ], + "func": "add_one" + }, + "nnc": "4ja3jhlhs3b9rk3app40", + "op": "wasm/run", + "rsc": "ipfs://bafybeia32q3oy6u47x624rmsmgrrlpn7ulruissmz5z2ap6alv7goe7h3q" + } + } + ] +} diff --git a/homestar-runtime/tests/fixtures/test-workflow-image-pipeline.cbor b/homestar-runtime/tests/fixtures/test-workflow-image-pipeline.cbor new file mode 100644 index 00000000..551628d2 Binary files /dev/null and b/homestar-runtime/tests/fixtures/test-workflow-image-pipeline.cbor differ diff --git a/homestar-runtime/tests/utils.rs b/homestar-runtime/tests/utils.rs index fb79cf20..787852db 100644 --- a/homestar-runtime/tests/utils.rs +++ b/homestar-runtime/tests/utils.rs @@ -364,7 +364,7 @@ impl Drop for ProcInfo { } } -/// Wait for socket connection or timeout +/// Wait for socket connection or timeout. pub(crate) fn wait_for_socket_connection(port: u16, exp_retry_base: u64) -> Result<(), ()> { let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let result = retry(Exponential::from_millis(exp_retry_base).take(10), || { @@ -374,7 +374,7 @@ pub(crate) fn wait_for_socket_connection(port: u16, exp_retry_base: u64) -> Resu result.map_or_else(|_| Err(()), |_| Ok(())) } -/// Wait for socket connection or timeout (ipv6) +/// Wait for socket connection or timeout (ipv6). pub(crate) fn wait_for_socket_connection_v6(port: u16, exp_retry_base: u64) -> Result<(), ()> { let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port); let result = retry(Exponential::from_millis(exp_retry_base).take(10), || { @@ -384,6 +384,23 @@ pub(crate) fn wait_for_socket_connection_v6(port: u16, exp_retry_base: u64) -> R result.map_or_else(|_| Err(()), |_| Ok(())) } +/// Wait for asserts to pass or timeout expires. +#[allow(dead_code)] +pub(crate) fn wait_for_asserts( + exp_retry_base: u64, + assertion: impl Fn() -> bool, +) -> Result<(), ()> { + let result = retry(Exponential::from_millis(exp_retry_base).take(10), || { + if assertion() { + Ok(()) + } else { + Err(()) + } + }); + + result.map_or_else(|_| Err(()), |_| Ok(())) +} + /// Client and subscription. #[cfg(feature = "websocket-notify")] pub(crate) struct WsClientSub { diff --git a/homestar-runtime/tests/webserver.rs b/homestar-runtime/tests/webserver.rs index c9237555..fc07a160 100644 --- a/homestar-runtime/tests/webserver.rs +++ b/homestar-runtime/tests/webserver.rs @@ -90,7 +90,7 @@ fn test_workflow_run_integration() -> Result<()> { // we have 3 operations let mut received_cids = 0; loop { - if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(45)).await { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); let check = json.get("metadata").unwrap(); @@ -118,7 +118,7 @@ fn test_workflow_run_integration() -> Result<()> { .unwrap(); loop { - if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(30)).await { + if let Ok(msg) = sub2.next().with_timeout(Duration::from_secs(45)).await { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); let check = json.get("metadata").unwrap(); @@ -135,7 +135,10 @@ fn test_workflow_run_integration() -> Result<()> { } } - let client2 = WsClientBuilder::default().build(ws_url).await.unwrap(); + let client2 = WsClientBuilder::default() + .build(ws_url.clone()) + .await + .unwrap(); let mut sub3: Subscription> = client2 .subscribe( SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, @@ -152,7 +155,7 @@ fn test_workflow_run_integration() -> Result<()> { .is_err(); loop { - if let Ok(msg) = sub3.next().with_timeout(Duration::from_secs(30)).await { + if let Ok(msg) = sub3.next().with_timeout(Duration::from_secs(45)).await { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); let check = json.get("metadata").unwrap(); @@ -187,7 +190,7 @@ fn test_workflow_run_integration() -> Result<()> { .unwrap(); loop { - if let Ok(msg) = sub4.next().with_timeout(Duration::from_secs(30)).await { + if let Ok(msg) = sub4.next().with_timeout(Duration::from_secs(45)).await { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); let check = json.get("metadata").unwrap(); @@ -202,6 +205,12 @@ fn test_workflow_run_integration() -> Result<()> { break; } } + + let _ = sub4 + .next() + .with_timeout(Duration::from_secs(10)) + .await + .is_err(); }); // Collect logs then kill proceses. @@ -221,3 +230,88 @@ fn test_workflow_run_integration() -> Result<()> { Ok(()) } + +#[test] +#[serial_test::parallel] +fn test_workflow_run_integration_cbor() -> Result<()> { + let proc_info = ProcInfo::new().unwrap(); + let rpc_port = proc_info.rpc_port; + let metrics_port = proc_info.metrics_port; + let ws_port = proc_info.ws_port; + let toml = format!( + r#" + [node] + [node.network.libp2p.mdns] + enable = false + [node.network.metrics] + port = {metrics_port} + [node.network.rpc] + port = {rpc_port} + [node.network.webserver] + port = {ws_port} + "# + ); + + let config = make_config!(toml); + let homestar_proc = Command::new(BIN.as_os_str()) + .env("RUST_BACKTRACE", "0") + .arg("start") + .arg("-c") + .arg(config.filename()) + .arg("--db") + .arg(&proc_info.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let _proc_guard = ChildGuard::new(homestar_proc); + + if wait_for_socket_connection(ws_port, 1000).is_err() { + panic!("Homestar server/runtime failed to start in time"); + } + + let ws_url = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port); + + tokio_test::block_on(async { + let run_cbor = fs::read("tests/fixtures/test-workflow-image-pipeline.cbor").unwrap(); + let client = WsClientBuilder::default() + .build(ws_url.clone()) + .await + .unwrap(); + + let mut sub: Subscription> = client + .subscribe( + SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + rpc_params![run_cbor], + UNSUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + ) + .await + .unwrap(); + + // we have 3 operations + let mut received_cids = 0; + loop { + if let Ok(msg) = sub.next().with_timeout(Duration::from_secs(45)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + let check = json.get("metadata").unwrap(); + let expected = serde_json::json!({"name": "test", "replayed": false, "workflow": {"/": format!("{AWAIT_CID}")}}); + assert_eq!(check, &expected); + received_cids += 1; + } else { + panic!("Node one did not publish receipt in time.") + } + + if received_cids == 3 { + break; + } + } + + let _ = sub + .next() + .with_timeout(Duration::from_secs(10)) + .await + .is_err(); + }); + + Ok(()) +} diff --git a/homestar-workflow/src/workflow.rs b/homestar-workflow/src/workflow.rs index 7168ba4c..0bd6c672 100644 --- a/homestar-workflow/src/workflow.rs +++ b/homestar-workflow/src/workflow.rs @@ -157,6 +157,34 @@ mod test { assert_eq!(workflow, wf_from_json2); } + #[test] + fn workflow_to_cbor_to_json_roundtrip() { + let config = Resources::default(); + let instruction1 = test_utils::instruction::(); + let (instruction2, _) = test_utils::wasm_instruction_with_nonce::(); + + let task1 = Task::new( + RunInstruction::Expanded(instruction1), + config.clone().into(), + UcanPrf::default(), + ); + let task2 = Task::new( + RunInstruction::Expanded(instruction2), + config.into(), + UcanPrf::default(), + ); + + let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + let cbor_bytes = workflow.clone().to_cbor().unwrap(); + let workflow_from_cbor = Workflow::::from_cbor(&cbor_bytes).unwrap(); + assert_eq!(workflow, workflow_from_cbor); + + let json_from_cbor_string = workflow_from_cbor.clone().to_dagjson_string().unwrap(); + let json_string = workflow.to_json_string().unwrap(); + + assert_eq!(json_from_cbor_string, json_string); + } + #[test] fn ipld_roundtrip_workflow() { let config = Resources::default();