Skip to content

Commit

Permalink
Move create_rti_server to socket_common.c
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakio815 committed Dec 16, 2024
1 parent ba6c35b commit e4363f2
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 509 deletions.
126 changes: 0 additions & 126 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,132 +52,6 @@ extern int lf_critical_section_enter(environment_t* env) { return lf_mutex_lock(

extern int lf_critical_section_exit(environment_t* env) { return lf_mutex_unlock(&rti_mutex); }

/**
* Create a server and enable listening for socket connections.
* If the specified port if it is non-zero, it will attempt to acquire that port.
* If it fails, it will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times with
* a delay of PORT_BIND_RETRY_INTERVAL in between. If the specified port is
* zero, then it will attempt to acquire DEFAULT_PORT first. If this fails, then it
* will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times, incrementing the port
* number between attempts, with no delay between attempts. Once it has incremented
* the port number MAX_NUM_PORT_ADDRESSES times, it will cycle around and begin again
* with DEFAULT_PORT.
*
* @param port The port number to use or 0 to start trying at DEFAULT_PORT.
* @param socket_type The type of the socket for the server (TCP or UDP).
* @return The socket descriptor on which to accept connections.
*/
static int create_rti_server(uint16_t port, socket_type_t socket_type) {
// Timeout time for the communications of the server
struct timeval timeout_time = {.tv_sec = TCP_TIMEOUT_TIME / BILLION, .tv_usec = (TCP_TIMEOUT_TIME % BILLION) / 1000};
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
int socket_descriptor = -1;
if (socket_type == TCP) {
socket_descriptor = create_real_time_tcp_socket_errexit();
} else if (socket_type == UDP) {
socket_descriptor = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// Set the appropriate timeout time
timeout_time =
(struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000};
}
if (socket_descriptor < 0) {
lf_print_error_system_failure("Failed to create RTI socket.");
}

// Set the option for this socket to reuse the same address
int true_variable = 1; // setsockopt() requires a reference to the value assigned to an option
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &true_variable, sizeof(int32_t)) < 0) {
lf_print_error("RTI failed to set SO_REUSEADDR option on the socket: %s.", strerror(errno));
}
// Set the timeout on the socket so that read and write operations don't block for too long
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
lf_print_error("RTI failed to set SO_RCVTIMEO option on the socket: %s.", strerror(errno));
}
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
lf_print_error("RTI failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno));
}

/*
* The following used to permit reuse of a port that an RTI has previously
* used that has not been released. We no longer do this, and instead retry
* some number of times after waiting.
// SO_REUSEPORT (since Linux 3.9)
// Permits multiple AF_INET or AF_INET6 sockets to be bound to an
// identical socket address. This option must be set on each
// socket (including the first socket) prior to calling bind(2)
// on the socket. To prevent port hijacking, all of the
// processes binding to the same address must have the same
// effective UID. This option can be employed with both TCP and
// UDP sockets.
int reuse = 1;
#ifdef SO_REUSEPORT
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT,
(const char*)&reuse, sizeof(reuse)) < 0) {
perror("setsockopt(SO_REUSEPORT) failed");
}
#endif
*/

// Server file descriptor.
struct sockaddr_in server_fd;
// Zero out the server address structure.
bzero((char*)&server_fd, sizeof(server_fd));

uint16_t specified_port = port;
if (specified_port == 0)
port = DEFAULT_PORT;

server_fd.sin_family = AF_INET; // IPv4
server_fd.sin_addr.s_addr = INADDR_ANY; // All interfaces, 0.0.0.0.
// Convert the port number from host byte order to network byte order.
server_fd.sin_port = htons(port);

int result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd));

// Try repeatedly to bind to a port. If no specific port is specified, then
// increment the port number each time.

int count = 1;
while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) {
if (specified_port == 0) {
lf_print_warning("RTI failed to get port %d.", port);
port++;
if (port >= DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES)
port = DEFAULT_PORT;
lf_print_warning("RTI will try again with port %d.", port);
server_fd.sin_port = htons(port);
// Do not sleep.
} else {
lf_print("RTI failed to get port %d. Will try again.", port);
lf_sleep(PORT_BIND_RETRY_INTERVAL);
}
result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd));
}
if (result != 0) {
lf_print_error_and_exit("Failed to bind the RTI socket. Port %d is not available. ", port);
}
char* type = "TCP";
if (socket_type == UDP) {
type = "UDP";
}
lf_print("RTI using %s port %d for federation %s.", type, port, rti_remote->federation_id);

if (socket_type == TCP) {
rti_remote->final_port_TCP = port;
// Enable listening for socket connections.
// The second argument is the maximum number of queued socket requests,
// which according to the Mac man page is limited to 128.
listen(socket_descriptor, 128);
} else if (socket_type == UDP) {
rti_remote->final_port_UDP = port;
// No need to listen on the UDP socket
}

return socket_descriptor;
}

void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 ||
lf_tag_compare(tag, e->last_provisionally_granted) < 0) {
Expand Down
Loading

0 comments on commit e4363f2

Please sign in to comment.