diff --git a/core/reactor.c b/core/reactor.c index 9326b3895..62824e480 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -367,6 +367,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { _lf_execution_started = true; _lf_trigger_startup_reactions(); _lf_initialize_timers(); + // If the stop_tag is (0,0), also insert the shutdown // reactions. This can only happen if the timeout time // was set to 0. diff --git a/core/reactor_common.c b/core/reactor_common.c index a0636273d..0b39886dc 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -56,6 +56,14 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "hashset/hashset.h" #include "hashset/hashset_itr.h" +#ifdef LF_THREADED +#include "watchdog.h" + +// Code generated global variables. +extern int _lf_watchdog_count; +extern watchdog_t* _lf_watchdogs; +#endif + // Global variable defined in tag.c: extern tag_t current_tag; extern instant_t start_time; @@ -1292,17 +1300,32 @@ trigger_handle_t _lf_schedule_int(lf_action_base_t* action, interval_t extra_del } /** + * Invoke the given reaction * * @param reaction The reaction that has just executed. * @param worker The thread number of the worker thread or 0 for unthreaded execution (for tracing). */ void _lf_invoke_reaction(reaction_t* reaction, int worker) { + +#ifdef LF_THREADED + if (((self_base_t*) reaction->self)->reactor_mutex != NULL) { + lf_mutex_lock((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex); + } +#endif + tracepoint_reaction_starts(reaction, worker); ((self_base_t*) reaction->self)->executing_reaction = reaction; reaction->function(reaction->self); ((self_base_t*) reaction->self)->executing_reaction = NULL; tracepoint_reaction_ends(reaction, worker); + + +#ifdef LF_THREADED + if (((self_base_t*) reaction->self)->reactor_mutex != NULL) { + lf_mutex_unlock((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex); + } +#endif } /** @@ -1743,6 +1766,13 @@ void termination(void) { lf_print_warning("Memory allocated for tokens has not been freed!"); lf_print_warning("Number of unfreed tokens: %d.", _lf_count_token_allocations); } +#ifdef LF_THREADED + for (int i = 0; i < _lf_watchdog_count; i++) { + if (_lf_watchdogs[i].base->reactor_mutex != NULL) { + free(_lf_watchdogs[i].base->reactor_mutex); + } + } +#endif _lf_free_all_reactors(); free(_lf_is_present_fields); free(_lf_is_present_fields_abbreviated); diff --git a/core/threaded/CMakeLists.txt b/core/threaded/CMakeLists.txt index 95989b8ad..334c8bfa2 100644 --- a/core/threaded/CMakeLists.txt +++ b/core/threaded/CMakeLists.txt @@ -7,6 +7,7 @@ set( scheduler_NP.c scheduler_PEDF_NP.c scheduler_sync_tag_advance.c + watchdog.c ) list(APPEND INFO_SOURCES ${THREADED_SOURCES}) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index a9abd848b..bc251f83f 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -1138,6 +1138,8 @@ int lf_reactor_c_main(int argc, const char* argv[]) { // it can be probably called in that manner as well). _lf_initialize_start_tag(); + _lf_initialize_watchdog_mutexes(); + start_threads(); lf_mutex_unlock(&mutex); diff --git a/core/threaded/watchdog.c b/core/threaded/watchdog.c new file mode 100644 index 000000000..6701a8396 --- /dev/null +++ b/core/threaded/watchdog.c @@ -0,0 +1,83 @@ +/** + * @file + * @author Benjamin Asch + * @author Edward A. Lee + * @copyright (c) 2023, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Definitions for watchdogs. + */ + +#include +#include "watchdog.h" + +extern int _lf_watchdog_count; +extern watchdog_t* _lf_watchdogs; + +/** + * @brief Initialize watchdog mutexes. + * For any reactor with one or more watchdogs, the self struct should have a non-NULL + * `reactor_mutex` field which points to an instance of `lf_mutex_t`. + * This function initializes those mutexes. + */ +void _lf_initialize_watchdog_mutexes() { + for (int i = 0; i < _lf_watchdog_count; i++) { + self_base_t* current_base = _lf_watchdogs[i].base; + if (current_base->reactor_mutex != NULL) { + lf_mutex_init((lf_mutex_t*)(current_base->reactor_mutex)); + } + } +} + +/** + * @brief Thread function for watchdog. + * This function sleeps until physical time exceeds the expiration time of + * the watchdog and then invokes the watchdog expiration handler function. + * In normal usage, the expiration time is incremented while the thread is + * sleeping, so the watchdog never expires and the handler function is never + * invoked. + * This function acquires the reaction mutex and releases it while sleeping. + * + * @param arg A pointer to the watchdog struct + * @return NULL + */ +void* _lf_run_watchdog(void* arg) { + watchdog_t* watchdog = (watchdog_t*)arg; + + self_base_t* base = watchdog->base; + assert(base->reactor_mutex != NULL); + lf_mutex_lock((lf_mutex_t*)(base->reactor_mutex)); + instant_t physical_time = lf_time_physical(); + while (physical_time < watchdog->expiration) { + interval_t T = watchdog->expiration - physical_time; + lf_mutex_unlock((lf_mutex_t*)base->reactor_mutex); + lf_sleep(T); + lf_mutex_lock((lf_mutex_t*)(base->reactor_mutex)); + physical_time = lf_time_physical(); + } + + if (watchdog->expiration != NEVER) { + watchdog_function_t watchdog_func = watchdog->watchdog_function; + (*watchdog_func)(base); + } + watchdog->thread_active = false; + + lf_mutex_unlock((lf_mutex_t*)(base->reactor_mutex)); + return NULL; +} + +void lf_watchdog_start(watchdog_t* watchdog, interval_t additional_timeout) { + // Assumes reaction mutex is already held. + + self_base_t* base = watchdog->base; + + watchdog->expiration = lf_time_logical() + watchdog->min_expiration + additional_timeout; + + if (!watchdog->thread_active) { + lf_thread_create(&(watchdog->thread_id), _lf_run_watchdog, watchdog); + watchdog->thread_active = true; + } +} + +void lf_watchdog_stop(watchdog_t* watchdog) { + watchdog->expiration = NEVER; +} diff --git a/include/core/lf_types.h b/include/core/lf_types.h index 5a0bde4bf..d71dbaf7a 100644 --- a/include/core/lf_types.h +++ b/include/core/lf_types.h @@ -278,6 +278,10 @@ typedef struct allocation_record_t { typedef struct self_base_t { struct allocation_record_t *allocations; struct reaction_t *executing_reaction; // The currently executing reaction of the reactor. +#ifdef LF_THREADED + void* reactor_mutex; // If not null, this is expected to point to an lf_mutex_t. + // It is not declared as such to avoid a dependence on platform.h. +#endif #ifdef MODAL_REACTORS reactor_mode_state_t _lf__mode_state; // The current mode (for modal models). #endif diff --git a/include/core/platform.h b/include/core/platform.h index 30fa251ce..2a632d5fc 100644 --- a/include/core/platform.h +++ b/include/core/platform.h @@ -36,6 +36,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #ifndef PLATFORM_H #define PLATFORM_H +#ifdef __cplusplus +extern "C" { +#endif + #include "lf_types.h" #if defined(LF_THREADED) && defined(LF_UNTHREADED) @@ -313,4 +317,8 @@ extern int lf_sleep_until_locked(instant_t wakeup_time); */ DEPRECATED(extern int lf_nanosleep(interval_t sleep_duration)); +#ifdef __cplusplus +} +#endif + #endif // PLATFORM_H diff --git a/include/core/reactor.h b/include/core/reactor.h index 936b54a8b..a2d527688 100644 --- a/include/core/reactor.h +++ b/include/core/reactor.h @@ -415,6 +415,11 @@ void _lf_initialize_timers(void); */ void _lf_trigger_startup_reactions(void); +/** + * Function to initialize mutexes for watchdogs + */ +void _lf_initialize_watchdog_mutexes(void); + /** * Function (to be code generated) to terminate execution. @@ -530,5 +535,28 @@ trigger_handle_t _lf_schedule_copy(lf_action_base_t* action, interval_t offset, */ void _lf_fd_send_stop_request_to_rti(void); +/** + * These functions must be implemented by both threaded and unthreaded + * runtime. Should be routed to appropriate API calls in platform.h +*/ + +/** + * @brief Notify other threads of new events on the event queue. + * + */ +void _lf_notify_of_event(); + +/** + * @brief Enter critical section. Must be paired with a + * `_lf_critical_section_exit()` + * + */ +void _lf_critical_section_enter(); + +/** + * @brief Leave critical section + */ +void _lf_critical_section_exit(); + #endif /* REACTOR_H */ /** @} */ diff --git a/include/core/threaded/watchdog.h b/include/core/threaded/watchdog.h new file mode 100644 index 000000000..d14c7adad --- /dev/null +++ b/include/core/threaded/watchdog.h @@ -0,0 +1,67 @@ +/** + * @file + * @author Benjamin Asch + * @author Edward A. Lee + * @copyright (c) 2023, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Declarations for watchdogs. + */ + +#ifndef WATCHDOG_H +#define WATCHDOG_H 1 + +#include "platform.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Watchdog function type. The argument passed to one of + * these watchdog functions is a pointer to the self struct + * for the reactor. + */ +typedef void(*watchdog_function_t)(void*); + +/** Typdef for watchdog_t struct, used to call watchdog handler. */ +typedef struct watchdog_t watchdog_t; + +/** Watchdog struct for handler. */ +struct watchdog_t { + struct self_base_t* base; // The reactor that contains the watchdog. + trigger_t* trigger; // The trigger associated with this watchdog. + instant_t expiration; // The expiration instant for the watchdog. (Initialized to NEVER) + interval_t min_expiration; // The minimum expiration interval for the watchdog. + lf_thread_t thread_id; // The thread that the watchdog is meant to run on. + bool thread_active; // Boolean indicating whether or not thread is active. + watchdog_function_t watchdog_function; // The function/handler for the watchdog. +}; + +/** + * @brief Start or restart the watchdog timer. + * This function sets the expiration time of the watchdog to the current logical time + * plus the minimum timeout of the watchdog plus the specified `additional_timeout`. + * If a watchdog timer thread is not already running, then this function will start one. + * This function assumes the reactor mutex is held when it is called; this assumption + * is satisfied whenever this function is called from within a reaction that declares + * the watchdog as an effect. + * + * @param watchdog The watchdog to be started + * @param additional_timeout Additional timeout to be added to the watchdog's + * minimum expiration. + */ +void lf_watchdog_start(watchdog_t* watchdog, interval_t additional_timeout); + +/** + * @brief Stop the specified watchdog without invoking the expiration handler. + * This function sets the expiration time of the watchdog to `NEVER`. + * + * @param watchdog The watchdog. + */ +void lf_watchdog_stop(watchdog_t* watchdog); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f92..5d1996fa8 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +watchdogs-eal2 diff --git a/test/src_gen_stub.c b/test/src_gen_stub.c index 43a3d10b7..164a38a06 100644 --- a/test/src_gen_stub.c +++ b/test/src_gen_stub.c @@ -7,4 +7,5 @@ bool _lf_trigger_shutdown_reactions() { return true; } void _lf_set_default_command_line_options() {} void _lf_trigger_startup_reactions() {} void _lf_initialize_timers() {} +void _lf_initialize_watchdog_mutexes() {} void logical_tag_complete(tag_t tag_to_send) {}