Skip to content

Commit

Permalink
Merge pull request #478 from dora-rs/dataflow-parsing
Browse files Browse the repository at this point in the history
Allow top-level fields in node declaration
  • Loading branch information
haixuanTao authored May 29, 2024
2 parents ea47a55 + 7364860 commit e7858a6
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 102 deletions.
2 changes: 1 addition & 1 deletion binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn attach_dataflow(
// Generate path hashmap
let mut node_path_lookup = HashMap::new();

let nodes = dataflow.resolve_aliases_and_set_defaults();
let nodes = dataflow.resolve_aliases_and_set_defaults()?;

let working_dir = dataflow_path
.canonicalize()
Expand Down
9 changes: 7 additions & 2 deletions binaries/cli/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ pub fn build(dataflow: &Path) -> eyre::Result<()> {

let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());

for node in &descriptor.nodes {
match &node.kind {
for node in descriptor.nodes {
match node.kind()? {
dora_core::descriptor::NodeKind::Standard(_) => {
run_build_command(node.build.as_deref(), working_dir).with_context(|| {
format!("build command failed for standard node `{}`", node.id)
})?
}
dora_core::descriptor::NodeKind::Runtime(runtime_node) => {
for operator in &runtime_node.operators {
run_build_command(operator.config.build.as_deref(), working_dir).with_context(
Expand Down
2 changes: 1 addition & 1 deletion binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(super) async fn spawn_dataflow(
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;

let nodes = dataflow.resolve_aliases_and_set_defaults();
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));

let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect();
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl Daemon {

let descriptor = Descriptor::read(dataflow_path).await?;
descriptor.check(&working_dir)?;
let nodes = descriptor.resolve_aliases_and_set_defaults();
let nodes = descriptor.resolve_aliases_and_set_defaults()?;

let spawn_command = SpawnDataflowNodes {
dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)),
Expand Down
6 changes: 6 additions & 0 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ pub async fn spawn_node(
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}
if let Some(envs) = n.envs {
// node has some inner env variables -> add them too
for (key, value) in envs {
command.env(key, value.to_string());
}
Expand Down
45 changes: 21 additions & 24 deletions examples/rust-dataflow/dataflow.yml
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
nodes:
- id: rust-node
build: cargo build -p rust-dataflow-example-node
path: ../../target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/10
outputs:
- random
- id: rust-status-node
custom:
build: cargo build -p rust-dataflow-example-node
source: ../../target/debug/rust-dataflow-example-node
build: cargo build -p rust-dataflow-example-status-node
source: ../../target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/10
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- random
- id: rust-status-node
custom:
build: cargo build -p rust-dataflow-example-status-node
source: ../../target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- status
- id: rust-sink
custom:
build: cargo build -p rust-dataflow-example-sink
source: ../../target/debug/rust-dataflow-example-sink
inputs:
message: rust-status-node/status
build: cargo build -p rust-dataflow-example-sink
path: ../../target/debug/rust-dataflow-example-sink
inputs:
message: rust-status-node/status
- id: dora-record
custom:
build: cargo build -p dora-record
source: ../../target/debug/dora-record
inputs:
message: rust-status-node/status
random: rust-node/random
build: cargo build -p dora-record
path: ../../target/debug/dora-record
inputs:
message: rust-status-node/status
random: rust-node/random
117 changes: 71 additions & 46 deletions libraries/core/dora-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
]
},
"envs": {
"description": "Environment variables for the custom nodes",
"description": "Environment variables for the custom nodes\n\nDeprecated, use outer-level `env` field instead.",
"type": [
"object",
"null"
Expand All @@ -46,13 +46,13 @@
}
},
"inputs": {
"description": "Inputs for the nodes as a map from input ID to `<node_id>/<output_id>`.\n\ne.g.\n\ninputs:\n\nexample_input: example_node/example_output1",
"description": "Inputs for the nodes as a map from input ID to `node_id/output_id`.\n\ne.g.\n\ninputs:\n\nexample_input: example_node/example_output1",
"default": {},
"type": "object",
"additionalProperties": true
},
"outputs": {
"description": "Outputs as a list of outputs.\n\ne.g.\n\noutputs:\n\n- output_1\n\n- output_2",
"description": "List of output IDs.\n\ne.g.\n\noutputs:\n\n- output_1\n\n- output_2",
"default": [],
"type": "array",
"items": {
Expand Down Expand Up @@ -169,52 +169,32 @@
"Node": {
"description": "Dora Node",
"type": "object",
"oneOf": [
{
"description": "Dora runtime node",
"type": "object",
"required": [
"operators"
],
"properties": {
"operators": {
"type": "array",
"items": {
"$ref": "#/definitions/OperatorDefinition"
}
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"custom"
],
"properties": {
"custom": {
"$ref": "#/definitions/CustomNode"
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"operator"
],
"properties": {
"operator": {
"$ref": "#/definitions/SingleOperatorDefinition"
}
},
"additionalProperties": true
}
],
"required": [
"id"
],
"properties": {
"args": {
"type": [
"string",
"null"
]
},
"build": {
"type": [
"string",
"null"
]
},
"custom": {
"anyOf": [
{
"$ref": "#/definitions/CustomNode"
},
{
"type": "null"
}
]
},
"description": {
"description": "Description of the node",
"type": [
Expand All @@ -240,14 +220,59 @@
}
]
},
"inputs": {
"default": {},
"type": "object",
"additionalProperties": true
},
"name": {
"description": "Node name",
"type": [
"string",
"null"
]
},
"operator": {
"anyOf": [
{
"$ref": "#/definitions/SingleOperatorDefinition"
},
{
"type": "null"
}
]
},
"operators": {
"type": [
"array",
"null"
],
"items": {
"$ref": "#/definitions/OperatorDefinition"
}
},
"outputs": {
"default": [],
"type": "array",
"items": {
"$ref": "#/definitions/DataId"
},
"uniqueItems": true
},
"path": {
"type": [
"string",
"null"
]
},
"send_stdout_as": {
"type": [
"string",
"null"
]
}
}
},
"additionalProperties": true
},
"NodeId": {
"type": "string"
Expand Down
Loading

0 comments on commit e7858a6

Please sign in to comment.