diff --git a/core/reactor.c b/core/reactor.c
index 9326b3895..62824e480 100644
--- a/core/reactor.c
+++ b/core/reactor.c
@@ -367,6 +367,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
_lf_execution_started = true;
_lf_trigger_startup_reactions();
_lf_initialize_timers();
+
// If the stop_tag is (0,0), also insert the shutdown
// reactions. This can only happen if the timeout time
// was set to 0.
diff --git a/core/reactor_common.c b/core/reactor_common.c
index a0636273d..0b39886dc 100644
--- a/core/reactor_common.c
+++ b/core/reactor_common.c
@@ -56,6 +56,14 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "hashset/hashset.h"
#include "hashset/hashset_itr.h"
+#ifdef LF_THREADED
+#include "watchdog.h"
+
+// Code generated global variables.
+extern int _lf_watchdog_count;
+extern watchdog_t* _lf_watchdogs;
+#endif
+
// Global variable defined in tag.c:
extern tag_t current_tag;
extern instant_t start_time;
@@ -1292,17 +1300,32 @@ trigger_handle_t _lf_schedule_int(lf_action_base_t* action, interval_t extra_del
}
/**
+
* Invoke the given reaction
*
* @param reaction The reaction that has just executed.
* @param worker The thread number of the worker thread or 0 for unthreaded execution (for tracing).
*/
void _lf_invoke_reaction(reaction_t* reaction, int worker) {
+
+#ifdef LF_THREADED
+ if (((self_base_t*) reaction->self)->reactor_mutex != NULL) {
+ lf_mutex_lock((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex);
+ }
+#endif
+
tracepoint_reaction_starts(reaction, worker);
((self_base_t*) reaction->self)->executing_reaction = reaction;
reaction->function(reaction->self);
((self_base_t*) reaction->self)->executing_reaction = NULL;
tracepoint_reaction_ends(reaction, worker);
+
+
+#ifdef LF_THREADED
+ if (((self_base_t*) reaction->self)->reactor_mutex != NULL) {
+ lf_mutex_unlock((lf_mutex_t*)((self_base_t*)reaction->self)->reactor_mutex);
+ }
+#endif
}
/**
@@ -1743,6 +1766,13 @@ void termination(void) {
lf_print_warning("Memory allocated for tokens has not been freed!");
lf_print_warning("Number of unfreed tokens: %d.", _lf_count_token_allocations);
}
+#ifdef LF_THREADED
+ for (int i = 0; i < _lf_watchdog_count; i++) {
+ if (_lf_watchdogs[i].base->reactor_mutex != NULL) {
+ free(_lf_watchdogs[i].base->reactor_mutex);
+ }
+ }
+#endif
_lf_free_all_reactors();
free(_lf_is_present_fields);
free(_lf_is_present_fields_abbreviated);
diff --git a/core/threaded/CMakeLists.txt b/core/threaded/CMakeLists.txt
index 95989b8ad..334c8bfa2 100644
--- a/core/threaded/CMakeLists.txt
+++ b/core/threaded/CMakeLists.txt
@@ -7,6 +7,7 @@ set(
scheduler_NP.c
scheduler_PEDF_NP.c
scheduler_sync_tag_advance.c
+ watchdog.c
)
list(APPEND INFO_SOURCES ${THREADED_SOURCES})
diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c
index a9abd848b..bc251f83f 100644
--- a/core/threaded/reactor_threaded.c
+++ b/core/threaded/reactor_threaded.c
@@ -1138,6 +1138,8 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
// it can be probably called in that manner as well).
_lf_initialize_start_tag();
+ _lf_initialize_watchdog_mutexes();
+
start_threads();
lf_mutex_unlock(&mutex);
diff --git a/core/threaded/watchdog.c b/core/threaded/watchdog.c
new file mode 100644
index 000000000..6701a8396
--- /dev/null
+++ b/core/threaded/watchdog.c
@@ -0,0 +1,83 @@
+/**
+ * @file
+ * @author Benjamin Asch
+ * @author Edward A. Lee
+ * @copyright (c) 2023, The University of California at Berkeley.
+ * License: BSD 2-clause
+ * @brief Definitions for watchdogs.
+ */
+
+#include
+#include "watchdog.h"
+
+extern int _lf_watchdog_count;
+extern watchdog_t* _lf_watchdogs;
+
+/**
+ * @brief Initialize watchdog mutexes.
+ * For any reactor with one or more watchdogs, the self struct should have a non-NULL
+ * `reactor_mutex` field which points to an instance of `lf_mutex_t`.
+ * This function initializes those mutexes.
+ */
+void _lf_initialize_watchdog_mutexes() {
+ for (int i = 0; i < _lf_watchdog_count; i++) {
+ self_base_t* current_base = _lf_watchdogs[i].base;
+ if (current_base->reactor_mutex != NULL) {
+ lf_mutex_init((lf_mutex_t*)(current_base->reactor_mutex));
+ }
+ }
+}
+
+/**
+ * @brief Thread function for watchdog.
+ * This function sleeps until physical time exceeds the expiration time of
+ * the watchdog and then invokes the watchdog expiration handler function.
+ * In normal usage, the expiration time is incremented while the thread is
+ * sleeping, so the watchdog never expires and the handler function is never
+ * invoked.
+ * This function acquires the reaction mutex and releases it while sleeping.
+ *
+ * @param arg A pointer to the watchdog struct
+ * @return NULL
+ */
+void* _lf_run_watchdog(void* arg) {
+ watchdog_t* watchdog = (watchdog_t*)arg;
+
+ self_base_t* base = watchdog->base;
+ assert(base->reactor_mutex != NULL);
+ lf_mutex_lock((lf_mutex_t*)(base->reactor_mutex));
+ instant_t physical_time = lf_time_physical();
+ while (physical_time < watchdog->expiration) {
+ interval_t T = watchdog->expiration - physical_time;
+ lf_mutex_unlock((lf_mutex_t*)base->reactor_mutex);
+ lf_sleep(T);
+ lf_mutex_lock((lf_mutex_t*)(base->reactor_mutex));
+ physical_time = lf_time_physical();
+ }
+
+ if (watchdog->expiration != NEVER) {
+ watchdog_function_t watchdog_func = watchdog->watchdog_function;
+ (*watchdog_func)(base);
+ }
+ watchdog->thread_active = false;
+
+ lf_mutex_unlock((lf_mutex_t*)(base->reactor_mutex));
+ return NULL;
+}
+
+void lf_watchdog_start(watchdog_t* watchdog, interval_t additional_timeout) {
+ // Assumes reaction mutex is already held.
+
+ self_base_t* base = watchdog->base;
+
+ watchdog->expiration = lf_time_logical() + watchdog->min_expiration + additional_timeout;
+
+ if (!watchdog->thread_active) {
+ lf_thread_create(&(watchdog->thread_id), _lf_run_watchdog, watchdog);
+ watchdog->thread_active = true;
+ }
+}
+
+void lf_watchdog_stop(watchdog_t* watchdog) {
+ watchdog->expiration = NEVER;
+}
diff --git a/include/core/lf_types.h b/include/core/lf_types.h
index 5a0bde4bf..d71dbaf7a 100644
--- a/include/core/lf_types.h
+++ b/include/core/lf_types.h
@@ -278,6 +278,10 @@ typedef struct allocation_record_t {
typedef struct self_base_t {
struct allocation_record_t *allocations;
struct reaction_t *executing_reaction; // The currently executing reaction of the reactor.
+#ifdef LF_THREADED
+ void* reactor_mutex; // If not null, this is expected to point to an lf_mutex_t.
+ // It is not declared as such to avoid a dependence on platform.h.
+#endif
#ifdef MODAL_REACTORS
reactor_mode_state_t _lf__mode_state; // The current mode (for modal models).
#endif
diff --git a/include/core/platform.h b/include/core/platform.h
index 30fa251ce..2a632d5fc 100644
--- a/include/core/platform.h
+++ b/include/core/platform.h
@@ -36,6 +36,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef PLATFORM_H
#define PLATFORM_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "lf_types.h"
#if defined(LF_THREADED) && defined(LF_UNTHREADED)
@@ -313,4 +317,8 @@ extern int lf_sleep_until_locked(instant_t wakeup_time);
*/
DEPRECATED(extern int lf_nanosleep(interval_t sleep_duration));
+#ifdef __cplusplus
+}
+#endif
+
#endif // PLATFORM_H
diff --git a/include/core/reactor.h b/include/core/reactor.h
index 936b54a8b..a2d527688 100644
--- a/include/core/reactor.h
+++ b/include/core/reactor.h
@@ -415,6 +415,11 @@ void _lf_initialize_timers(void);
*/
void _lf_trigger_startup_reactions(void);
+/**
+ * Function to initialize mutexes for watchdogs
+ */
+void _lf_initialize_watchdog_mutexes(void);
+
/**
* Function (to be code generated) to terminate execution.
@@ -530,5 +535,28 @@ trigger_handle_t _lf_schedule_copy(lf_action_base_t* action, interval_t offset,
*/
void _lf_fd_send_stop_request_to_rti(void);
+/**
+ * These functions must be implemented by both threaded and unthreaded
+ * runtime. Should be routed to appropriate API calls in platform.h
+*/
+
+/**
+ * @brief Notify other threads of new events on the event queue.
+ *
+ */
+void _lf_notify_of_event();
+
+/**
+ * @brief Enter critical section. Must be paired with a
+ * `_lf_critical_section_exit()`
+ *
+ */
+void _lf_critical_section_enter();
+
+/**
+ * @brief Leave critical section
+ */
+void _lf_critical_section_exit();
+
#endif /* REACTOR_H */
/** @} */
diff --git a/include/core/threaded/watchdog.h b/include/core/threaded/watchdog.h
new file mode 100644
index 000000000..d14c7adad
--- /dev/null
+++ b/include/core/threaded/watchdog.h
@@ -0,0 +1,67 @@
+/**
+ * @file
+ * @author Benjamin Asch
+ * @author Edward A. Lee
+ * @copyright (c) 2023, The University of California at Berkeley.
+ * License: BSD 2-clause
+ * @brief Declarations for watchdogs.
+ */
+
+#ifndef WATCHDOG_H
+#define WATCHDOG_H 1
+
+#include "platform.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Watchdog function type. The argument passed to one of
+ * these watchdog functions is a pointer to the self struct
+ * for the reactor.
+ */
+typedef void(*watchdog_function_t)(void*);
+
+/** Typdef for watchdog_t struct, used to call watchdog handler. */
+typedef struct watchdog_t watchdog_t;
+
+/** Watchdog struct for handler. */
+struct watchdog_t {
+ struct self_base_t* base; // The reactor that contains the watchdog.
+ trigger_t* trigger; // The trigger associated with this watchdog.
+ instant_t expiration; // The expiration instant for the watchdog. (Initialized to NEVER)
+ interval_t min_expiration; // The minimum expiration interval for the watchdog.
+ lf_thread_t thread_id; // The thread that the watchdog is meant to run on.
+ bool thread_active; // Boolean indicating whether or not thread is active.
+ watchdog_function_t watchdog_function; // The function/handler for the watchdog.
+};
+
+/**
+ * @brief Start or restart the watchdog timer.
+ * This function sets the expiration time of the watchdog to the current logical time
+ * plus the minimum timeout of the watchdog plus the specified `additional_timeout`.
+ * If a watchdog timer thread is not already running, then this function will start one.
+ * This function assumes the reactor mutex is held when it is called; this assumption
+ * is satisfied whenever this function is called from within a reaction that declares
+ * the watchdog as an effect.
+ *
+ * @param watchdog The watchdog to be started
+ * @param additional_timeout Additional timeout to be added to the watchdog's
+ * minimum expiration.
+ */
+void lf_watchdog_start(watchdog_t* watchdog, interval_t additional_timeout);
+
+/**
+ * @brief Stop the specified watchdog without invoking the expiration handler.
+ * This function sets the expiration time of the watchdog to `NEVER`.
+ *
+ * @param watchdog The watchdog.
+ */
+void lf_watchdog_stop(watchdog_t* watchdog);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt
index 1f7391f92..5d1996fa8 100644
--- a/lingua-franca-ref.txt
+++ b/lingua-franca-ref.txt
@@ -1 +1 @@
-master
+watchdogs-eal2
diff --git a/test/src_gen_stub.c b/test/src_gen_stub.c
index 43a3d10b7..164a38a06 100644
--- a/test/src_gen_stub.c
+++ b/test/src_gen_stub.c
@@ -7,4 +7,5 @@ bool _lf_trigger_shutdown_reactions() { return true; }
void _lf_set_default_command_line_options() {}
void _lf_trigger_startup_reactions() {}
void _lf_initialize_timers() {}
+void _lf_initialize_watchdog_mutexes() {}
void logical_tag_complete(tag_t tag_to_send) {}