diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 9d8cdb9f8..ca51fd50c 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -141,8 +141,9 @@ target_compile_definitions(reactor-c PRIVATE INITIAL_EVENT_QUEUE_SIZE=${INITIAL_ target_compile_definitions(reactor-c PRIVATE INITIAL_REACT_QUEUE_SIZE=${INITIAL_REACT_QUEUE_SIZE}) target_compile_definitions(reactor-c PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME}) -# Macro for translating a command-line argument into compile definition for -# reactor-c lib +# If variable X is defined in cMake (set using SET()) or passed in as a command-line +# argument using -DX=, then make it a compiler flag for reactor-c so that X +# is also defined in the C code for reactor-c. macro(define X) if(DEFINED ${X}) message(STATUS ${X}=${${X}}) @@ -155,8 +156,7 @@ message(STATUS "Applying preprocessor definitions...") define(_LF_CLOCK_SYNC_ATTENUATION) define(_LF_CLOCK_SYNC_COLLECT_STATS) define(_LF_CLOCK_SYNC_EXCHANGES_PER_INTERVAL) -define(_LF_CLOCK_SYNC_INITIAL) -define(_LF_CLOCK_SYNC_ON) +define(LF_CLOCK_SYNC) # 1 for OFF, 2 for INIT and 3 for ON. define(_LF_CLOCK_SYNC_PERIOD_NS) define(_LF_FEDERATE_NAMES_COMMA_SEPARATED) define(ADVANCE_MESSAGE_INTERVAL) diff --git a/core/clock.c b/core/clock.c index cdebaefde..8b10297a7 100644 --- a/core/clock.c +++ b/core/clock.c @@ -8,13 +8,14 @@ #include "clock.h" #include "low_level_platform.h" -#if defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL) +// If we are federated, include clock-sync API (and implementation) +#if defined(FEDERATED) #include "clock-sync.h" #else -// Provide empty implementations of these functions. +// In the unfederated case, just provide empty implementations. void clock_sync_add_offset(instant_t* t) { (void)t; } void clock_sync_subtract_offset(instant_t* t) { (void)t; } -#endif // defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL) +#endif // defined(FEDERATED) static instant_t last_read_physical_time = NEVER; diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index a88f384c4..cc0252843 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1738,9 +1738,14 @@ void initialize_RTI(rti_remote_t* rti) { rti_remote->stop_in_progress = false; } +// The RTI includes clock.c, which requires the following functions that are defined +// in clock-sync.c. But clock-sync.c is not included in the standalone RTI. +// Provide empty implementations of these functions. +void clock_sync_add_offset(instant_t* t) { (void)t; } +void clock_sync_subtract_offset(instant_t* t) { (void)t; } + void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) { for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) { - // FIXME: Gives error freeing memory not allocated!!!! scheduling_node_t* node = scheduling_nodes[i]; if (node->upstream != NULL) free(node->upstream); diff --git a/core/federated/clock-sync.c b/core/federated/clock-sync.c index 2a6ce8e3d..b18efb650 100644 --- a/core/federated/clock-sync.c +++ b/core/federated/clock-sync.c @@ -161,8 +161,8 @@ void reset_socket_stat(struct socket_stat_t* socket_stat) { * will be sent. */ uint16_t setup_clock_synchronization_with_rti() { - uint16_t port_to_return = UINT16_MAX; -#ifdef _LF_CLOCK_SYNC_ON + uint16_t port_to_return = UINT16_MAX; // Default if clock sync is off. +#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON) // Initialize the UDP socket _lf_rti_socket_UDP = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); // Initialize the necessary information for the UDP address @@ -201,11 +201,9 @@ uint16_t setup_clock_synchronization_with_rti() { if (setsockopt(_lf_rti_socket_UDP, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) { lf_print_error("Failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno)); } -#else // No runtime clock synchronization. Send port -1 or 0 instead. -#ifdef _LF_CLOCK_SYNC_INITIAL +#elif (LF_CLOCK_SYNC == LF_CLOCK_SYNC_INIT) port_to_return = 0u; -#endif // _LF_CLOCK_SYNC_INITIAL -#endif // _LF_CLOCK_SYNC_ON +#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON) return port_to_return; } @@ -530,17 +528,21 @@ void* listen_to_rti_UDP_thread(void* args) { // If clock synchronization is enabled, provide implementations. If not // just empty implementations that should be optimized away. -#if defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL) -void clock_sync_add_offset(instant_t* t) { *t += (_lf_clock_sync_offset + _lf_clock_sync_constant_bias); } +#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_INIT) +void clock_sync_add_offset(instant_t* t) { + *t = lf_time_add(*t, (_lf_clock_sync_offset + _lf_clock_sync_constant_bias)); +} -void clock_sync_subtract_offset(instant_t* t) { *t -= (_lf_clock_sync_offset + _lf_clock_sync_constant_bias); } +void clock_sync_subtract_offset(instant_t* t) { + *t = lf_time_add(*t, -(_lf_clock_sync_offset + _lf_clock_sync_constant_bias)); +} void clock_sync_set_constant_bias(interval_t offset) { _lf_clock_sync_constant_bias = offset; } -#else -// Empty implementations of clock_sync_add_offset and clock_sync_subtract_offset -// are in clock.c. +#else // i.e. (LF_CLOCK_SYNC < LF_CLOCK_SYNC_INIT) +void clock_sync_add_offset(instant_t* t) { (void)t; } +void clock_sync_subtract_offset(instant_t* t) { (void)t; } void clock_sync_set_constant_bias(interval_t offset) { (void)offset; } -#endif // (defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL) +#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_INIT) /** * Create the thread responsible for handling clock synchronization @@ -551,12 +553,12 @@ void clock_sync_set_constant_bias(interval_t offset) { (void)offset; } * \ingroup agroup */ int create_clock_sync_thread(lf_thread_t* thread_id) { -#ifdef _LF_CLOCK_SYNC_ON +#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON) // One for UDP messages if clock synchronization is enabled for this federate return lf_thread_create(thread_id, listen_to_rti_UDP_thread, NULL); -#else - (void)thread_id; -#endif // _LF_CLOCK_SYNC_ON +#else // i.e. (LF_CLOCK_SYNC < LF_CLOCK_SYNC_ON) + (void)thread_id; // Suppress unused parameter warning. +#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON) return 0; } diff --git a/core/reactor_common.c b/core/reactor_common.c index dd348a52a..3f337ea6a 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -1187,6 +1187,8 @@ void termination(void) { #endif lf_free_all_reactors(); + lf_tracing_global_shutdown(); + // Free up memory associated with environment. // Do this last so that printed warnings don't access freed memory. for (int i = 0; i < num_envs; i++) { @@ -1196,7 +1198,6 @@ void termination(void) { free_local_rti(); #endif } - lf_tracing_global_shutdown(); } index_t lf_combine_deadline_and_level(interval_t deadline, int level) { diff --git a/core/tag.c b/core/tag.c index 9bb35933f..c5f8f88c6 100644 --- a/core/tag.c +++ b/core/tag.c @@ -39,20 +39,46 @@ tag_t lf_tag(void* env) { return ((environment_t*)env)->current_tag; } +instant_t lf_time_add(instant_t a, interval_t b) { + if (a == NEVER || b == NEVER) { + return NEVER; + } + if (a == FOREVER || b == FOREVER) { + return FOREVER; + } + instant_t res = a + b; + // Check for overflow + if (res < a && b > 0) { + return FOREVER; + } + // Check for underflow + if (res > a && b < 0) { + return NEVER; + } + return res; +} + tag_t lf_tag_add(tag_t a, tag_t b) { - if (a.time == NEVER || b.time == NEVER) - return NEVER_TAG; - if (a.time == FOREVER || b.time == FOREVER) + instant_t res = lf_time_add(a.time, b.time); + if (res == FOREVER) { return FOREVER_TAG; - if (b.time > 0) + } + if (res == NEVER) { + return NEVER_TAG; + } + + if (b.time > 0) { + // NOTE: The reason for handling this case is to "reset" the microstep counter at each after delay. a.microstep = 0; // Ignore microstep of first arg if time of second is > 0. - tag_t result = {.time = a.time + b.time, .microstep = a.microstep + b.microstep}; - if (result.microstep < a.microstep) - return FOREVER_TAG; - if (result.time < a.time && b.time > 0) + } + tag_t result = {.time = res, .microstep = a.microstep + b.microstep}; + + // If microsteps overflows + // FIXME: What should be the resulting tag in case of microstep overflow. + // see https://github.com/lf-lang/reactor-c/issues/430 + if (result.microstep < a.microstep) { return FOREVER_TAG; - if (result.time > a.time && b.time < 0) - return NEVER_TAG; + } return result; } diff --git a/include/core/federated/clock-sync.h b/include/core/federated/clock-sync.h index 9c9de110d..b003a3150 100644 --- a/include/core/federated/clock-sync.h +++ b/include/core/federated/clock-sync.h @@ -35,6 +35,15 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "low_level_platform.h" +// Clock synchronization defaults to performing clock synchronization only at initialization. +#define LF_CLOCK_SYNC_OFF 1 +#define LF_CLOCK_SYNC_INIT 2 +#define LF_CLOCK_SYNC_ON 3 + +#ifndef LF_CLOCK_SYNC +#define LF_CLOCK_SYNC LF_CLOCK_SYNC_INIT +#endif + /** * Number of required clock sync T4 messages per synchronization * interval. The offset to the clock will not be adjusted until diff --git a/tag/api/tag.h b/tag/api/tag.h index c903aaf53..c40e490f8 100644 --- a/tag/api/tag.h +++ b/tag/api/tag.h @@ -104,6 +104,15 @@ tag_t lf_tag(void* env); */ tag_t lf_tag_add(tag_t a, tag_t b); +/** + * @brief Return the sum of an interval and an instant, saturating on overflow and underflow. + * + * @param a + * @param b + * @return instant_t + */ +instant_t lf_time_add(instant_t a, interval_t b); + /** * Compare two tags. Return -1 if the first is less than * the second, 0 if they are equal, and +1 if the first is