Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Minor code improvements #95

Merged
merged 2 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions pipeless/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,10 @@ impl RgbFrame {
pub fn set_original_pixels(&mut self, original_pixels: ndarray::Array3<u8>) {
self.original = original_pixels
}
pub fn get_original_pixels(&self) -> &ndarray::Array3<u8> {
&self.original
pub fn get_original_pixels(&self) -> ndarray::ArrayView3<u8> {
self.original.view()
}
pub fn get_modified_pixels(&self) -> &ndarray::Array3<u8> {
&self.modified
}
pub fn get_mutable_pixels(&mut self) -> ndarray::ArrayViewMut3<u8> {
pub fn get_modified_pixels(&mut self) -> ndarray::ArrayViewMut3<u8> {
self.modified.view_mut()
}
pub fn update_mutable_pixels(
Expand Down Expand Up @@ -148,7 +145,7 @@ impl Frame {
Frame::RgbFrame(frame) => { frame.set_original_pixels(original_pixels) },
}
}
pub fn get_original_pixels(&self) -> &ndarray::Array3<u8> {
pub fn get_original_pixels(&mut self) -> ndarray::ArrayView3<u8> {
match self {
Frame::RgbFrame(frame) => frame.get_original_pixels()
}
Expand Down
35 changes: 2 additions & 33 deletions pipeless/src/gst/utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use glib::BoolError;
use gstreamer as gst;
use log::{error, warn};
use log::{error, warn, debug};

pub fn create_generic_component(ctype: &str, cname: &str) -> Result<gst::Element, BoolError> {
let component = gst::ElementFactory::make(ctype)
.name(cname)
.build().or_else(|err| {
error!("Failed to create component {} of type {}", cname, ctype);
debug!("Failed to create component {} of type {}", cname, ctype);
Err(err)
});

Expand Down Expand Up @@ -52,37 +52,6 @@ pub fn fraction_from_caps_structure(
};
}

/*
/// Taginject does not accept the output of taglist.to_string()
/// this method produces the expected string
pub fn tag_list_to_string(tag_list: &gst::TagList) -> String {
let result: String = tag_list.iter()
.filter(|(tag_name, _)| tag_name.to_string() != "taglist")
.filter_map(|(tag_name, _)| {
let mut formatted_values = Vec::new();

for i in 0..tag_list.size_by_name(tag_name) {
if let Some(tag_value) = tag_list.index_generic(tag_name, i) {
formatted_values.push(format!("{:?}", tag_value));
}
}

if !formatted_values.is_empty() {
Some(format!("{}={}", tag_name, formatted_values.join(",")))
} else {
None
}

// FIXME: this loses tag values if there is more than one value in the tag
//tag_list.index_generic(tag_name, 0)
// .map(|tag_value| format!("{}={:?}", tag_name, tag_value))
})
.collect::<Vec<String>>()
.join(";");

result
}*/

pub fn tag_list_to_string(tag_list: &gst::TagList) -> String {
let mut formatted_tags: Vec<String> = Vec::new();

Expand Down
2 changes: 1 addition & 1 deletion pipeless/src/output/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ impl Pipeline {

pub fn on_new_frame(&self, frame: pipeless::data::Frame) -> Result<(), OutputPipelineError>{
match frame {
pipeless::data::Frame::RgbFrame(rgb_frame) => {
pipeless::data::Frame::RgbFrame(mut rgb_frame) => {
let modified_pixels = rgb_frame.get_modified_pixels();
let out_frame_data = modified_pixels.as_slice()
.ok_or_else(|| { OutputPipelineError::new("Unable to get bytes data from RGB frame. Is your output image of the same shape as the input?") })?;
Expand Down
24 changes: 13 additions & 11 deletions pipeless/src/stages/languages/python.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use log::{error, warn};
use pyo3::prelude::*;
use numpy;
use numpy::{self, ToPyArray};

use crate::{data::{RgbFrame, Frame}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store};

Expand All @@ -26,30 +26,31 @@ impl<'source> FromPyObject<'source> for Frame {
}
/// Allows the RgbFrame variant of Frame to be converted from Rust to Python
impl IntoPy<Py<PyAny>> for RgbFrame {
fn into_py(self, py: Python) -> Py<PyAny> {
fn into_py(mut self, py: Python) -> Py<PyAny> {
let dict = pyo3::types::PyDict::new(py);
dict.set_item("uuid", self.get_uuid().to_string()).unwrap();
dict.set_item("original", numpy::PyArray3::from_array(py, self.get_original_pixels())).unwrap();
dict.set_item("modified", numpy::PyArray3::from_array(py, self.get_modified_pixels())).unwrap();
dict.set_item("original", self.get_original_pixels().to_pyarray(py)).unwrap();
dict.set_item("modified", self.get_modified_pixels().to_pyarray(py)).unwrap();
dict.set_item("width", self.get_width()).unwrap();
dict.set_item("height", self.get_height()).unwrap();
dict.set_item("pts", self.get_pts().mseconds()).unwrap();
dict.set_item("dts", self.get_dts().mseconds()).unwrap();
dict.set_item("duration", self.get_duration().mseconds()).unwrap();
dict.set_item("fps", self.get_fps()).unwrap();
dict.set_item("input_ts", self.get_input_ts()).unwrap();
dict.set_item("inference_input", numpy::PyArrayDyn::from_array(py, self.get_inference_input())).unwrap();
dict.set_item("inference_output", numpy::PyArrayDyn::from_array(py, self.get_inference_output())).unwrap();
dict.set_item("inference_input", self.get_inference_input().to_pyarray(py)).unwrap();
dict.set_item("inference_output", self.get_inference_output().to_pyarray(py)).unwrap();
dict.set_item("pipeline_id", self.get_pipeline_id().to_string()).unwrap();
dict.into()
}
}

/// Allows the RgbFrame variant of Frame to be converted from Python to Rust
impl<'source> FromPyObject<'source> for RgbFrame {
fn extract(ob: &'source PyAny) -> PyResult<Self> {
let original_py_array: &numpy::PyArray3<u8> = ob.get_item("original")?.extract()?;
let original_py_array: &numpy::PyArray3<u8> = ob.get_item("original")?.downcast::<numpy::PyArray3<u8>>()?;
let original_ndarray: ndarray::Array3<u8> = original_py_array.to_owned_array();
let modified_py_array: &numpy::PyArray3<u8> = ob.get_item("modified")?.extract()?;
let modified_py_array: &numpy::PyArray3<u8> = ob.get_item("modified")?.downcast::<numpy::PyArray3<u8>>()?;
let modified_ndarray: ndarray::Array3<u8> = modified_py_array.to_owned_array();

let inference_input_ndarray: ndarray::ArrayBase<_, ndarray::Dim<ndarray::IxDynImpl>>;
Expand Down Expand Up @@ -153,6 +154,7 @@ impl PythonHook {
let module_file_name = format!("{}.py", module_name);
let wrapper_module_name = format!("{}_wrapper", module_name);
let wrapper_module_file_name = format!("{}.py", wrapper_module_name);
// TODO: create a KVS module for python using PYo3 and expose it via pyo3::append_to_inittab!(make_person_module); so users can import it on their hooks
let wrapper_py_code = format!("
import {0}

Expand Down Expand Up @@ -200,7 +202,7 @@ def hook_wrapper(frame, context):
}
impl HookTrait for PythonHook {
/// Executes a Python hook by obtaining the GIL and passes the provided frame and stage context to it
fn exec_hook(&self, frame: Frame, _stage_context: &Context) -> Option<Frame> {
fn exec_hook(&self, mut frame: Frame, _stage_context: &Context) -> Option<Frame> {
let py_module = self.get_module();
let out_frame = Python::with_gil(|py| -> Option<Frame> {
let stage_context = match _stage_context {
Expand All @@ -213,7 +215,7 @@ impl HookTrait for PythonHook {
};

if let Ok(hook_func) = py_module.getattr(py, "hook_wrapper") {
let original_pixels = frame.get_original_pixels().to_owned();
let original_pixels = frame.get_original_pixels().to_owned(); // it seems that to_owned implies to copy data
// TODO: we will probably need a pool of interpreters to avoid initializing a new one ever time because o cold starts
// TODO: this acquires the Python GIL, breaking the concurrency, except when we are running on different cores thanks
// to how we invoke frame processing using the Tokio thread pool, becuase it runs threads on all the cores.
Expand All @@ -222,7 +224,7 @@ impl HookTrait for PythonHook {
Ok(ret) => {
match ret.extract::<Frame>(py) {
Ok(mut f) => {
// Avoid the user to accidentally the original frame
// Avoid the user to accidentally modify the original frame
f.set_original_pixels(original_pixels);
return Some(f)
},
Expand Down
5 changes: 0 additions & 5 deletions pipeless/src/stages/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ async fn run_hook(
frame: Option<pipeless::data::Frame>,
) -> Option<pipeless::data::Frame> {
if let Some(frame) = frame {
// Offload CPU bounded task to a worker thread
let context = stage.get_context();
// NOTE: I hcnaged this and it works faster, however the process time is like 4 times bigger
// Using spawn_blocking there are not several models instantiated. Using spawn, everal models are instantiated.
// Testing without any of those, since we have await -> frames run one after the other
// All the cases take the same time to run the whole stream...
return hook.exec_hook(frame, context).await;
}

Expand Down