From 9b3b7af986f487d36b129487cc2766d486caab88 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 28 Feb 2024 12:33:21 +0100 Subject: [PATCH] Add possibility to send stdout for operators and add warnings when there is multiple operators --- binaries/daemon/src/lib.rs | 6 ++++- binaries/daemon/src/spawn.rs | 2 +- .../python-operator-dataflow/dataflow.yml | 3 ++- libraries/core/src/descriptor/mod.rs | 27 ++++++++++++++++--- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 10c3c5f8c..702373bab 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -985,7 +985,11 @@ impl Daemon { }; let Some(subscribers) = dataflow.mappings.get(&output_id) else { - tracing::warn!("No subscribers found for {:?} in {:?}", output_id, dataflow.mappings); + tracing::warn!( + "No subscribers found for {:?} in {:?}", + output_id, + dataflow.mappings + ); return Ok(RunStatus::Continue); }; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 765da3a48..9d736b5a9 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -58,7 +58,7 @@ pub async fn spawn_node( clock.clone(), ) .await?; - let send_stdout_to = node.send_stdout_as().map(ToOwned::to_owned); + let send_stdout_to = node.send_stdout_as(); let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index 27d274266..c75009442 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -10,11 +10,12 @@ nodes: - id: object_detection operator: python: object_detection.py + send_stdout_as: stdout inputs: image: webcam/image outputs: - bbox - - logs + - stdout - id: plot operator: diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 9fae48850..7f114e594 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -10,6 +10,7 @@ use std::{ fmt, path::{Path, PathBuf}, }; +use tracing::warn; pub use visualize::collect_dora_timers; mod validate; @@ -165,10 +166,28 @@ pub struct ResolvedNode { } impl ResolvedNode { - pub fn send_stdout_as(&self) -> Option<&str> { + pub fn send_stdout_as(&self) -> Option { match &self.kind { - CoreNodeKind::Runtime(_) => None, // todo: add support for operator-level stdout capture - CoreNodeKind::Custom(n) => n.send_stdout_as.as_deref(), + // TODO: Split stdout between operators + CoreNodeKind::Runtime(n) => { + let count = n + .operators + .iter() + .filter(|op| op.config.send_stdout_as.is_some()) + .count(); + if count == 1 && n.operators.len() > 1 { + warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.") + } else if count > 1 { + warn!("More than one `send_stdout_as` operators for a runtime node. Selecting the first stdout operator.") + } + n.operators.iter().find_map(|op| { + op.config + .send_stdout_as + .clone() + .map(|stdout| format!("{}/{}", op.id, stdout)) + }) + } + CoreNodeKind::Custom(n) => n.send_stdout_as.clone(), } } } @@ -233,6 +252,8 @@ pub struct OperatorConfig { #[serde(default, skip_serializing_if = "Option::is_none")] pub build: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, } #[derive(Debug, Serialize, Deserialize, Clone)]