Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow top-level fields in node declaration #478

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn attach_dataflow(
// Generate path hashmap
let mut node_path_lookup = HashMap::new();

let nodes = dataflow.resolve_aliases_and_set_defaults();
let nodes = dataflow.resolve_aliases_and_set_defaults()?;

let working_dir = dataflow_path
.canonicalize()
Expand Down
9 changes: 7 additions & 2 deletions binaries/cli/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ pub fn build(dataflow: &Path) -> eyre::Result<()> {

let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());

for node in &descriptor.nodes {
match &node.kind {
for node in descriptor.nodes {
match node.kind()? {
dora_core::descriptor::NodeKind::Standard(_) => {
run_build_command(node.build.as_deref(), working_dir).with_context(|| {
format!("build command failed for standard node `{}`", node.id)
})?
}
dora_core::descriptor::NodeKind::Runtime(runtime_node) => {
for operator in &runtime_node.operators {
run_build_command(operator.config.build.as_deref(), working_dir).with_context(
Expand Down
2 changes: 1 addition & 1 deletion binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(super) async fn spawn_dataflow(
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;

let nodes = dataflow.resolve_aliases_and_set_defaults();
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));

let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect();
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl Daemon {

let descriptor = Descriptor::read(dataflow_path).await?;
descriptor.check(&working_dir)?;
let nodes = descriptor.resolve_aliases_and_set_defaults();
let nodes = descriptor.resolve_aliases_and_set_defaults()?;

let spawn_command = SpawnDataflowNodes {
dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)),
Expand Down
6 changes: 6 additions & 0 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ pub async fn spawn_node(
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}
if let Some(envs) = n.envs {
// node has some inner env variables -> add them too
for (key, value) in envs {
command.env(key, value.to_string());
}
Expand Down
45 changes: 21 additions & 24 deletions examples/rust-dataflow/dataflow.yml
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
nodes:
- id: rust-node
build: cargo build -p rust-dataflow-example-node
path: ../../target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/10
outputs:
- random
- id: rust-status-node
custom:
build: cargo build -p rust-dataflow-example-node
source: ../../target/debug/rust-dataflow-example-node
build: cargo build -p rust-dataflow-example-status-node
source: ../../target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/10
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- random
- id: rust-status-node
custom:
build: cargo build -p rust-dataflow-example-status-node
source: ../../target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- status
- id: rust-sink
custom:
build: cargo build -p rust-dataflow-example-sink
source: ../../target/debug/rust-dataflow-example-sink
inputs:
message: rust-status-node/status
build: cargo build -p rust-dataflow-example-sink
path: ../../target/debug/rust-dataflow-example-sink
inputs:
message: rust-status-node/status
- id: dora-record
custom:
build: cargo build -p dora-record
source: ../../target/debug/dora-record
inputs:
message: rust-status-node/status
random: rust-node/random
build: cargo build -p dora-record
path: ../../target/debug/dora-record
inputs:
message: rust-status-node/status
random: rust-node/random
117 changes: 71 additions & 46 deletions libraries/core/dora-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
]
},
"envs": {
"description": "Environment variables for the custom nodes",
"description": "Environment variables for the custom nodes\n\nDeprecated, use outer-level `env` field instead.",
"type": [
"object",
"null"
Expand All @@ -46,13 +46,13 @@
}
},
"inputs": {
"description": "Inputs for the nodes as a map from input ID to `<node_id>/<output_id>`.\n\ne.g.\n\ninputs:\n\nexample_input: example_node/example_output1",
"description": "Inputs for the nodes as a map from input ID to `node_id/output_id`.\n\ne.g.\n\ninputs:\n\nexample_input: example_node/example_output1",
"default": {},
"type": "object",
"additionalProperties": true
},
"outputs": {
"description": "Outputs as a list of outputs.\n\ne.g.\n\noutputs:\n\n- output_1\n\n- output_2",
"description": "List of output IDs.\n\ne.g.\n\noutputs:\n\n- output_1\n\n- output_2",
"default": [],
"type": "array",
"items": {
Expand Down Expand Up @@ -169,52 +169,32 @@
"Node": {
"description": "Dora Node",
"type": "object",
"oneOf": [
{
"description": "Dora runtime node",
"type": "object",
"required": [
"operators"
],
"properties": {
"operators": {
"type": "array",
"items": {
"$ref": "#/definitions/OperatorDefinition"
}
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"custom"
],
"properties": {
"custom": {
"$ref": "#/definitions/CustomNode"
}
},
"additionalProperties": true
},
{
"type": "object",
"required": [
"operator"
],
"properties": {
"operator": {
"$ref": "#/definitions/SingleOperatorDefinition"
}
},
"additionalProperties": true
}
],
"required": [
"id"
],
"properties": {
"args": {
"type": [
"string",
"null"
]
},
"build": {
"type": [
"string",
"null"
]
},
"custom": {
"anyOf": [
{
"$ref": "#/definitions/CustomNode"
},
{
"type": "null"
}
]
},
"description": {
"description": "Description of the node",
"type": [
Expand All @@ -240,14 +220,59 @@
}
]
},
"inputs": {
"default": {},
"type": "object",
"additionalProperties": true
},
"name": {
"description": "Node name",
"type": [
"string",
"null"
]
},
"operator": {
"anyOf": [
{
"$ref": "#/definitions/SingleOperatorDefinition"
},
{
"type": "null"
}
]
},
"operators": {
"type": [
"array",
"null"
],
"items": {
"$ref": "#/definitions/OperatorDefinition"
}
},
"outputs": {
"default": [],
"type": "array",
"items": {
"$ref": "#/definitions/DataId"
},
"uniqueItems": true
},
"path": {
"type": [
"string",
"null"
]
},
"send_stdout_as": {
"type": [
"string",
"null"
]
}
}
},
"additionalProperties": true
},
"NodeId": {
"type": "string"
Expand Down
Loading
Loading