Skip to content

Commit

Permalink
Merge pull request #425 from lf-lang/clock-sync-initial
Browse files Browse the repository at this point in the history
Fixed preprocessor directives for clock sync
  • Loading branch information
edwardalee authored May 17, 2024
2 parents 3c1fffa + ac3faad commit 0a42722
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 36 deletions.
8 changes: 4 additions & 4 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ target_compile_definitions(reactor-c PRIVATE INITIAL_EVENT_QUEUE_SIZE=${INITIAL_
target_compile_definitions(reactor-c PRIVATE INITIAL_REACT_QUEUE_SIZE=${INITIAL_REACT_QUEUE_SIZE})
target_compile_definitions(reactor-c PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})

# Macro for translating a command-line argument into compile definition for
# reactor-c lib
# If variable X is defined in cMake (set using SET()) or passed in as a command-line
# argument using -DX=<value>, then make it a compiler flag for reactor-c so that X
# is also defined in the C code for reactor-c.
macro(define X)
if(DEFINED ${X})
message(STATUS ${X}=${${X}})
Expand All @@ -155,8 +156,7 @@ message(STATUS "Applying preprocessor definitions...")
define(_LF_CLOCK_SYNC_ATTENUATION)
define(_LF_CLOCK_SYNC_COLLECT_STATS)
define(_LF_CLOCK_SYNC_EXCHANGES_PER_INTERVAL)
define(_LF_CLOCK_SYNC_INITIAL)
define(_LF_CLOCK_SYNC_ON)
define(LF_CLOCK_SYNC) # 1 for OFF, 2 for INIT and 3 for ON.
define(_LF_CLOCK_SYNC_PERIOD_NS)
define(_LF_FEDERATE_NAMES_COMMA_SEPARATED)
define(ADVANCE_MESSAGE_INTERVAL)
Expand Down
7 changes: 4 additions & 3 deletions core/clock.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
#include "clock.h"
#include "low_level_platform.h"

#if defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL)
// If we are federated, include clock-sync API (and implementation)
#if defined(FEDERATED)
#include "clock-sync.h"
#else
// Provide empty implementations of these functions.
// In the unfederated case, just provide empty implementations.
void clock_sync_add_offset(instant_t* t) { (void)t; }
void clock_sync_subtract_offset(instant_t* t) { (void)t; }
#endif // defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL)
#endif // defined(FEDERATED)

static instant_t last_read_physical_time = NEVER;

Expand Down
7 changes: 6 additions & 1 deletion core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -1738,9 +1738,14 @@ void initialize_RTI(rti_remote_t* rti) {
rti_remote->stop_in_progress = false;
}

// The RTI includes clock.c, which requires the following functions that are defined
// in clock-sync.c. But clock-sync.c is not included in the standalone RTI.
// Provide empty implementations of these functions.
void clock_sync_add_offset(instant_t* t) { (void)t; }
void clock_sync_subtract_offset(instant_t* t) { (void)t; }

void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes) {
for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) {
// FIXME: Gives error freeing memory not allocated!!!!
scheduling_node_t* node = scheduling_nodes[i];
if (node->upstream != NULL)
free(node->upstream);
Expand Down
36 changes: 19 additions & 17 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ void reset_socket_stat(struct socket_stat_t* socket_stat) {
* will be sent.
*/
uint16_t setup_clock_synchronization_with_rti() {
uint16_t port_to_return = UINT16_MAX;
#ifdef _LF_CLOCK_SYNC_ON
uint16_t port_to_return = UINT16_MAX; // Default if clock sync is off.
#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON)
// Initialize the UDP socket
_lf_rti_socket_UDP = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// Initialize the necessary information for the UDP address
Expand Down Expand Up @@ -201,11 +201,9 @@ uint16_t setup_clock_synchronization_with_rti() {
if (setsockopt(_lf_rti_socket_UDP, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
lf_print_error("Failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno));
}
#else // No runtime clock synchronization. Send port -1 or 0 instead.
#ifdef _LF_CLOCK_SYNC_INITIAL
#elif (LF_CLOCK_SYNC == LF_CLOCK_SYNC_INIT)
port_to_return = 0u;
#endif // _LF_CLOCK_SYNC_INITIAL
#endif // _LF_CLOCK_SYNC_ON
#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON)
return port_to_return;
}

Expand Down Expand Up @@ -530,17 +528,21 @@ void* listen_to_rti_UDP_thread(void* args) {

// If clock synchronization is enabled, provide implementations. If not
// just empty implementations that should be optimized away.
#if defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL)
void clock_sync_add_offset(instant_t* t) { *t += (_lf_clock_sync_offset + _lf_clock_sync_constant_bias); }
#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_INIT)
void clock_sync_add_offset(instant_t* t) {
*t = lf_time_add(*t, (_lf_clock_sync_offset + _lf_clock_sync_constant_bias));
}

void clock_sync_subtract_offset(instant_t* t) { *t -= (_lf_clock_sync_offset + _lf_clock_sync_constant_bias); }
void clock_sync_subtract_offset(instant_t* t) {
*t = lf_time_add(*t, -(_lf_clock_sync_offset + _lf_clock_sync_constant_bias));
}

void clock_sync_set_constant_bias(interval_t offset) { _lf_clock_sync_constant_bias = offset; }
#else
// Empty implementations of clock_sync_add_offset and clock_sync_subtract_offset
// are in clock.c.
#else // i.e. (LF_CLOCK_SYNC < LF_CLOCK_SYNC_INIT)
void clock_sync_add_offset(instant_t* t) { (void)t; }
void clock_sync_subtract_offset(instant_t* t) { (void)t; }
void clock_sync_set_constant_bias(interval_t offset) { (void)offset; }
#endif // (defined(_LF_CLOCK_SYNC_ON) || defined(_LF_CLOCK_SYNC_INITIAL)
#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_INIT)

/**
* Create the thread responsible for handling clock synchronization
Expand All @@ -551,12 +553,12 @@ void clock_sync_set_constant_bias(interval_t offset) { (void)offset; }
* \ingroup agroup
*/
int create_clock_sync_thread(lf_thread_t* thread_id) {
#ifdef _LF_CLOCK_SYNC_ON
#if (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON)
// One for UDP messages if clock synchronization is enabled for this federate
return lf_thread_create(thread_id, listen_to_rti_UDP_thread, NULL);
#else
(void)thread_id;
#endif // _LF_CLOCK_SYNC_ON
#else // i.e. (LF_CLOCK_SYNC < LF_CLOCK_SYNC_ON)
(void)thread_id; // Suppress unused parameter warning.
#endif // (LF_CLOCK_SYNC >= LF_CLOCK_SYNC_ON)
return 0;
}

Expand Down
3 changes: 2 additions & 1 deletion core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,8 @@ void termination(void) {
#endif
lf_free_all_reactors();

lf_tracing_global_shutdown();

// Free up memory associated with environment.
// Do this last so that printed warnings don't access freed memory.
for (int i = 0; i < num_envs; i++) {
Expand All @@ -1196,7 +1198,6 @@ void termination(void) {
free_local_rti();
#endif
}
lf_tracing_global_shutdown();
}

index_t lf_combine_deadline_and_level(interval_t deadline, int level) {
Expand Down
46 changes: 36 additions & 10 deletions core/tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,46 @@ tag_t lf_tag(void* env) {
return ((environment_t*)env)->current_tag;
}

instant_t lf_time_add(instant_t a, interval_t b) {
if (a == NEVER || b == NEVER) {
return NEVER;
}
if (a == FOREVER || b == FOREVER) {
return FOREVER;
}
instant_t res = a + b;
// Check for overflow
if (res < a && b > 0) {
return FOREVER;
}
// Check for underflow
if (res > a && b < 0) {
return NEVER;
}
return res;
}

tag_t lf_tag_add(tag_t a, tag_t b) {
if (a.time == NEVER || b.time == NEVER)
return NEVER_TAG;
if (a.time == FOREVER || b.time == FOREVER)
instant_t res = lf_time_add(a.time, b.time);
if (res == FOREVER) {
return FOREVER_TAG;
if (b.time > 0)
}
if (res == NEVER) {
return NEVER_TAG;
}

if (b.time > 0) {
// NOTE: The reason for handling this case is to "reset" the microstep counter at each after delay.
a.microstep = 0; // Ignore microstep of first arg if time of second is > 0.
tag_t result = {.time = a.time + b.time, .microstep = a.microstep + b.microstep};
if (result.microstep < a.microstep)
return FOREVER_TAG;
if (result.time < a.time && b.time > 0)
}
tag_t result = {.time = res, .microstep = a.microstep + b.microstep};

// If microsteps overflows
// FIXME: What should be the resulting tag in case of microstep overflow.
// see https://github.com/lf-lang/reactor-c/issues/430
if (result.microstep < a.microstep) {
return FOREVER_TAG;
if (result.time > a.time && b.time < 0)
return NEVER_TAG;
}
return result;
}

Expand Down
9 changes: 9 additions & 0 deletions include/core/federated/clock-sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include "low_level_platform.h"

// Clock synchronization defaults to performing clock synchronization only at initialization.
#define LF_CLOCK_SYNC_OFF 1
#define LF_CLOCK_SYNC_INIT 2
#define LF_CLOCK_SYNC_ON 3

#ifndef LF_CLOCK_SYNC
#define LF_CLOCK_SYNC LF_CLOCK_SYNC_INIT
#endif

/**
* Number of required clock sync T4 messages per synchronization
* interval. The offset to the clock will not be adjusted until
Expand Down
9 changes: 9 additions & 0 deletions tag/api/tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ tag_t lf_tag(void* env);
*/
tag_t lf_tag_add(tag_t a, tag_t b);

/**
* @brief Return the sum of an interval and an instant, saturating on overflow and underflow.
*
* @param a
* @param b
* @return instant_t
*/
instant_t lf_time_add(instant_t a, interval_t b);

/**
* Compare two tags. Return -1 if the first is less than
* the second, 0 if they are equal, and +1 if the first is
Expand Down

0 comments on commit 0a42722

Please sign in to comment.