Skip to content

Commit

Permalink
Merge pull request #564 from lf-lang/python-allow-async-threads-concu…
Browse files Browse the repository at this point in the history
…rrently

Python target: Allow the execution of async Python threads concurrently with the C runtime
  • Loading branch information
lhstrh authored Oct 3, 2021
2 parents 620d437 + 77acbd1 commit 9087aa8
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 111 deletions.
49 changes: 29 additions & 20 deletions org.lflang/src/lib/Python/pythontarget.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,15 @@ static PyObject* py_request_stop(PyObject *self) {
static PyObject* py_main(PyObject *self, PyObject *args) {
DEBUG_PRINT("Initializing main.");
const char *argv[] = {TOSTRING(MODULE_NAME), NULL };

// Initialize the Python interpreter
Py_Initialize();

DEBUG_PRINT("Initialized the Python interpreter.");

Py_BEGIN_ALLOW_THREADS
lf_reactor_c_main(1, argv);
Py_END_ALLOW_THREADS

Py_INCREF(Py_None);
return Py_None;
Expand Down Expand Up @@ -966,25 +974,22 @@ PyObject* convert_C_action_to_py(void* action) {
*/
PyObject*
get_python_function(string module, string class, int instance_id, string func) {

// Set if the interpreter is already initialized
int is_initialized = 0;

if (Py_IsInitialized()) {
is_initialized = 1;
}

DEBUG_PRINT("Starting the function start().");

// Necessary PyObject variables to load the react() function from test.py
PyObject *pFileName, *pModule, *pDict, *pClasses, *pClass, *pFunc;

PyObject *rValue;

// Initialize the Python interpreter
Py_Initialize();

DEBUG_PRINT("Initialized the Python interpreter.");
// According to
// https://docs.python.org/3/c-api/init.html#non-python-created-threads
// the following code does the following:
// - Register this thread with the interpreter
// - Acquire the GIL (Global Interpreter Lock)
// - Store (return) the thread pointer
// When done, we should always call PyGILState_Release(gstate);
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();

// If the Python module is already loaded, skip this.
if (globalPythonModule == NULL) {
Expand All @@ -993,8 +998,7 @@ get_python_function(string module, string class, int instance_id, string func) {

// Set the Python search path to be the current working directory
char cwd[PATH_MAX];
if ( getcwd(cwd, sizeof(cwd)) == NULL)
{
if ( getcwd(cwd, sizeof(cwd)) == NULL) {
error_print_and_exit("Failed to get the current working directory.");
}

Expand All @@ -1004,7 +1008,7 @@ get_python_function(string module, string class, int instance_id, string func) {

Py_SetPath(wcwd);

DEBUG_PRINT("Loading module %s in %s.", module, cwd);
DEBUG_PRINT("Loading module %s in %s.", module, cwd);

pModule = PyImport_Import(pFileName);

Expand All @@ -1020,6 +1024,8 @@ get_python_function(string module, string class, int instance_id, string func) {
if (pDict == NULL) {
PyErr_Print();
error_print("Failed to load contents of module %s.", module);
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
return 1;
}

Expand All @@ -1042,6 +1048,8 @@ get_python_function(string module, string class, int instance_id, string func) {
if (pClasses == NULL){
PyErr_Print();
error_print("Failed to load class list \"%s\" in module %s.", class, module);
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
return 1;
}

Expand All @@ -1051,6 +1059,8 @@ get_python_function(string module, string class, int instance_id, string func) {
if (pClass == NULL) {
PyErr_Print();
error_print("Failed to load class \"%s[%d]\" in module %s.", class, instance_id, module);
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
return 1;
}

Expand All @@ -1066,6 +1076,8 @@ get_python_function(string module, string class, int instance_id, string func) {
if (pFunc && PyCallable_Check(pFunc)) {
DEBUG_PRINT("Calling function %s from class %s[%d].", func , class, instance_id);
Py_INCREF(pFunc);
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
return pFunc;
}
else {
Expand All @@ -1086,11 +1098,8 @@ get_python_function(string module, string class, int instance_id, string func) {

DEBUG_PRINT("Done with start().");

if (is_initialized == 0) {
/* We are the first to initilize the Pyton interpreter. Destroy it when done. */
Py_FinalizeEx();
}

Py_INCREF(Py_None);
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
return Py_None;
}
2 changes: 1 addition & 1 deletion org.lflang/src/org/lflang/TargetProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public enum TargetProperty {
* Directive to specify the number of threads.
*/
THREADS("threads", PrimitiveType.NON_NEGATIVE_INTEGER,
Arrays.asList(Target.C, Target.CPP, Target.CCPP),
Arrays.asList(Target.C, Target.CPP, Target.CCPP, Target.Python),
(config, value) -> {
config.threads = ASTUtils.toInteger(value);
}),
Expand Down
109 changes: 30 additions & 79 deletions org.lflang/src/org/lflang/generator/PythonGenerator.xtend
Original file line number Diff line number Diff line change
Expand Up @@ -814,25 +814,6 @@ class PythonGenerator extends CGenerator {
}
}

/**
* Generate code that ensures only one thread can execute at a time as per Python specifications
* @param state 0=beginning, 1=end
*/
def pyThreadMutexLockCode(int state, Reactor reactor) {
if(targetConfig.threads > 0)
{
switch(state){
case 0: return '''lf_mutex_lock(&py_«reactor.name»_reaction_mutex);'''
case 1: return '''lf_mutex_unlock(&py_«reactor.name»_reaction_mutex);'''
default: return ''''''
}
}
else
{
return ''''''
}
}

/**
* Generate top-level preambles and #include of pqueue.c and either reactor.c or reactor_threaded.c
* depending on whether threads are specified in target directive.
Expand Down Expand Up @@ -876,33 +857,6 @@ class PythonGenerator extends CGenerator {

super.parseTargetParameters()

// If the program is threaded, create a mutex for each reactor
// that guards the execution of its reactions.
// This is necessary because Python is not thread-safe
// and running multiple instances of the same function can cause
// a segmentation fault.
if (targetConfig.threads > 0) {
for (r : this.reactors ?: emptyList) {
pr('''
lf_mutex_t py_«r.toDefinition.name»_reaction_mutex;
''')
pr(super.initializeTriggerObjects, '''
// Initialize reaction mutex for «r.toDefinition.name»
lf_mutex_init(&py_«r.toDefinition.name»_reaction_mutex);
''')
}
// Add mutex for the main reactor
if (this.mainDef !== null) {
pr('''
lf_mutex_t py_«this.mainDef.name»_reaction_mutex;
''')
pr(super.initializeTriggerObjects, '''
// Initialize reaction mutex for «this.mainDef.name»
lf_mutex_init(&py_«this.mainDef.name»_reaction_mutex);
''')
}
}

// FIXME: Probably not the best place to do
// this.
if (!targetConfig.protoFiles.isNullOrEmpty) {
Expand Down Expand Up @@ -1324,32 +1278,33 @@ class PythonGenerator extends CGenerator {


prSourceLineNumber(reaction.code)
// Unfortunately, threads cannot run concurrently in Python.
// Therefore, we need to make sure reactions cannot execute concurrently by
// holding the mutex lock.
if(targetConfig.threads > 0) {
pr(pyThreadMutexLockCode(0, reactor))
}

pr('''
// Acquire the GIL (Global Interpreter Lock) to be able to call Python APIs.
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
DEBUG_PRINT("Calling reaction function «decl.name».«pythonFunctionName»");
PyObject *rValue = PyObject_CallObject(self->__py_reaction_function_«reactionIndex», Py_BuildValue("(«pyObjectDescriptor»)" «pyObjects»));
''')
pr('''
PyObject *rValue = PyObject_CallObject(
self->__py_reaction_function_«reactionIndex»,
Py_BuildValue("(«pyObjectDescriptor»)" «pyObjects»)
);
if (rValue == NULL) {
error_print("FATAL: Calling reaction «decl.name».«pythonFunctionName» failed.");
if (PyErr_Occurred()) {
PyErr_PrintEx(0);
PyErr_Clear(); // this will reset the error indicator so we can run Python code again
}
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
Py_FinalizeEx();
exit(1);
}
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
''')

if(targetConfig.threads > 0) {
pr(pyThreadMutexLockCode(1, reactor))
}

unindent()
pr("}")

Expand All @@ -1361,20 +1316,18 @@ class PythonGenerator extends CGenerator {
pr('void ' + deadlineFunctionName + '(void* instance_args) {')
indent();


super.generateInitializationForReaction("", reaction, decl, reactionIndex)
// Unfortunately, threads cannot run concurrently in Python.
// Therefore, we need to make sure reactions cannot execute concurrently by
// holding the mutex lock.
if (targetConfig.threads > 0) {
pr(pyThreadMutexLockCode(0, reactor))
}


pr('''
// Acquire the GIL (Global Interpreter Lock) to be able to call Python APIs.
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
DEBUG_PRINT("Calling deadline function «decl.name».«deadlineFunctionName»");
PyObject *rValue = PyObject_CallObject(self->__py_deadline_function_«reactionIndex», Py_BuildValue("(«pyObjectDescriptor»)" «pyObjects»));
''')
pr('''
PyObject *rValue = PyObject_CallObject(
self->__py_deadline_function_«reactionIndex»,
Py_BuildValue("(«pyObjectDescriptor»)" «pyObjects»)
);
if (rValue == NULL) {
error_print("FATAL: Calling reaction «decl.name».«deadlineFunctionName» failed.\n");
if (rValue == NULL) {
Expand All @@ -1383,18 +1336,16 @@ class PythonGenerator extends CGenerator {
PyErr_Clear(); // this will reset the error indicator so we can run Python code again
}
}
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
Py_FinalizeEx();
exit(1);
}
/* Release the thread. No Python API allowed beyond this point. */
PyGILState_Release(gstate);
''')

if (targetConfig.threads > 0) {
pr(pyThreadMutexLockCode(1, reactor))
}
//pr(reactionInitialization.toString)
// Code verbatim from 'deadline'
//prSourceLineNumber(reaction.deadline.code)
//pr(reaction.deadline.code.toText)
// TODO: Handle deadlines

unindent()
pr("}")
}
Expand Down
14 changes: 9 additions & 5 deletions test/Python/src/TimeLimit.lf
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,23 @@ reactor Destination {
reaction(x) {=
# print(x.value)
if x.value != self.s:
sys.stderr.write("Error: Expected {:d} and got {:d}.\n".format(self.s, x.value))
sys.stderr.write("ERROR: Expected {:d} and got {:d}.\n".format(self.s, x.value))
exit(1)
self.s += 1
=}
reaction(shutdown) {=
print("**** shutdown reaction invoked.")
if self.s != 10000002:
sys.stderr.write("ERROR: Expected 10000002 but got {:d}.\n".format(self.s))
exit(1)
print(f"Approx. time per reaction: {get_elapsed_physical_time()/(self.s+1):.1f}ns")
=}
}
main reactor TimeLimit(period(1 usec)) {
timer stop(1 secs);
timer stop(10 secs);
reaction(stop) {=
request_stop()
=}
reaction(shutdown) {=
print("**** shutdown reaction invoked.")
=}
c = new Clock(period = period);
d = new Destination();
c.y -> d.x;
Expand Down
12 changes: 6 additions & 6 deletions test/Python/src/concurrent/AsyncCallback.lf
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Test asynchronous callbacks that trigger a physical action.
// This test case assumes that target is multithreaded.
// This test will not work with the unthreaded C target because that target
// does not implement any mutex protecting the event queue.
/**
* Test asynchronous callbacks that trigger a physical action.
* This test periodically creates a concurrent Python thread that
* schedule a physical action twice.
*/
target Python {
timeout: 2 sec,
keepalive: true // Not really needed here because there is a timer.
timeout: 2 sec
};

main reactor AsyncCallback {
Expand Down
Loading

0 comments on commit 9087aa8

Please sign in to comment.