Skip to content

Commit

Permalink
Merge branch 'main' of github.com:lf-lang/reactor-c into shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakio815 committed Jan 8, 2025
2 parents ef3345b + c2437e1 commit 311205d
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 41 deletions.
6 changes: 3 additions & 3 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -1480,15 +1480,15 @@ void initialize_federate(federate_info_t* fed, uint16_t id) {
int32_t start_rti_server(uint16_t port) {
_lf_initialize_clock();
// Create the TCP socket server
if (create_TCP_server(port, &rti_remote->socket_descriptor_TCP, &rti_remote->final_port_TCP, true)) {
if (create_server(port, &rti_remote->socket_descriptor_TCP, &rti_remote->final_port_TCP, TCP, true)) {
lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno));
};
lf_print("RTI: Listening for federates.");
// Create the UDP socket server
// Try to get the rti_remote->final_port_TCP + 1 port
if (rti_remote->clock_sync_global_status >= clock_sync_on) {
if (create_UDP_server(rti_remote->final_port_TCP + 1, &rti_remote->socket_descriptor_UDP,
&rti_remote->final_port_UDP, true)) {
if (create_server(rti_remote->final_port_TCP + 1, &rti_remote->socket_descriptor_UDP, &rti_remote->final_port_UDP,
UDP, true)) {
lf_print_error_system_failure("RTI failed to create UDP server: %s.", strerror(errno));
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@

#include "lf_types.h"
#include "pqueue_tag.h"
#include "socket_common.h"

/** Time allowed for federates to reply to stop request. */
#define MAX_TIME_FOR_REPLY_TO_STOP_REQUEST SEC(30)

/////////////////////////////////////////////
//// Data structures

typedef enum socket_type_t { TCP, UDP } socket_type_t;

/**
* Information about a federate known to the RTI, including its runtime state,
* mode of execution, and connectivity with other federates.
Expand Down
3 changes: 1 addition & 2 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1917,8 +1917,7 @@ void lf_connect_to_rti(const char* hostname, int port) {

void lf_create_server(int specified_port) {
assert(specified_port <= UINT16_MAX && specified_port >= 0);
uint16_t port;
if (create_TCP_server(specified_port, &_fed.server_socket, &port, false)) {
if (create_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, TCP, false)) {
lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno));
};
_fed.server_port = (int)port;
Expand Down
20 changes: 6 additions & 14 deletions core/federated/network/socket_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ static int set_socket_bind_option(int socket_descriptor, uint16_t specified_port
return used_port;
}

static int create_server(uint16_t port, int* final_socket, uint16_t* final_port, int sock_type,
bool increment_port_on_retry) {
int create_server(uint16_t port, int* final_socket, uint16_t* final_port, socket_type_t sock_type,
bool increment_port_on_retry) {
int socket_descriptor;
struct timeval timeout_time;
if (sock_type == 0) {
if (sock_type == TCP) {
// Create an IPv4 socket for TCP.
socket_descriptor = create_real_time_tcp_socket_errexit();
// Set the timeout time for the communications of the server
Expand All @@ -149,14 +149,14 @@ static int create_server(uint16_t port, int* final_socket, uint16_t* final_port,
timeout_time =
(struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000};
}
char* type = (sock_type == 0) ? "TCP" : "UDP";
char* type = (sock_type == TCP) ? "TCP" : "UDP";
if (socket_descriptor < 0) {
lf_print_error("Failed to create %s socket.", type);
return -1;
}
set_socket_timeout_option(socket_descriptor, &timeout_time);
uint16_t used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry);
if (sock_type == 0) {
int used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry);
if (sock_type == TCP) {
// Enable listening for socket connections.
// The second argument is the maximum number of queued socket requests,
// which according to the Mac man page is limited to 128.
Expand All @@ -170,14 +170,6 @@ static int create_server(uint16_t port, int* final_socket, uint16_t* final_port,
return 0;
}

int create_TCP_server(uint16_t port, int* final_socket, uint16_t* final_port, bool increment_port_on_retry) {
return create_server(port, final_socket, final_port, 0, increment_port_on_retry);
}

int create_UDP_server(uint16_t port, int* final_socket, uint16_t* final_port, bool increment_port_on_retry) {
return create_server(port, final_socket, final_port, 1, increment_port_on_retry);
}

/**
* Return true if either the socket to the RTI is broken or the socket is
* alive and the first unread byte on the socket's queue is MSG_TYPE_FAILED.
Expand Down
32 changes: 32 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,38 @@ void lf_set_stop_tag(environment_t* env, tag_t tag) {
}
}

const char* lf_reactor_name(self_base_t* self) { return self->name; }

const char* lf_reactor_full_name(self_base_t* self) {
if (self->full_name != NULL) {
return self->full_name;
}
// First find the length of the full name.
size_t name_len = strlen(self->name);
size_t len = name_len;
self_base_t* parent = self->parent;
while (parent != NULL) {
len++; // For the dot.
len += strlen(parent->name);
parent = parent->parent;
}
self->full_name = (char*)lf_allocate(len + 1, sizeof(char), &self->allocations);
self->full_name[len] = '\0'; // Null terminate the string.

size_t location = len - name_len;
memcpy(&self->full_name[location], self->name, name_len);
parent = self->parent;
while (parent != NULL) {
location--;
self->full_name[location] = '.';
size_t parent_len = strlen(parent->name);
location -= parent_len;
memcpy(&self->full_name[location], parent->name, parent_len);
parent = parent->parent;
}
return self->full_name;
}

#ifdef FEDERATED_DECENTRALIZED

interval_t lf_get_stp_offset() { return lf_fed_STA_offset; }
Expand Down
21 changes: 21 additions & 0 deletions include/api/reaction_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,25 @@
*/
#define lf_time_logical_elapsed() lf_time_logical_elapsed(self->base.environment)

/**
* @brief Return the instance name of the reactor.
*
* The instance name is the name of given to the instance created by the `new` operator in LF.
* If the instance is in a bank, then the name will have a suffix of the form `[bank_index]`.
*
* @param reactor The reactor to get the name of.
*/
#define lf_reactor_name(reactor) lf_reactor_name(&reactor->base)

/**
* @brief Return the fully qualified name of the reactor.
*
* The fully qualified name of a reactor is the instance name of the reactor concatenated with the names of all
* of its parents, separated by dots. If the reactor or any of its parents is a bank, then the name
* will have a suffix of the form `[bank_index]`.
*
* @param reactor The reactor to get the name of.
*/
#define lf_reactor_full_name(reactor) lf_reactor_full_name(&reactor->base)

#endif // REACTION_MACROS_H
3 changes: 3 additions & 0 deletions include/api/reaction_macros_undef.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@
#undef lf_time_logical
#undef lf_time_logical_elapsed

#undef lf_reactor_name
#undef lf_reactor_full_name

#endif // REACTION_MACROS_H
36 changes: 18 additions & 18 deletions include/core/federated/network/socket_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
#define SOCKET_COMMON_H

#include "low_level_platform.h"
#define NUM_SOCKET_RETRIES 10

/**
* The amount of time to wait after a failed socket read or write before trying again. This defaults to 100 ms.
*/
#define DELAY_BETWEEN_SOCKET_RETRIES MSEC(100)

/**
Expand Down Expand Up @@ -63,6 +66,8 @@
*/
#define MSG_TYPE_FAILED 25

typedef enum socket_type_t { TCP, UDP } socket_type_t;

/**
* Mutex protecting socket close operations.
*/
Expand Down Expand Up @@ -93,28 +98,23 @@ int create_real_time_tcp_socket_errexit();
* @param port The port number to use or 0 to let the OS pick or 1 to start trying at DEFAULT_PORT.
* @param final_socket Pointer to the returned socket descriptor on which accepting connections will occur.
* @param final_port Pointer to the final port the server will use.
* @param sock_type Type of the socket, TCP or UDP.
* @param increment_port_on_retry Boolean to retry port increment.
* @return 0 for success, -1 for failure.
*/
int create_TCP_server(uint16_t port, int* final_socket, uint16_t* final_port, bool increment_port_on_retry);
/**
* @brief Create a UDP server that listens for socket connections.
*
* This function is just like create_TCP_server(), except that it creates a UDP server.
*
* @param port The port number to use or 0 to let the OS pick or 1 to start trying at DEFAULT_PORT.
* @param final_socket Pointer to the returned socket descriptor on which accepting connections will occur.
* @param final_port Pointer to the final port the server will use.
* @return 0 for success, -1 for failure.
*/
int create_UDP_server(uint16_t port, int* final_socket, uint16_t* final_port, bool increment_port_on_retry);
int create_server(uint16_t port, int* final_socket, uint16_t* final_port, socket_type_t sock_type,
bool increment_port_on_retry);

/**
* These two functions waits for an incoming connection request on the specified server socket.
* It blocks until a connection is successfully accepted. If an error occurs that is not
* Wait for an incoming connection request on the specified server socket.
* This blocks until a connection is successfully accepted. If an error occurs that is not
* temporary (e.g., `EAGAIN` or `EWOULDBLOCK`), it reports the error and exits. Temporary
* errors cause the function to retry accepting the connection.
* If the rti_socket is not -1, it checks if the RTI's server socket is still open.
*
* If the `rti_socket` is not -1, this function checks whether the specified socket is still open.
* If it is not open, then this function returns -1.
* This is useful for federates to determine whether they are still connected to the federation
* and to stop waiting when they are not.
*
* @param socket The server socket file descriptor that is listening for incoming connections.
* @param rti_socket The rti socket for the federate to check if it is still open.
Expand All @@ -126,8 +126,8 @@ int accept_socket(int socket, int rti_socket);

/**
*
* This function attempts to establish a TCP connection to the specified hostname
* and port. It uses `getaddrinfo` to resolve the hostname and retries the connection
* Attempt to establish a TCP connection to the specified hostname
* and port. This function uses `getaddrinfo` to resolve the hostname and retries the connection
* periodically if it fails. If the specified port is 0, it iterates through a range
* of default ports starting from `DEFAULT_PORT`. The function will stop retrying
* if the `CONNECT_TIMEOUT` is reached.
Expand Down
7 changes: 5 additions & 2 deletions include/core/lf_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ struct trigger_t {
* An allocation record that is used by a destructor for a reactor
* to free memory that has been dynamically allocated for the particular
* instance of the reactor. This will be an element of linked list.
* If the indirect field is true, then the allocated pointer points to
* pointer to allocated memory, rather than directly to the allocated memory.
* The `allocated` pointer points to the allocated memory, and the `next`
* pointer points to the next allocation record (or NULL if there are no more).
*/
typedef struct allocation_record_t {
void* allocated;
Expand All @@ -277,6 +277,9 @@ typedef struct self_base_t {
struct allocation_record_t* allocations;
struct reaction_t* executing_reaction; // The currently executing reaction of the reactor.
environment_t* environment;
char* name; // The name of the reactor. If a bank, appended with [index].
char* full_name; // The full name of the reactor or NULL if lf_reactor_full_name() is not called.
self_base_t* parent; // The parent of this reactor.
#if !defined(LF_SINGLE_THREADED)
void* reactor_mutex; // If not null, this is expected to point to an lf_mutex_t.
// It is not declared as such to avoid a dependence on platform.h.
Expand Down
21 changes: 21 additions & 0 deletions include/core/reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,26 @@ void lf_free_all_reactors(void);
*/
void lf_free_reactor(self_base_t* self);

/**
* @brief Return the instance name of the reactor.
*
* The instance name is the name of given to the instance created by the `new` operator in LF.
* If the instance is in a bank, then the name will have a suffix of the form `[bank_index]`.
*
* @param self The self struct of the reactor.
*/
const char* lf_reactor_name(self_base_t* self);

/**
* @brief Return the full name of the reactor.
*
* The fully qualified name of a reactor is the instance name of the reactor concatenated with the names of all
* of its parents, separated by dots. If the reactor or any of its parents is a bank, then the name
* will have a suffix of the form `[bank_index]`.
*
* @param self The self struct of the reactor.
*/
const char* lf_reactor_full_name(self_base_t* self);

#endif /* REACTOR_H */
/** @} */

0 comments on commit 311205d

Please sign in to comment.