Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed preprocessor directives for clock sync #425

Merged
merged 17 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ target_compile_definitions(reactor-c PRIVATE INITIAL_EVENT_QUEUE_SIZE=${INITIAL_
target_compile_definitions(reactor-c PRIVATE INITIAL_REACT_QUEUE_SIZE=${INITIAL_REACT_QUEUE_SIZE})
target_compile_definitions(reactor-c PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})

# Macro for translating a command-line argument into compile definition for
# reactor-c lib
# If variable X is defined in cMake (set using SET()) or passed in as a command-line
# argument using -DX=<value>, then make it a compiler flag for reactor-c so that X
# is also defined in the C code for reactor-c.
macro(define X)
if(DEFINED ${X})
message(STATUS ${X}=${${X}})
Expand All @@ -155,8 +156,7 @@ message(STATUS "Applying preprocessor definitions...")
define(_LF_CLOCK_SYNC_ATTENUATION)
define(_LF_CLOCK_SYNC_COLLECT_STATS)
define(_LF_CLOCK_SYNC_EXCHANGES_PER_INTERVAL)
define(_LF_CLOCK_SYNC_INITIAL)
define(_LF_CLOCK_SYNC_ON)
define(LF_CLOCK_SYNC) # 1 for OFF, 2 for INIT and 3 for ON.
define(_LF_CLOCK_SYNC_PERIOD_NS)
define(_LF_FEDERATE_NAMES_COMMA_SEPARATED)
define(ADVANCE_MESSAGE_INTERVAL)
Expand Down
7 changes: 4 additions & 3 deletions core/clock.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
#include "clock.h"
#include "low_level_platform.h"

#if defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL)
// If we are federated, include clock-sync API (and implementation)
#if defined(FEDERATED)
#include "clock-sync.h"
#else
// Provide empty implementations of these functions.
// In the unfederated case, just provide empty implementations.
void clock_sync_add_offset(instant_t* t) { (void)t; }
void clock_sync_subtract_offset(instant_t* t) { (void)t; }
#endif // defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL)
#endif // defined(FEDERATED)

static instant_t last_read_physical_time = NEVER;

Expand Down
7 changes: 6 additions & 1 deletion core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -1738,9 +1738,14 @@ void initialize_RTI(rti_remote_t* rti) {
rti_remote->stop_in_progress = false;
}

// The RTI includes clock.c, which requires the following functions that are defined
// in clock-sync.c. But clock-sync.c is not included in the standalone RTI.
// Provide empty implementations of these functions.
void clock_sync_add_offset(instant_t* t) { (void)t; }
void clock_sync_subtract_offset(instant_t* t) { (void)t; }

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) {
// FIXME: Gives error freeing memory not allocated!!!!
scheduling_node_t* node = scheduling_nodes[i];
if (node->upstream != NULL)
free(node->upstream);
Expand Down
36 changes: 19 additions & 17 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ void reset_socket_stat(struct socket_stat_t* socket_stat) {
* will be sent.
*/
uint16_t setup_clock_synchronization_with_rti() {
uint16_t port_to_return = UINT16_MAX;
#ifdef _LF_CLOCK_SYNC_ON
uint16_t port_to_return = UINT16_MAX; // Default if clock sync is off.
#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON)
// Initialize the UDP socket
_lf_rti_socket_UDP = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// Initialize the necessary information for the UDP address
Expand Down Expand Up @@ -201,11 +201,9 @@ uint16_t setup_clock_synchronization_with_rti() {
if (setsockopt(_lf_rti_socket_UDP, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
lf_print_error("Failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno));
}
#else // No runtime clock synchronization. Send port -1 or 0 instead.
#ifdef _LF_CLOCK_SYNC_INITIAL
#elif (LF_CLOCK_SYNC == LF_CLOCK_SYNC_INIT)
port_to_return = 0u;
#endif // _LF_CLOCK_SYNC_INITIAL
#endif // _LF_CLOCK_SYNC_ON
#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON)
return port_to_return;
}

Expand Down Expand Up @@ -530,17 +528,21 @@ void* listen_to_rti_UDP_thread(void* args) {

// If clock synchronization is enabled, provide implementations. If not
// just empty implementations that should be optimized away.
#if defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL)
void clock_sync_add_offset(instant_t* t) { *t += (_lf_clock_sync_offset + _lf_clock_sync_constant_bias); }
#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_INIT)
void clock_sync_add_offset(instant_t* t) {
*t = lf_time_add(*t, (_lf_clock_sync_offset + _lf_clock_sync_constant_bias));
}

void clock_sync_subtract_offset(instant_t* t) { *t -= (_lf_clock_sync_offset + _lf_clock_sync_constant_bias); }
void clock_sync_subtract_offset(instant_t* t) {
*t = lf_time_add(*t, -(_lf_clock_sync_offset + _lf_clock_sync_constant_bias));
}

void clock_sync_set_constant_bias(interval_t offset) { _lf_clock_sync_constant_bias = offset; }
#else
// Empty implementations of clock_sync_add_offset and clock_sync_subtract_offset
// are in clock.c.
#else // i.e. (LF_CLOCK_SYNC < LF_CLOCK_SYNC_INIT)
void clock_sync_add_offset(instant_t* t) { (void)t; }
void clock_sync_subtract_offset(instant_t* t) { (void)t; }
void clock_sync_set_constant_bias(interval_t offset) { (void)offset; }
#endif // (defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL)
#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_INIT)

/**
* Create the thread responsible for handling clock synchronization
Expand All @@ -551,12 +553,12 @@ void clock_sync_set_constant_bias(interval_t offset) { (void)offset; }
* \ingroup agroup
*/
int create_clock_sync_thread(lf_thread_t* thread_id) {
#ifdef _LF_CLOCK_SYNC_ON
#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON)
// One for UDP messages if clock synchronization is enabled for this federate
return lf_thread_create(thread_id, listen_to_rti_UDP_thread, NULL);
#else
(void)thread_id;
#endif // _LF_CLOCK_SYNC_ON
#else // i.e. (LF_CLOCK_SYNC < LF_CLOCK_SYNC_ON)
(void)thread_id; // Suppress unused parameter warning.
#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON)
return 0;
}

Expand Down
3 changes: 2 additions & 1 deletion core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,8 @@ void termination(void) {
#endif
lf_free_all_reactors();

lf_tracing_global_shutdown();

// Free up memory associated with environment.
// Do this last so that printed warnings don't access freed memory.
for (int i = 0; i < num_envs; i++) {
Expand All @@ -1196,7 +1198,6 @@ void termination(void) {
free_local_rti();
#endif
}
lf_tracing_global_shutdown();
}

index_t lf_combine_deadline_and_level(interval_t deadline, int level) {
Expand Down
46 changes: 36 additions & 10 deletions core/tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,46 @@ tag_t lf_tag(void* env) {
return ((environment_t*)env)->current_tag;
}

instant_t lf_time_add(instant_t a, interval_t b) {
if (a == NEVER || b == NEVER) {
return NEVER;
}
if (a == FOREVER || b == FOREVER) {
return FOREVER;
}
instant_t res = a + b;
// Check for overflow
if (res < a && b > 0) {
return FOREVER;
}
// Check for underflow
if (res > a && b < 0) {
return NEVER;
}
return res;
}

tag_t lf_tag_add(tag_t a, tag_t b) {
if (a.time == NEVER || b.time == NEVER)
return NEVER_TAG;
if (a.time == FOREVER || b.time == FOREVER)
instant_t res = lf_time_add(a.time, b.time);
if (res == FOREVER) {
return FOREVER_TAG;
if (b.time > 0)
}
if (res == NEVER) {
return NEVER_TAG;
}

if (b.time > 0) {
erlingrj marked this conversation as resolved.
Show resolved Hide resolved
// NOTE: The reason for handling this case is to "reset" the microstep counter at each after delay.
a.microstep = 0; // Ignore microstep of first arg if time of second is > 0.
tag_t result = {.time = a.time + b.time, .microstep = a.microstep + b.microstep};
if (result.microstep < a.microstep)
return FOREVER_TAG;
if (result.time < a.time && b.time > 0)
}
tag_t result = {.time = res, .microstep = a.microstep + b.microstep};

// If microsteps overflows
// FIXME: What should be the resulting tag in case of microstep overflow.
// see https://github.com/lf-lang/reactor-c/issues/430
if (result.microstep < a.microstep) {
erlingrj marked this conversation as resolved.
Show resolved Hide resolved
return FOREVER_TAG;
if (result.time > a.time && b.time < 0)
return NEVER_TAG;
}
return result;
}

Expand Down
9 changes: 9 additions & 0 deletions include/core/federated/clock-sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include "low_level_platform.h"

// Clock synchronization defaults to performing clock synchronization only at initialization.
#define LF_CLOCK_SYNC_OFF 1
#define LF_CLOCK_SYNC_INIT 2
#define LF_CLOCK_SYNC_ON 3

#ifndef LF_CLOCK_SYNC
#define LF_CLOCK_SYNC LF_CLOCK_SYNC_INIT
#endif

/**
* Number of required clock sync T4 messages per synchronization
* interval. The offset to the clock will not be adjusted until
Expand Down
9 changes: 9 additions & 0 deletions tag/api/tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ tag_t lf_tag(void* env);
*/
tag_t lf_tag_add(tag_t a, tag_t b);

/**
* @brief Return the sum of an interval and an instant, saturating on overflow and underflow.
*
* @param a
* @param b
* @return instant_t
*/
instant_t lf_time_add(instant_t a, interval_t b);

/**
* Compare two tags. Return -1 if the first is less than
* the second, 0 if they are equal, and +1 if the first is
Expand Down
Loading