diff --git a/pipeless/src/data.rs b/pipeless/src/data.rs index 94506ef..45daf63 100644 --- a/pipeless/src/data.rs +++ b/pipeless/src/data.rs @@ -66,13 +66,10 @@ impl RgbFrame { pub fn set_original_pixels(&mut self, original_pixels: ndarray::Array3) { self.original = original_pixels } - pub fn get_original_pixels(&self) -> &ndarray::Array3 { - &self.original + pub fn get_original_pixels(&self) -> ndarray::ArrayView3 { + self.original.view() } - pub fn get_modified_pixels(&self) -> &ndarray::Array3 { - &self.modified - } - pub fn get_mutable_pixels(&mut self) -> ndarray::ArrayViewMut3 { + pub fn get_modified_pixels(&mut self) -> ndarray::ArrayViewMut3 { self.modified.view_mut() } pub fn update_mutable_pixels( @@ -148,7 +145,7 @@ impl Frame { Frame::RgbFrame(frame) => { frame.set_original_pixels(original_pixels) }, } } - pub fn get_original_pixels(&self) -> &ndarray::Array3 { + pub fn get_original_pixels(&mut self) -> ndarray::ArrayView3 { match self { Frame::RgbFrame(frame) => frame.get_original_pixels() } diff --git a/pipeless/src/gst/utils.rs b/pipeless/src/gst/utils.rs index f5d38dd..03c6650 100644 --- a/pipeless/src/gst/utils.rs +++ b/pipeless/src/gst/utils.rs @@ -1,12 +1,12 @@ use glib::BoolError; use gstreamer as gst; -use log::{error, warn}; +use log::{error, warn, debug}; pub fn create_generic_component(ctype: &str, cname: &str) -> Result { let component = gst::ElementFactory::make(ctype) .name(cname) .build().or_else(|err| { - error!("Failed to create component {} of type {}", cname, ctype); + debug!("Failed to create component {} of type {}", cname, ctype); Err(err) }); @@ -52,37 +52,6 @@ pub fn fraction_from_caps_structure( }; } -/* -/// Taginject does not accept the output of taglist.to_string() -/// this method produces the expected string -pub fn tag_list_to_string(tag_list: &gst::TagList) -> String { - let result: String = tag_list.iter() - .filter(|(tag_name, _)| tag_name.to_string() != "taglist") - .filter_map(|(tag_name, _)| { - let mut formatted_values = Vec::new(); - - for i in 0..tag_list.size_by_name(tag_name) { - if let Some(tag_value) = tag_list.index_generic(tag_name, i) { - formatted_values.push(format!("{:?}", tag_value)); - } - } - - if !formatted_values.is_empty() { - Some(format!("{}={}", tag_name, formatted_values.join(","))) - } else { - None - } - - // FIXME: this loses tag values if there is more than one value in the tag - //tag_list.index_generic(tag_name, 0) - // .map(|tag_value| format!("{}={:?}", tag_name, tag_value)) - }) - .collect::>() - .join(";"); - - result -}*/ - pub fn tag_list_to_string(tag_list: &gst::TagList) -> String { let mut formatted_tags: Vec = Vec::new(); diff --git a/pipeless/src/output/pipeline.rs b/pipeless/src/output/pipeline.rs index 771e0cd..33f5abf 100644 --- a/pipeless/src/output/pipeline.rs +++ b/pipeless/src/output/pipeline.rs @@ -417,7 +417,7 @@ impl Pipeline { pub fn on_new_frame(&self, frame: pipeless::data::Frame) -> Result<(), OutputPipelineError>{ match frame { - pipeless::data::Frame::RgbFrame(rgb_frame) => { + pipeless::data::Frame::RgbFrame(mut rgb_frame) => { let modified_pixels = rgb_frame.get_modified_pixels(); let out_frame_data = modified_pixels.as_slice() .ok_or_else(|| { OutputPipelineError::new("Unable to get bytes data from RGB frame. Is your output image of the same shape as the input?") })?; diff --git a/pipeless/src/stages/languages/python.rs b/pipeless/src/stages/languages/python.rs index bbd93af..3306309 100644 --- a/pipeless/src/stages/languages/python.rs +++ b/pipeless/src/stages/languages/python.rs @@ -1,6 +1,6 @@ use log::{error, warn}; use pyo3::prelude::*; -use numpy; +use numpy::{self, ToPyArray}; use crate::{data::{RgbFrame, Frame}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store}; @@ -26,11 +26,11 @@ impl<'source> FromPyObject<'source> for Frame { } /// Allows the RgbFrame variant of Frame to be converted from Rust to Python impl IntoPy> for RgbFrame { - fn into_py(self, py: Python) -> Py { + fn into_py(mut self, py: Python) -> Py { let dict = pyo3::types::PyDict::new(py); dict.set_item("uuid", self.get_uuid().to_string()).unwrap(); - dict.set_item("original", numpy::PyArray3::from_array(py, self.get_original_pixels())).unwrap(); - dict.set_item("modified", numpy::PyArray3::from_array(py, self.get_modified_pixels())).unwrap(); + dict.set_item("original", self.get_original_pixels().to_pyarray(py)).unwrap(); + dict.set_item("modified", self.get_modified_pixels().to_pyarray(py)).unwrap(); dict.set_item("width", self.get_width()).unwrap(); dict.set_item("height", self.get_height()).unwrap(); dict.set_item("pts", self.get_pts().mseconds()).unwrap(); @@ -38,18 +38,19 @@ impl IntoPy> for RgbFrame { dict.set_item("duration", self.get_duration().mseconds()).unwrap(); dict.set_item("fps", self.get_fps()).unwrap(); dict.set_item("input_ts", self.get_input_ts()).unwrap(); - dict.set_item("inference_input", numpy::PyArrayDyn::from_array(py, self.get_inference_input())).unwrap(); - dict.set_item("inference_output", numpy::PyArrayDyn::from_array(py, self.get_inference_output())).unwrap(); + dict.set_item("inference_input", self.get_inference_input().to_pyarray(py)).unwrap(); + dict.set_item("inference_output", self.get_inference_output().to_pyarray(py)).unwrap(); dict.set_item("pipeline_id", self.get_pipeline_id().to_string()).unwrap(); dict.into() } } + /// Allows the RgbFrame variant of Frame to be converted from Python to Rust impl<'source> FromPyObject<'source> for RgbFrame { fn extract(ob: &'source PyAny) -> PyResult { - let original_py_array: &numpy::PyArray3 = ob.get_item("original")?.extract()?; + let original_py_array: &numpy::PyArray3 = ob.get_item("original")?.downcast::>()?; let original_ndarray: ndarray::Array3 = original_py_array.to_owned_array(); - let modified_py_array: &numpy::PyArray3 = ob.get_item("modified")?.extract()?; + let modified_py_array: &numpy::PyArray3 = ob.get_item("modified")?.downcast::>()?; let modified_ndarray: ndarray::Array3 = modified_py_array.to_owned_array(); let inference_input_ndarray: ndarray::ArrayBase<_, ndarray::Dim>; @@ -153,6 +154,7 @@ impl PythonHook { let module_file_name = format!("{}.py", module_name); let wrapper_module_name = format!("{}_wrapper", module_name); let wrapper_module_file_name = format!("{}.py", wrapper_module_name); + // TODO: create a KVS module for python using PYo3 and expose it via pyo3::append_to_inittab!(make_person_module); so users can import it on their hooks let wrapper_py_code = format!(" import {0} @@ -200,7 +202,7 @@ def hook_wrapper(frame, context): } impl HookTrait for PythonHook { /// Executes a Python hook by obtaining the GIL and passes the provided frame and stage context to it - fn exec_hook(&self, frame: Frame, _stage_context: &Context) -> Option { + fn exec_hook(&self, mut frame: Frame, _stage_context: &Context) -> Option { let py_module = self.get_module(); let out_frame = Python::with_gil(|py| -> Option { let stage_context = match _stage_context { @@ -213,7 +215,7 @@ impl HookTrait for PythonHook { }; if let Ok(hook_func) = py_module.getattr(py, "hook_wrapper") { - let original_pixels = frame.get_original_pixels().to_owned(); + let original_pixels = frame.get_original_pixels().to_owned(); // it seems that to_owned implies to copy data // TODO: we will probably need a pool of interpreters to avoid initializing a new one ever time because o cold starts // TODO: this acquires the Python GIL, breaking the concurrency, except when we are running on different cores thanks // to how we invoke frame processing using the Tokio thread pool, becuase it runs threads on all the cores. @@ -222,7 +224,7 @@ impl HookTrait for PythonHook { Ok(ret) => { match ret.extract::(py) { Ok(mut f) => { - // Avoid the user to accidentally the original frame + // Avoid the user to accidentally modify the original frame f.set_original_pixels(original_pixels); return Some(f) }, diff --git a/pipeless/src/stages/path.rs b/pipeless/src/stages/path.rs index fe8e264..eb5a770 100644 --- a/pipeless/src/stages/path.rs +++ b/pipeless/src/stages/path.rs @@ -102,12 +102,7 @@ async fn run_hook( frame: Option, ) -> Option { if let Some(frame) = frame { - // Offload CPU bounded task to a worker thread let context = stage.get_context(); - // NOTE: I hcnaged this and it works faster, however the process time is like 4 times bigger - // Using spawn_blocking there are not several models instantiated. Using spawn, everal models are instantiated. - // Testing without any of those, since we have await -> frames run one after the other - // All the cases take the same time to run the whole stream... return hook.exec_hook(frame, context).await; }