From 7e84962f7b7ad92a0758cbbc8a153195dccc573b Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 2 Dec 2023 11:55:56 -0800 Subject: [PATCH 1/2] Handle messages arriving during initial STA wait --- core/threaded/reactor_threaded.c | 59 +++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 6ebf6c8c6..7769652a1 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -691,7 +691,7 @@ void _lf_initialize_start_tag(environment_t *env) { // Add reactions invoked at tag (0,0) (including startup reactions) to the reaction queue _lf_trigger_startup_reactions(env); -#ifdef FEDERATED +#if defined FEDERATED // If env is the environment for the top-level enclave, then initialize the federate. environment_t *top_level_env; _lf_get_environments(&top_level_env); @@ -703,24 +703,21 @@ void _lf_initialize_start_tag(environment_t *env) { // Get a start_time from the RTI synchronize_with_other_federates(); // Resets start_time in federated execution according to the RTI. } + // The start time will likely have changed. Adjust the current tag and stop tag. env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; if (duration >= 0LL) { // A duration has been specified. Recalculate the stop time. env->stop_tag = ((tag_t) {.time = start_time + duration, .microstep = 0}); } -#endif _lf_initialize_timers(env); - // If the stop_tag is (0,0), also insert the shutdown - // reactions. This can only happen if the timeout time - // was set to 0. - if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) { - _lf_trigger_shutdown_reactions(env); - } + // If we have a non-zero STA offset, then we need to allow messages to arrive + // prior to the start time. To avoid spurious STP violations, we temporarily + // set the current time back by the STA offset. + env->current_tag = (tag_t){.time = start_time - _lf_fed_STA_offset, .microstep = 0u}; -#if defined FEDERATED // Call wait_until if federated. This is required because the startup procedure // in synchronize_with_other_federates() can decide on a new start_time that is // larger than the current physical time. @@ -739,15 +736,26 @@ void _lf_initialize_start_tag(environment_t *env) { // Here we wait until the start time and also release the environment mutex. // this means that the other worker threads will be allowed to start. We need // this to avoid potential deadlock in federated startup. - while(!wait_until(env, start_time, &env->event_q_changed)) {}; - LF_PRINT_DEBUG("Done waiting for start time " PRINTF_TIME ".", start_time); - LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". This should be small.", + // NOTE: wait_until automatically adds _lf_fed_STA_offset. + while(!wait_until(env, start_time + _lf_fed_STA_offset, &env->event_q_changed)) {}; + LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + _lf_fed_STA_offset); + LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME + ". This should be close to the STA offset.", lf_time_physical() - start_time); - // Each federate executes the start tag (which is the current - // tag). Inform the RTI of this if needed. - send_next_event_tag(env, env->current_tag, true); -#endif + // Restore the current tag to match the start time. + env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + + // For messages that may have arrived while we were waiting, put + // reactions on the reaction queue. + _lf_pop_events(env); + + // If the stop_tag is (0,0), also insert the shutdown + // reactions. This can only happen if the timeout time + // was set to 0. + if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) { + _lf_trigger_shutdown_reactions(env); + } #ifdef FEDERATED_DECENTRALIZED // In federated execution (at least under decentralized coordination), @@ -759,7 +767,24 @@ void _lf_initialize_start_tag(environment_t *env) { // to be removed, if appropriate before proceeding to executing tag (0,0). _lf_wait_on_tag_barrier(env, (tag_t){.time=start_time,.microstep=0}); spawn_staa_thread(); -#endif // FEDERATED_DECENTRALIZED + + // Pull from the event queue any messages that have been received during waiting. + +#else // NOT FEDERATED_DECENTRALIZED + // Each federate executes the start tag (which is the current + // tag). Inform the RTI of this if needed. + send_next_event_tag(env, env->current_tag, true); +#endif // NOT FEDERATED_DECENTRALIZED +#else // NOT FEDERATED + _lf_initialize_timers(env); + + // If the stop_tag is (0,0), also insert the shutdown + // reactions. This can only happen if the timeout time + // was set to 0. + if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) { + _lf_trigger_shutdown_reactions(env); + } +#endif // NOT FEDERATED // Set the following boolean so that other thread(s), including federated threads, // know that the execution has started From e8aba6cab3a97a12843320002ad9f86a7df12db6 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 2 Dec 2023 14:01:45 -0800 Subject: [PATCH 2/2] Fixed comments --- core/threaded/reactor_threaded.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 7769652a1..a462dd77c 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -736,7 +736,6 @@ void _lf_initialize_start_tag(environment_t *env) { // Here we wait until the start time and also release the environment mutex. // this means that the other worker threads will be allowed to start. We need // this to avoid potential deadlock in federated startup. - // NOTE: wait_until automatically adds _lf_fed_STA_offset. while(!wait_until(env, start_time + _lf_fed_STA_offset, &env->event_q_changed)) {}; LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + _lf_fed_STA_offset); LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME @@ -768,8 +767,6 @@ void _lf_initialize_start_tag(environment_t *env) { _lf_wait_on_tag_barrier(env, (tag_t){.time=start_time,.microstep=0}); spawn_staa_thread(); - // Pull from the event queue any messages that have been received during waiting. - #else // NOT FEDERATED_DECENTRALIZED // Each federate executes the start tag (which is the current // tag). Inform the RTI of this if needed.