diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 0ee9e57b7..62745e149 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -23,7 +23,7 @@ pub fn attach_dataflow( // Generate path hashmap let mut node_path_lookup = HashMap::new(); - let nodes = dataflow.resolve_aliases_and_set_defaults(); + let nodes = dataflow.resolve_aliases_and_set_defaults()?; let working_dir = dataflow_path .canonicalize() diff --git a/binaries/cli/src/build.rs b/binaries/cli/src/build.rs index b235f3dda..496402a81 100644 --- a/binaries/cli/src/build.rs +++ b/binaries/cli/src/build.rs @@ -16,8 +16,13 @@ pub fn build(dataflow: &Path) -> eyre::Result<()> { let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string()); - for node in &descriptor.nodes { - match &node.kind { + for node in descriptor.nodes { + match node.kind()? { + dora_core::descriptor::NodeKind::Standard(_) => { + run_build_command(node.build.as_deref(), working_dir).with_context(|| { + format!("build command failed for standard node `{}`", node.id) + })? + } dora_core::descriptor::NodeKind::Runtime(runtime_node) => { for operator in &runtime_node.operators { run_build_command(operator.config.build.as_deref(), working_dir).with_context( diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 0d8301841..f37613581 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -26,7 +26,7 @@ pub(super) async fn spawn_dataflow( ) -> eyre::Result { dataflow.check(&working_dir)?; - let nodes = dataflow.resolve_aliases_and_set_defaults(); + let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect(); diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index aa4e32794..8b85ee8e6 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -132,7 +132,7 @@ impl Daemon { let descriptor = Descriptor::read(dataflow_path).await?; descriptor.check(&working_dir)?; - let nodes = descriptor.resolve_aliases_and_set_defaults(); + let nodes = descriptor.resolve_aliases_and_set_defaults()?; let spawn_command = SpawnDataflowNodes { dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)), diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 0dc353dea..0455e7ee8 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -131,7 +131,13 @@ pub async fn spawn_node( ); // Injecting the env variable defined in the `yaml` into // the node runtime. + if let Some(envs) = node.env { + for (key, value) in envs { + command.env(key, value.to_string()); + } + } if let Some(envs) = n.envs { + // node has some inner env variables -> add them too for (key, value) in envs { command.env(key, value.to_string()); } diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index 7b7239d26..30c01ced1 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -1,31 +1,28 @@ nodes: - id: rust-node + build: cargo build -p rust-dataflow-example-node + path: ../../target/debug/rust-dataflow-example-node + inputs: + tick: dora/timer/millis/10 + outputs: + - random + - id: rust-status-node custom: - build: cargo build -p rust-dataflow-example-node - source: ../../target/debug/rust-dataflow-example-node + build: cargo build -p rust-dataflow-example-status-node + source: ../../target/debug/rust-dataflow-example-status-node inputs: - tick: dora/timer/millis/10 + tick: dora/timer/millis/100 + random: rust-node/random outputs: - - random - - id: rust-status-node - custom: - build: cargo build -p rust-dataflow-example-status-node - source: ../../target/debug/rust-dataflow-example-status-node - inputs: - tick: dora/timer/millis/100 - random: rust-node/random - outputs: - - status + - status - id: rust-sink - custom: - build: cargo build -p rust-dataflow-example-sink - source: ../../target/debug/rust-dataflow-example-sink - inputs: - message: rust-status-node/status + build: cargo build -p rust-dataflow-example-sink + path: ../../target/debug/rust-dataflow-example-sink + inputs: + message: rust-status-node/status - id: dora-record - custom: - build: cargo build -p dora-record - source: ../../target/debug/dora-record - inputs: - message: rust-status-node/status - random: rust-node/random \ No newline at end of file + build: cargo build -p dora-record + path: ../../target/debug/dora-record + inputs: + message: rust-status-node/status + random: rust-node/random diff --git a/libraries/core/dora-schema.json b/libraries/core/dora-schema.json index 476ebf2b6..84c68fd81 100644 --- a/libraries/core/dora-schema.json +++ b/libraries/core/dora-schema.json @@ -36,7 +36,7 @@ ] }, "envs": { - "description": "Environment variables for the custom nodes", + "description": "Environment variables for the custom nodes\n\nDeprecated, use outer-level `env` field instead.", "type": [ "object", "null" @@ -46,13 +46,13 @@ } }, "inputs": { - "description": "Inputs for the nodes as a map from input ID to `/`.\n\ne.g.\n\ninputs:\n\nexample_input: example_node/example_output1", + "description": "Inputs for the nodes as a map from input ID to `node_id/output_id`.\n\ne.g.\n\ninputs:\n\nexample_input: example_node/example_output1", "default": {}, "type": "object", "additionalProperties": true }, "outputs": { - "description": "Outputs as a list of outputs.\n\ne.g.\n\noutputs:\n\n- output_1\n\n- output_2", + "description": "List of output IDs.\n\ne.g.\n\noutputs:\n\n- output_1\n\n- output_2", "default": [], "type": "array", "items": { @@ -169,52 +169,32 @@ "Node": { "description": "Dora Node", "type": "object", - "oneOf": [ - { - "description": "Dora runtime node", - "type": "object", - "required": [ - "operators" - ], - "properties": { - "operators": { - "type": "array", - "items": { - "$ref": "#/definitions/OperatorDefinition" - } - } - }, - "additionalProperties": true - }, - { - "type": "object", - "required": [ - "custom" - ], - "properties": { - "custom": { - "$ref": "#/definitions/CustomNode" - } - }, - "additionalProperties": true - }, - { - "type": "object", - "required": [ - "operator" - ], - "properties": { - "operator": { - "$ref": "#/definitions/SingleOperatorDefinition" - } - }, - "additionalProperties": true - } - ], "required": [ "id" ], "properties": { + "args": { + "type": [ + "string", + "null" + ] + }, + "build": { + "type": [ + "string", + "null" + ] + }, + "custom": { + "anyOf": [ + { + "$ref": "#/definitions/CustomNode" + }, + { + "type": "null" + } + ] + }, "description": { "description": "Description of the node", "type": [ @@ -240,14 +220,59 @@ } ] }, + "inputs": { + "default": {}, + "type": "object", + "additionalProperties": true + }, "name": { "description": "Node name", "type": [ "string", "null" ] + }, + "operator": { + "anyOf": [ + { + "$ref": "#/definitions/SingleOperatorDefinition" + }, + { + "type": "null" + } + ] + }, + "operators": { + "type": [ + "array", + "null" + ], + "items": { + "$ref": "#/definitions/OperatorDefinition" + } + }, + "outputs": { + "default": [], + "type": "array", + "items": { + "$ref": "#/definitions/DataId" + }, + "uniqueItems": true + }, + "path": { + "type": [ + "string", + "null" + ] + }, + "send_stdout_as": { + "type": [ + "string", + "null" + ] } - } + }, + "additionalProperties": true }, "NodeId": { "type": "string" diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 0f2d0b169..fc836412a 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,7 +1,7 @@ use crate::config::{ CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId, }; -use eyre::{bail, eyre, Context, Result}; +use eyre::{bail, eyre, Context, OptionExt, Result}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_with_expand_env::with_expand_envs; @@ -34,29 +34,32 @@ pub struct Descriptor { pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op"; impl Descriptor { - pub fn resolve_aliases_and_set_defaults(&self) -> Vec { + pub fn resolve_aliases_and_set_defaults(&self) -> eyre::Result> { let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string()); let single_operator_nodes: HashMap<_, _> = self .nodes .iter() - .filter_map(|n| match &n.kind { - NodeKind::Operator(op) => Some((&n.id, op.id.as_ref().unwrap_or(&default_op_id))), - _ => None, + .filter_map(|n| { + n.operator + .as_ref() + .map(|op| (&n.id, op.id.as_ref().unwrap_or(&default_op_id))) }) .collect(); let mut resolved = vec![]; for mut node in self.nodes.clone() { // adjust input mappings - let input_mappings: Vec<_> = match &mut node.kind { - NodeKind::Runtime(node) => node + let mut node_kind = node.kind_mut()?; + let input_mappings: Vec<_> = match &mut node_kind { + NodeKindMut::Standard { path: _, inputs } => inputs.values_mut().collect(), + NodeKindMut::Runtime(node) => node .operators .iter_mut() .flat_map(|op| op.config.inputs.values_mut()) .collect(), - NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(), - NodeKind::Operator(operator) => operator.config.inputs.values_mut().collect(), + NodeKindMut::Custom(node) => node.run_config.inputs.values_mut().collect(), + NodeKindMut::Operator(operator) => operator.config.inputs.values_mut().collect(), }; for mapping in input_mappings .into_iter() @@ -71,13 +74,24 @@ impl Descriptor { } // resolve nodes - let kind = match node.kind { - NodeKind::Custom(node) => CoreNodeKind::Custom(node), - NodeKind::Runtime(node) => CoreNodeKind::Runtime(node), - NodeKind::Operator(op) => CoreNodeKind::Runtime(RuntimeNode { + let kind = match node_kind { + NodeKindMut::Standard { path, inputs: _ } => CoreNodeKind::Custom(CustomNode { + source: path.clone(), + args: node.args, + build: node.build, + send_stdout_as: node.send_stdout_as, + run_config: NodeRunConfig { + inputs: node.inputs, + outputs: node.outputs, + }, + envs: None, + }), + NodeKindMut::Custom(node) => CoreNodeKind::Custom(node.clone()), + NodeKindMut::Runtime(node) => CoreNodeKind::Runtime(node.clone()), + NodeKindMut::Operator(op) => CoreNodeKind::Runtime(RuntimeNode { operators: vec![OperatorDefinition { - id: op.id.unwrap_or_else(|| default_op_id.clone()), - config: op.config, + id: op.id.clone().unwrap_or_else(|| default_op_id.clone()), + config: op.config.clone(), }], }), }; @@ -92,11 +106,11 @@ impl Descriptor { }); } - resolved + Ok(resolved) } pub fn visualize_as_mermaid(&self) -> eyre::Result { - let resolved = self.resolve_aliases_and_set_defaults(); + let resolved = self.resolve_aliases_and_set_defaults()?; let flowchart = visualize::visualize_nodes(&resolved); Ok(flowchart) @@ -131,6 +145,7 @@ pub struct Deploy { /// Dora Node #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] pub struct Node { /// Node identifier pub id: NodeId, @@ -146,18 +161,97 @@ pub struct Node { #[serde(default, rename = "_unstable_deploy")] pub deploy: Deploy, - #[serde(flatten)] - pub kind: NodeKind, + #[serde(default, skip_serializing_if = "Option::is_none")] + operators: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + custom: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + operator: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub path: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub args: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub build: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, } -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] -#[serde(rename_all = "lowercase")] -pub enum NodeKind { +impl Node { + pub fn kind(&self) -> eyre::Result { + match (&self.path, &self.operators, &self.custom, &self.operator) { + (None, None, None, None) => { + eyre::bail!( + "node `{}` requires a `path`, `custom`, or `operators` field", + self.id + ) + } + (None, None, None, Some(operator)) => Ok(NodeKind::Operator(operator)), + (None, None, Some(custom), None) => Ok(NodeKind::Custom(custom)), + (None, Some(runtime), None, None) => Ok(NodeKind::Runtime(runtime)), + (Some(path), None, None, None) => Ok(NodeKind::Standard(path)), + _ => { + eyre::bail!( + "node `{}` has multiple exclusive fields set, only one of `path`, `custom`, `operators` and `operator` is allowed", + self.id + ) + } + } + } + + fn kind_mut(&mut self) -> eyre::Result { + match self.kind()? { + NodeKind::Standard(_) => self + .path + .as_ref() + .map(|path| NodeKindMut::Standard { + path, + inputs: &mut self.inputs, + }) + .ok_or_eyre("no path"), + NodeKind::Runtime(_) => self + .operators + .as_mut() + .map(NodeKindMut::Runtime) + .ok_or_eyre("no operators"), + NodeKind::Custom(_) => self + .custom + .as_mut() + .map(NodeKindMut::Custom) + .ok_or_eyre("no custom"), + NodeKind::Operator(_) => self + .operator + .as_mut() + .map(NodeKindMut::Operator) + .ok_or_eyre("no operator"), + } + } +} + +#[derive(Debug)] +pub enum NodeKind<'a> { + Standard(&'a String), /// Dora runtime node - #[serde(rename = "operators")] - Runtime(RuntimeNode), - Custom(CustomNode), - Operator(SingleOperatorDefinition), + Runtime(&'a RuntimeNode), + Custom(&'a CustomNode), + Operator(&'a SingleOperatorDefinition), +} + +#[derive(Debug)] +enum NodeKindMut<'a> { + Standard { + path: &'a String, + inputs: &'a mut BTreeMap, + }, + /// Dora runtime node + Runtime(&'a mut RuntimeNode), + Custom(&'a mut CustomNode), + Operator(&'a mut SingleOperatorDefinition), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -368,6 +462,8 @@ pub struct CustomNode { #[serde(default, skip_serializing_if = "Option::is_none")] pub args: Option, /// Environment variables for the custom nodes + /// + /// Deprecated, use outer-level `env` field instead. pub envs: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub build: Option, diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 5eddc5314..fe5580967 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -13,7 +13,7 @@ use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { - let nodes = dataflow.resolve_aliases_and_set_defaults(); + let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; // check that nodes and operators exist