From c65c51ab1cbcf875fdbec480f4d58c15d22a13d2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 29 Jun 2022 11:46:30 +0200 Subject: [PATCH] Release the Python interpreter lock when waiting for data As long as we keep the global interpreter lock (GIL) active, no other Python thread can make progress. This led to starvation when there are multiple Python operators on the same runtime node. This PR fixes this by holding the GIL only as long as needed, i.e. when calling code of the operator. Most importantly, we don't hold the GIL anymore when the operator is idle and waiting for new inputs. --- runtime/src/operator/python.rs | 40 +++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/runtime/src/operator/python.rs b/runtime/src/operator/python.rs index 90702dde0..ca0cfa669 100644 --- a/runtime/src/operator/python.rs +++ b/runtime/src/operator/python.rs @@ -1,6 +1,6 @@ use super::{OperatorEvent, OperatorInput}; use eyre::{bail, eyre, Context}; -use pyo3::{pyclass, types::IntoPyDict, Python}; +use pyo3::{pyclass, types::IntoPyDict, Py, Python}; use std::{ panic::{catch_unwind, AssertUnwindSafe}, path::Path, @@ -25,7 +25,7 @@ pub fn spawn( events_tx: events_tx.clone(), }; - let python_runner = move |py: pyo3::Python| { + let init_operator = move |py: Python| { if let Some(parent_path) = path.parent() { let parent_path = parent_path .to_str() @@ -58,32 +58,42 @@ pub fn spawn( let operator = py .eval("Operator()", None, Some(locals)) .wrap_err("failed to create Operator instance")?; + Result::<_, eyre::Report>::Ok(Py::from(operator)) + }; + + let python_runner = move || { + let operator = + Python::with_gil(init_operator).wrap_err("failed to init python operator")?; while let Some(input) = inputs.blocking_recv() { - operator - .call_method1( + Python::with_gil(|py| { + operator.call_method1( + py, "on_input", (input.id.to_string(), input.value, send_output.clone()), ) - .wrap_err("on_input failed")?; + }) + 
.wrap_err("on_input failed")?; } - if operator - .hasattr("drop_operator") - .wrap_err("failed to look for drop_operator")? - { - operator - .call_method0("drop_operator") - .wrap_err("drop_operator failed")?; - } + Python::with_gil(|py| { + let operator = operator.as_ref(py); + if operator + .hasattr("drop_operator") + .wrap_err("failed to look for drop_operator")? + { + operator.call_method0("drop_operator")?; + } + Result::<_, eyre::Report>::Ok(()) + })?; Result::<_, eyre::Report>::Ok(()) }; thread::spawn(move || { let closure = AssertUnwindSafe(|| { - let result = Python::with_gil(python_runner); - result.wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) + python_runner() + .wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) }); match catch_unwind(closure) {