From d7dc5c5be5827c1a5ef8f9d9beecea22e5441f99 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Fri, 1 Oct 2021 22:41:15 -0500 Subject: [PATCH 1/6] Fixes #563 --- org.lflang/src/lib/Python/pythontarget.c | 42 ++++++++++--------- org.lflang/src/org/lflang/TargetProperty.java | 2 +- .../lflang/generator/PythonGenerator.xtend | 11 +++++ test/Python/src/concurrent/AsyncCallback.lf | 3 +- 4 files changed, 36 insertions(+), 22 deletions(-) diff --git a/org.lflang/src/lib/Python/pythontarget.c b/org.lflang/src/lib/Python/pythontarget.c index c639a35b62..48344d0725 100644 --- a/org.lflang/src/lib/Python/pythontarget.c +++ b/org.lflang/src/lib/Python/pythontarget.c @@ -254,7 +254,15 @@ static PyObject* py_request_stop(PyObject *self) { static PyObject* py_main(PyObject *self, PyObject *args) { DEBUG_PRINT("Initializing main."); const char *argv[] = {TOSTRING(MODULE_NAME), NULL }; + + // Initialize the Python interpreter + Py_Initialize(); + + DEBUG_PRINT("Initialized the Python interpreter."); + + Py_BEGIN_ALLOW_THREADS lf_reactor_c_main(1, argv); + Py_END_ALLOW_THREADS Py_INCREF(Py_None); return Py_None; @@ -966,14 +974,6 @@ PyObject* convert_C_action_to_py(void* action) { */ PyObject* get_python_function(string module, string class, int instance_id, string func) { - - // Set if the interpreter is already initialized - int is_initialized = 0; - - if (Py_IsInitialized()) { - is_initialized = 1; - } - DEBUG_PRINT("Starting the function start()."); // Necessary PyObject variables to load the react() function from test.py @@ -981,10 +981,10 @@ get_python_function(string module, string class, int instance_id, string func) { PyObject *rValue; - // Initialize the Python interpreter - Py_Initialize(); - DEBUG_PRINT("Initialized the Python interpreter."); + + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); // If the Python module is already loaded, skip this. if (globalPythonModule == NULL) { @@ -993,8 +993,7 @@ get_python_function(string module, string class, int instance_id, string func) { // Set the Python search path to be the current working directory char cwd[PATH_MAX]; - if ( getcwd(cwd, sizeof(cwd)) == NULL) - { + if ( getcwd(cwd, sizeof(cwd)) == NULL) { error_print_and_exit("Failed to get the current working directory."); } @@ -1004,7 +1003,7 @@ get_python_function(string module, string class, int instance_id, string func) { Py_SetPath(wcwd); - DEBUG_PRINT("Loading module %s in %s.", module, cwd); + DEBUG_PRINT("Loading module %s in %s.", module, cwd); pModule = PyImport_Import(pFileName); @@ -1020,6 +1019,8 @@ get_python_function(string module, string class, int instance_id, string func) { if (pDict == NULL) { PyErr_Print(); error_print("Failed to load contents of module %s.", module); + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); return 1; } @@ -1042,6 +1043,8 @@ get_python_function(string module, string class, int instance_id, string func) { if (pClasses == NULL){ PyErr_Print(); error_print("Failed to load class list \"%s\" in module %s.", class, module); + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); return 1; } @@ -1051,6 +1054,8 @@ get_python_function(string module, string class, int instance_id, string func) { if (pClass == NULL) { PyErr_Print(); error_print("Failed to load class \"%s[%d]\" in module %s.", class, instance_id, module); + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); return 1; } @@ -1066,6 +1071,8 @@ get_python_function(string module, string class, int instance_id, string func) { if (pFunc && PyCallable_Check(pFunc)) { DEBUG_PRINT("Calling function %s from class %s[%d].", func , class, instance_id); Py_INCREF(pFunc); + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); return pFunc; } else { @@ -1086,11 +1093,8 @@ get_python_function(string module, string class, int instance_id, string func) { DEBUG_PRINT("Done with start()."); - if (is_initialized == 0) { - /* We are the first to initilize the Pyton interpreter. Destroy it when done. */ - Py_FinalizeEx(); - } - Py_INCREF(Py_None); + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); return Py_None; } diff --git a/org.lflang/src/org/lflang/TargetProperty.java b/org.lflang/src/org/lflang/TargetProperty.java index 991184ad7c..fcb903e753 100644 --- a/org.lflang/src/org/lflang/TargetProperty.java +++ b/org.lflang/src/org/lflang/TargetProperty.java @@ -282,7 +282,7 @@ public enum TargetProperty { * Directive to specify the number of threads. */ THREADS("threads", PrimitiveType.NON_NEGATIVE_INTEGER, - Arrays.asList(Target.C, Target.CPP, Target.CCPP), + Arrays.asList(Target.C, Target.CPP, Target.CCPP, Target.Python), (config, value) -> { config.threads = ASTUtils.toInteger(value); }), diff --git a/org.lflang/src/org/lflang/generator/PythonGenerator.xtend b/org.lflang/src/org/lflang/generator/PythonGenerator.xtend index 8c01fbc576..c2094027d5 100644 --- a/org.lflang/src/org/lflang/generator/PythonGenerator.xtend +++ b/org.lflang/src/org/lflang/generator/PythonGenerator.xtend @@ -1324,6 +1324,12 @@ class PythonGenerator extends CGenerator { prSourceLineNumber(reaction.code) + + pr(''' + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + ''') + // Unfortunately, threads cannot run concurrently in Python. // Therefore, we need to make sure reactions cannot execute concurrently by // holding the mutex lock. @@ -1350,6 +1356,11 @@ class PythonGenerator extends CGenerator { pr(pyThreadMutexLockCode(1, reactor)) } + pr(''' + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); + ''') + unindent() pr("}") diff --git a/test/Python/src/concurrent/AsyncCallback.lf b/test/Python/src/concurrent/AsyncCallback.lf index 18b1331202..49e073845c 100644 --- a/test/Python/src/concurrent/AsyncCallback.lf +++ b/test/Python/src/concurrent/AsyncCallback.lf @@ -3,8 +3,7 @@ // This test will not work with the unthreaded C target because that target // does not implement any mutex protecting the event queue. target Python { - timeout: 2 sec, - keepalive: true // Not really needed here because there is a timer. + timeout: 2 sec }; main reactor AsyncCallback { From a1ab24d52cf9956bfc6496561204a8d74cd8e4c2 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Fri, 1 Oct 2021 22:46:21 -0500 Subject: [PATCH 2/6] Comments --- org.lflang/src/lib/Python/pythontarget.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/org.lflang/src/lib/Python/pythontarget.c b/org.lflang/src/lib/Python/pythontarget.c index 48344d0725..f091711269 100644 --- a/org.lflang/src/lib/Python/pythontarget.c +++ b/org.lflang/src/lib/Python/pythontarget.c @@ -981,8 +981,13 @@ get_python_function(string module, string class, int instance_id, string func) { PyObject *rValue; - - + // According to + // https://docs.python.org/3/c-api/init.html#non-python-created-threads + // the following code does the following: + // - Register this thread with the interpreter + // - Acquire the GIL (Global Interpreter Lock) + // - Store (return) the thread pointer + // When done, we should always call PyGILState_Release(gstate); PyGILState_STATE gstate; gstate = PyGILState_Ensure(); From 2d558e2329a9d57f30b68a48f35979ee2a4ad102 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Fri, 1 Oct 2021 22:54:57 -0500 Subject: [PATCH 3/6] Added a test and more comments --- test/Python/src/concurrent/AsyncCallback.lf | 9 +-- .../src/concurrent/AsyncCallbackNoTimer.lf | 68 +++++++++++++++++++ 2 files changed, 73 insertions(+), 4 deletions(-) create mode 100644 test/Python/src/concurrent/AsyncCallbackNoTimer.lf diff --git a/test/Python/src/concurrent/AsyncCallback.lf b/test/Python/src/concurrent/AsyncCallback.lf index 49e073845c..89d058b0a4 100644 --- a/test/Python/src/concurrent/AsyncCallback.lf +++ b/test/Python/src/concurrent/AsyncCallback.lf @@ -1,7 +1,8 @@ -// Test asynchronous callbacks that trigger a physical action. -// This test case assumes that target is multithreaded. -// This test will not work with the unthreaded C target because that target -// does not implement any mutex protecting the event queue. +/** + * Test asynchronous callbacks that trigger a physical action. + * This test periodically creates a concurrent Python thread that + * schedule a physical action twice. + */ target Python { timeout: 2 sec }; diff --git a/test/Python/src/concurrent/AsyncCallbackNoTimer.lf b/test/Python/src/concurrent/AsyncCallbackNoTimer.lf new file mode 100644 index 0000000000..b55edc853d --- /dev/null +++ b/test/Python/src/concurrent/AsyncCallbackNoTimer.lf @@ -0,0 +1,68 @@ +/** + * Test asynchronous callbacks that trigger a physical action. + * This test creates a concurrent Python thread that schedule a + * physical action twice. + * + * There are no timers in this test to drive the logical time forward. + * This is important in the Python target since the user Python threads + * should be allowed to execute independently of the underlying C core + * runtime, without the C runtime polling the Python context with a + * reaction to a timer. + */ +target Python { + timeout: 2 sec +}; + +main reactor { + + preamble {= + # Note that preamble code is generated inside the reactor class in Python + import time + import threading + + def callback(self, a): + # Schedule twice. If the action is not physical, these should + # get consolidated into a single action triggering. If it is, + # then they cause two separate triggerings with close but not + # equal time stamps. The minimum time between these is determined + # by the argument in the physical action definition. + a.schedule(0) + a.schedule(0) + + # Simulate time passing before a callback occurs. + def take_time(self, a): + # The best Python can offer short of directly using ctypes to call nanosleep + self.time.sleep(0.1) + self.callback(a) + return None + + =} + + state threads({=list()=}); + state expected_time(100 msec); + state toggle(false); + + physical action a(100 msec); + state i(0); + + reaction(startup) -> a {= + # start new thread, provide callback + x = self.threading.Thread(target=self.take_time, args=(a,)) + self.threads.append(x) + x.start() + =} + + reaction(a) {= + elapsed_time = get_elapsed_logical_time() + print("Asynchronous callback {:d}: Assigned logical time greater than start time by {:d} nsec.".format(self.i, elapsed_time)); + self.i += 1 + if elapsed_time <= self.expected_time: + sys.stderr.write("ERROR: Expected logical time to be larger than {:d}.".format(self.expected_time)); + exit(1) + if self.toggle: + self.toggle = False + self.expected_time += 200000000 + else: + self.toggle = True + =} +} \ No newline at end of file From 16cf65edefe61a7c7fdbd322bfcd75cabe7a9d17 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Fri, 1 Oct 2021 23:39:52 -0500 Subject: [PATCH 4/6] Added GIL lock/unlock for the deadline handler as well --- .../org/lflang/generator/PythonGenerator.xtend | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/org.lflang/src/org/lflang/generator/PythonGenerator.xtend b/org.lflang/src/org/lflang/generator/PythonGenerator.xtend index c2094027d5..5df7f0e311 100644 --- a/org.lflang/src/org/lflang/generator/PythonGenerator.xtend +++ b/org.lflang/src/org/lflang/generator/PythonGenerator.xtend @@ -1371,6 +1371,11 @@ class PythonGenerator extends CGenerator { pr('void ' + deadlineFunctionName + '(void* instance_args) {') indent(); + + pr(''' + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + ''') super.generateInitializationForReaction("", reaction, decl, reactionIndex) @@ -1401,11 +1406,12 @@ class PythonGenerator extends CGenerator { if (targetConfig.threads > 0) { pr(pyThreadMutexLockCode(1, reactor)) } - //pr(reactionInitialization.toString) - // Code verbatim from 'deadline' - //prSourceLineNumber(reaction.deadline.code) - //pr(reaction.deadline.code.toText) - // TODO: Handle deadlines + + pr(''' + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); + ''') + unindent() pr("}") } From f8258403c2f218b74536c3d27e33f9fe11401e31 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Fri, 1 Oct 2021 23:58:51 -0500 Subject: [PATCH 5/6] A bit of a cleanup of the PythonGenerator logic. Removed the mutex since GIL will now take its place --- .../lflang/generator/PythonGenerator.xtend | 114 ++++-------------- 1 file changed, 24 insertions(+), 90 deletions(-) diff --git a/org.lflang/src/org/lflang/generator/PythonGenerator.xtend b/org.lflang/src/org/lflang/generator/PythonGenerator.xtend index 5df7f0e311..4d2e5fdf9c 100644 --- a/org.lflang/src/org/lflang/generator/PythonGenerator.xtend +++ b/org.lflang/src/org/lflang/generator/PythonGenerator.xtend @@ -814,25 +814,6 @@ class PythonGenerator extends CGenerator { } } - /** - * Generate code that ensures only one thread can execute at a time as per Python specifications - * @param state 0=beginning, 1=end - */ - def pyThreadMutexLockCode(int state, Reactor reactor) { - if(targetConfig.threads > 0) - { - switch(state){ - case 0: return '''lf_mutex_lock(&py_«reactor.name»_reaction_mutex);''' - case 1: return '''lf_mutex_unlock(&py_«reactor.name»_reaction_mutex);''' - default: return '''''' - } - } - else - { - return '''''' - } - } - /** * Generate top-level preambles and #include of pqueue.c and either reactor.c or reactor_threaded.c * depending on whether threads are specified in target directive. @@ -876,33 +857,6 @@ class PythonGenerator extends CGenerator { super.parseTargetParameters() - // If the program is threaded, create a mutex for each reactor - // that guards the execution of its reactions. - // This is necessary because Python is not thread-safe - // and running multiple instances of the same function can cause - // a segmentation fault. - if (targetConfig.threads > 0) { - for (r : this.reactors ?: emptyList) { - pr(''' - lf_mutex_t py_«r.toDefinition.name»_reaction_mutex; - ''') - pr(super.initializeTriggerObjects, ''' - // Initialize reaction mutex for «r.toDefinition.name» - lf_mutex_init(&py_«r.toDefinition.name»_reaction_mutex); - ''') - } - // Add mutex for the main reactor - if (this.mainDef !== null) { - pr(''' - lf_mutex_t py_«this.mainDef.name»_reaction_mutex; - ''') - pr(super.initializeTriggerObjects, ''' - // Initialize reaction mutex for «this.mainDef.name» - lf_mutex_init(&py_«this.mainDef.name»_reaction_mutex); - ''') - } - } - // FIXME: Probably not the best place to do // this. if (!targetConfig.protoFiles.isNullOrEmpty) { @@ -1325,38 +1279,28 @@ class PythonGenerator extends CGenerator { prSourceLineNumber(reaction.code) - pr(''' + pr(''' + // Acquire the GIL (Global Interpreter Lock) to be able to call Python APIs. PyGILState_STATE gstate; gstate = PyGILState_Ensure(); - ''') - - // Unfortunately, threads cannot run concurrently in Python. - // Therefore, we need to make sure reactions cannot execute concurrently by - // holding the mutex lock. - if(targetConfig.threads > 0) { - pr(pyThreadMutexLockCode(0, reactor)) - } - - pr(''' + DEBUG_PRINT("Calling reaction function «decl.name».«pythonFunctionName»"); - PyObject *rValue = PyObject_CallObject(self->__py_reaction_function_«reactionIndex», Py_BuildValue("(«pyObjectDescriptor»)" «pyObjects»)); - ''') - pr(''' + PyObject *rValue = PyObject_CallObject( + self->__py_reaction_function_«reactionIndex», + Py_BuildValue("(«pyObjectDescriptor»)" «pyObjects») + ); if (rValue == NULL) { error_print("FATAL: Calling reaction «decl.name».«pythonFunctionName» failed."); if (PyErr_Occurred()) { PyErr_PrintEx(0); PyErr_Clear(); // this will reset the error indicator so we can run Python code again } + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); + Py_FinalizeEx(); exit(1); } - ''') - - if(targetConfig.threads > 0) { - pr(pyThreadMutexLockCode(1, reactor)) - } - - pr(''' + /* Release the thread. No Python API allowed beyond this point. */ PyGILState_Release(gstate); ''') @@ -1371,26 +1315,19 @@ class PythonGenerator extends CGenerator { pr('void ' + deadlineFunctionName + '(void* instance_args) {') indent(); - - pr(''' - PyGILState_STATE gstate; - gstate = PyGILState_Ensure(); - ''') - super.generateInitializationForReaction("", reaction, decl, reactionIndex) - // Unfortunately, threads cannot run concurrently in Python. - // Therefore, we need to make sure reactions cannot execute concurrently by - // holding the mutex lock. - if (targetConfig.threads > 0) { - pr(pyThreadMutexLockCode(0, reactor)) - } - + pr(''' + // Acquire the GIL (Global Interpreter Lock) to be able to call Python APIs. + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + DEBUG_PRINT("Calling deadline function «decl.name».«deadlineFunctionName»"); - PyObject *rValue = PyObject_CallObject(self->__py_deadline_function_«reactionIndex», Py_BuildValue("(«pyObjectDescriptor»)" «pyObjects»)); - ''') - pr(''' + PyObject *rValue = PyObject_CallObject( + self->__py_deadline_function_«reactionIndex», + Py_BuildValue("(«pyObjectDescriptor»)" «pyObjects») + ); if (rValue == NULL) { error_print("FATAL: Calling reaction «decl.name».«deadlineFunctionName» failed.\n"); if (rValue == NULL) { @@ -1399,15 +1336,12 @@ class PythonGenerator extends CGenerator { PyErr_Clear(); // this will reset the error indicator so we can run Python code again } } + /* Release the thread. No Python API allowed beyond this point. */ + PyGILState_Release(gstate); + Py_FinalizeEx(); exit(1); } - ''') - - if (targetConfig.threads > 0) { - pr(pyThreadMutexLockCode(1, reactor)) - } - - pr(''' + /* Release the thread. No Python API allowed beyond this point. */ PyGILState_Release(gstate); ''') From 77acbd1894b0f06d6aabfd2ce95b00b230c8b745 Mon Sep 17 00:00:00 2001 From: Soroush Bateni Date: Sat, 2 Oct 2021 00:54:19 -0500 Subject: [PATCH 6/6] Updated TimeLimit to print time per reaction --- test/Python/src/TimeLimit.lf | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/test/Python/src/TimeLimit.lf b/test/Python/src/TimeLimit.lf index 5431fc657e..931efb7326 100644 --- a/test/Python/src/TimeLimit.lf +++ b/test/Python/src/TimeLimit.lf @@ -26,19 +26,23 @@ reactor Destination { reaction(x) {= # print(x.value) if x.value != self.s: - sys.stderr.write("Error: Expected {:d} and got {:d}.\n".format(self.s, x.value)) + sys.stderr.write("ERROR: Expected {:d} and got {:d}.\n".format(self.s, x.value)) exit(1) self.s += 1 =} + reaction(shutdown) {= + print("**** shutdown reaction invoked.") + if self.s != 10000002: + sys.stderr.write("ERROR: Expected 10000002 but got {:d}.\n".format(self.s)) + exit(1) + print(f"Approx. time per reaction: {get_elapsed_physical_time()/(self.s+1):.1f}ns") + =} } main reactor TimeLimit(period(1 usec)) { - timer stop(1 secs); + timer stop(10 secs); reaction(stop) {= request_stop() =} - reaction(shutdown) {= - print("**** shutdown reaction invoked.") - =} c = new Clock(period = period); d = new Destination(); c.y -> d.x;