Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow top-level fields in node declaration #478

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn attach_dataflow(
// Generate path hashmap
let mut node_path_lookup = HashMap::new();

let nodes = dataflow.resolve_aliases_and_set_defaults();
let nodes = dataflow.resolve_aliases_and_set_defaults()?;

let working_dir = dataflow_path
.canonicalize()
Expand Down
9 changes: 7 additions & 2 deletions binaries/cli/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ pub fn build(dataflow: &Path) -> eyre::Result<()> {

let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());

for node in &descriptor.nodes {
match &node.kind {
for node in descriptor.nodes {
match node.kind()? {
dora_core::descriptor::NodeKind::Standard(_) => {
run_build_command(node.build.as_deref(), working_dir).with_context(|| {
format!("build command failed for standard node `{}`", node.id)
})?
}
dora_core::descriptor::NodeKind::Runtime(runtime_node) => {
for operator in &runtime_node.operators {
run_build_command(operator.config.build.as_deref(), working_dir).with_context(
Expand Down
2 changes: 1 addition & 1 deletion binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(super) async fn spawn_dataflow(
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;

let nodes = dataflow.resolve_aliases_and_set_defaults();
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));

let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect();
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl Daemon {

let descriptor = Descriptor::read(dataflow_path).await?;
descriptor.check(&working_dir)?;
let nodes = descriptor.resolve_aliases_and_set_defaults();
let nodes = descriptor.resolve_aliases_and_set_defaults()?;

let spawn_command = SpawnDataflowNodes {
dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)),
Expand Down
45 changes: 21 additions & 24 deletions examples/rust-dataflow/dataflow.yml
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
nodes:
- id: rust-node
build: cargo build -p rust-dataflow-example-node
path: ../../target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/10
outputs:
- random
- id: rust-status-node
custom:
build: cargo build -p rust-dataflow-example-node
source: ../../target/debug/rust-dataflow-example-node
build: cargo build -p rust-dataflow-example-status-node
source: ../../target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/10
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- random
- id: rust-status-node
custom:
build: cargo build -p rust-dataflow-example-status-node
source: ../../target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- status
- id: rust-sink
custom:
build: cargo build -p rust-dataflow-example-sink
source: ../../target/debug/rust-dataflow-example-sink
inputs:
message: rust-status-node/status
build: cargo build -p rust-dataflow-example-sink
path: ../../target/debug/rust-dataflow-example-sink
inputs:
message: rust-status-node/status
- id: dora-record
custom:
build: cargo build -p dora-record
source: ../../target/debug/dora-record
inputs:
message: rust-status-node/status
random: rust-node/random
build: cargo build -p dora-record
path: ../../target/debug/dora-record
inputs:
message: rust-status-node/status
random: rust-node/random
148 changes: 122 additions & 26 deletions libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::{
CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId,
};
use eyre::{bail, eyre, Context, Result};
use eyre::{bail, eyre, Context, OptionExt, Result};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with_expand_env::with_expand_envs;
Expand Down Expand Up @@ -34,29 +34,32 @@ pub struct Descriptor {
pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";

impl Descriptor {
pub fn resolve_aliases_and_set_defaults(&self) -> Vec<ResolvedNode> {
pub fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<Vec<ResolvedNode>> {
let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());

let single_operator_nodes: HashMap<_, _> = self
.nodes
.iter()
.filter_map(|n| match &n.kind {
NodeKind::Operator(op) => Some((&n.id, op.id.as_ref().unwrap_or(&default_op_id))),
_ => None,
.filter_map(|n| {
n.operator
.as_ref()
.map(|op| (&n.id, op.id.as_ref().unwrap_or(&default_op_id)))
})
.collect();

let mut resolved = vec![];
for mut node in self.nodes.clone() {
// adjust input mappings
let input_mappings: Vec<_> = match &mut node.kind {
NodeKind::Runtime(node) => node
let mut node_kind = node.kind_mut()?;
let input_mappings: Vec<_> = match &mut node_kind {
NodeKindMut::Standard { path: _, inputs } => inputs.values_mut().collect(),
NodeKindMut::Runtime(node) => node
.operators
.iter_mut()
.flat_map(|op| op.config.inputs.values_mut())
.collect(),
NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(),
NodeKind::Operator(operator) => operator.config.inputs.values_mut().collect(),
NodeKindMut::Custom(node) => node.run_config.inputs.values_mut().collect(),
NodeKindMut::Operator(operator) => operator.config.inputs.values_mut().collect(),
};
for mapping in input_mappings
.into_iter()
Expand All @@ -71,13 +74,24 @@ impl Descriptor {
}

// resolve nodes
let kind = match node.kind {
NodeKind::Custom(node) => CoreNodeKind::Custom(node),
NodeKind::Runtime(node) => CoreNodeKind::Runtime(node),
NodeKind::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
let kind = match node_kind {
NodeKindMut::Standard { path, inputs: _ } => CoreNodeKind::Custom(CustomNode {
source: path.clone(),
args: node.args,
envs: node.envs,
build: node.build,
send_stdout_as: node.send_stdout_as,
run_config: NodeRunConfig {
inputs: node.inputs,
outputs: node.outputs,
},
}),
NodeKindMut::Custom(node) => CoreNodeKind::Custom(node.clone()),
NodeKindMut::Runtime(node) => CoreNodeKind::Runtime(node.clone()),
NodeKindMut::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
operators: vec![OperatorDefinition {
id: op.id.unwrap_or_else(|| default_op_id.clone()),
config: op.config,
id: op.id.clone().unwrap_or_else(|| default_op_id.clone()),
config: op.config.clone(),
}],
}),
};
Expand All @@ -92,11 +106,11 @@ impl Descriptor {
});
}

resolved
Ok(resolved)
}

pub fn visualize_as_mermaid(&self) -> eyre::Result<String> {
let resolved = self.resolve_aliases_and_set_defaults();
let resolved = self.resolve_aliases_and_set_defaults()?;
let flowchart = visualize::visualize_nodes(&resolved);

Ok(flowchart)
Expand Down Expand Up @@ -131,6 +145,7 @@ pub struct Deploy {

/// Dora Node
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Node {
/// Node identifier
pub id: NodeId,
Expand All @@ -146,18 +161,99 @@ pub struct Node {
#[serde(default, rename = "_unstable_deploy")]
pub deploy: Deploy,

#[serde(flatten)]
pub kind: NodeKind,
#[serde(default, skip_serializing_if = "Option::is_none")]
operators: Option<RuntimeNode>,
#[serde(default, skip_serializing_if = "Option::is_none")]
custom: Option<CustomNode>,
#[serde(default, skip_serializing_if = "Option::is_none")]
operator: Option<SingleOperatorDefinition>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub envs: Option<BTreeMap<String, EnvValue>>,
phil-opp marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
#[serde(default)]
pub inputs: BTreeMap<DataId, Input>,
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum NodeKind {
impl Node {
pub fn kind(&self) -> eyre::Result<NodeKind> {
match (&self.path, &self.operators, &self.custom, &self.operator) {
(None, None, None, None) => {
eyre::bail!(
"node `{}` requires a `path`, `custom`, or `operators` field",
self.id
)
}
(None, None, None, Some(operator)) => Ok(NodeKind::Operator(operator)),
(None, None, Some(custom), None) => Ok(NodeKind::Custom(custom)),
(None, Some(runtime), None, None) => Ok(NodeKind::Runtime(runtime)),
(Some(path), None, None, None) => Ok(NodeKind::Standard(path)),
_ => {
eyre::bail!(
"node `{}` has multiple exclusive fields set, only one of `path`, `custom`, `operators` and `operator` is allowed",
self.id
)
}
}
}

fn kind_mut(&mut self) -> eyre::Result<NodeKindMut> {
match self.kind()? {
NodeKind::Standard(_) => self
.path
.as_ref()
.map(|path| NodeKindMut::Standard {
path,
inputs: &mut self.inputs,
})
.ok_or_eyre("no path"),
NodeKind::Runtime(_) => self
.operators
.as_mut()
.map(NodeKindMut::Runtime)
.ok_or_eyre("no operators"),
NodeKind::Custom(_) => self
.custom
.as_mut()
.map(NodeKindMut::Custom)
.ok_or_eyre("no custom"),
NodeKind::Operator(_) => self
.operator
.as_mut()
.map(NodeKindMut::Operator)
.ok_or_eyre("no operator"),
}
}
}

#[derive(Debug)]
pub enum NodeKind<'a> {
Standard(&'a String),
/// Dora runtime node
#[serde(rename = "operators")]
Runtime(RuntimeNode),
Custom(CustomNode),
Operator(SingleOperatorDefinition),
Runtime(&'a RuntimeNode),
Custom(&'a CustomNode),
Operator(&'a SingleOperatorDefinition),
}

#[derive(Debug)]
enum NodeKindMut<'a> {
Standard {
path: &'a String,
inputs: &'a mut BTreeMap<DataId, Input>,
},
/// Dora runtime node
Runtime(&'a mut RuntimeNode),
Custom(&'a mut CustomNode),
Operator(&'a mut SingleOperatorDefinition),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion libraries/core/src/descriptor/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::{resolve_path, Descriptor, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> {
let nodes = dataflow.resolve_aliases_and_set_defaults();
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let mut has_python_operator = false;

// check that nodes and operators exist
Expand Down
Loading