Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Federated support without C include reliance #164

Merged
merged 16 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ list(APPEND INFO_SOURCES ${GENERAL_SOURCES})
# Create the core library
add_library(core ${GENERAL_SOURCES})

# Add sources for either threaded or unthreaded runtime
if(DEFINED FEDERATED)
arengarajan99 marked this conversation as resolved.
Show resolved Hide resolved
include(federated/CMakeLists.txt)
endif()

# Add sources for either threaded or unthreaded runtime
if(DEFINED LF_THREADED)
message(STATUS "Including sources for threaded runtime with \
Expand Down
5 changes: 5 additions & 0 deletions core/federated/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
set(FEDERATED_SOURCES clock-sync.c federate.c net_util.c)
list(APPEND INFO_SOURCES ${FEDERATED_SOURCES})

list(TRANSFORM FEDERATED_SOURCES PREPEND federated/)
target_sources(core PRIVATE ${FEDERATED_SOURCES})
arengarajan99 marked this conversation as resolved.
Show resolved Hide resolved
17 changes: 10 additions & 7 deletions core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ else()
endif()

# The following should not be done
include_directories(${CoreLib}/..)
include_directories(${CoreLib})
include_directories(${CoreLib}/federated)
include_directories(${CoreLib}/platform)
include_directories(${CoreLib}/utils)
# include_directories(${CoreLib}/..)
lhstrh marked this conversation as resolved.
Show resolved Hide resolved
# include_directories(${CoreLib})
# include_directories(${CoreLib}/federated)
# include_directories(${CoreLib}/platform)
# include_directories(${CoreLib}/utils)

set(IncludeDir ../../../include/core)
include_directories(${IncludeDir})
Expand All @@ -70,6 +70,9 @@ add_executable(
rti.c
${LF_PLATFORM_FILE}
${CoreLib}/platform/lf_unix_clock_support.c
${CoreLib}/utils/util.c
${CoreLib}/tag.c
${CoreLib}/federated/net_util.c
${CoreLib}/utils/pqueue.c
message_record/message_record.c
)
Expand All @@ -79,8 +82,8 @@ IF(CMAKE_BUILD_TYPE MATCHES DEBUG)
target_compile_definitions(RTI PUBLIC LOG_LEVEL=4)
ENDIF(CMAKE_BUILD_TYPE MATCHES DEBUG)

# Set LF_THREADING to get the threaded support
target_compile_definitions(RTI PUBLIC LF_THREADED=1)
# Set LF_THREADING to get the threaded support and FEDERATED to get federated compilation support
target_compile_definitions(RTI PUBLIC LF_THREADED=1 FEDERATED=1)
target_compile_definitions(RTI PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})

# Find threads and link to it
Expand Down
6 changes: 3 additions & 3 deletions core/federated/RTI/rti.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <sys/wait.h> // Defines wait() for process to change state.

#include "platform.h" // Platform-specific types and functions
#include "util.c" // Defines print functions (e.g., lf_print).
#include "net_util.c" // Defines network functions.
#include "util.h" // Defines print functions (e.g., lf_print).
#include "net_util.h" // Defines network functions.
#include "net_common.h" // Defines message types, etc. Includes <pthread.h> and "reactor.h".
#include "tag.c" // Time-related types and functions.
#include "tag.h" // Time-related types and functions.
#include "rti.h"
#ifdef __RTI_AUTH__
#include <openssl/rand.h> // For secure random number generation.
Expand Down
7 changes: 7 additions & 0 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* Utility functions for clock synchronization.
*/

#ifdef FEDERATED
#include <errno.h>
#include <math.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include "clock-sync.h"
#include "net_common.h"
#include "net_util.h"
#include "util.h"

/**
* Keep a record of connection statistics
Expand Down Expand Up @@ -567,3 +573,4 @@ int create_clock_sync_thread(lf_thread_t* thread_id) {
#endif // _LF_CLOCK_SYNC_ON
return 0;
}
#endif
45 changes: 13 additions & 32 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,31 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* The main entry point is synchronize_with_other_federates().
*/

#ifdef FEDERATED
#ifndef PLATFORM_ARDUINO
#include <arpa/inet.h> // inet_ntop & inet_pton
#include <assert.h>
#include <errno.h> // Defined perror(), errno
#include <netdb.h> // Defines gethostbyname().
#include <netinet/in.h> // Defines struct sockaddr_in
#include <regex.h>
#include <strings.h> // Defines bzero().
#include <sys/socket.h>
#endif

#include <assert.h>
#include <errno.h> // Defined perror(), errno
#include <signal.h> // Defines sigaction.
#include <stdio.h>
#include <stdlib.h>
#include <strings.h> // Defines bzero().
#include <sys/socket.h>
#include <unistd.h> // Defines read(), write(), and close()

#include "clock-sync.c"
//clock-sync.c, net_util.c
#include "clock-sync.h"
#include "federate.h"
#include "lf_types.h"
#include "net_common.h"
#include "net_util.c"
#include "net_util.h"
#include "platform.h"
#include "reactor.h"
#include "reactor_common.h"
#include "reactor_threaded.h"
#include "scheduler.h"
Expand All @@ -61,10 +67,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
char* ERROR_SENDING_HEADER = "ERROR sending header information to federate via RTI";
char* ERROR_SENDING_MESSAGE = "ERROR sending message to federate via RTI";

// Mutex lock held while performing socket write and close operations.
lf_mutex_t outbound_socket_mutex;
lf_cond_t port_status_changed;

/**
* The state of this federate instance.
*/
Expand Down Expand Up @@ -101,28 +103,6 @@ federation_metadata_t federation_metadata = {
};


/**
* Thread that listens for inputs from other federates.
* This thread listens for messages of type MSG_TYPE_P2P_TAGGED_MESSAGE
* from the specified peer federate and calls schedule to
* schedule an event. If an error occurs or an EOF is received
* from the peer, then this procedure returns, terminating the
* thread.
* @param fed_id_ptr A pointer to a uint16_t containing federate ID being listened to.
* This procedure frees the memory pointed to before returning.
*/
void* listen_to_federates(void* args);


/**
* Generated function that sends information about connections between this federate and
* other federates where messages are routed through the RTI. Currently, this
* only includes logical connections when the coordination is centralized. This
* information is needed for the RTI to perform the centralized coordination.
* @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h
*/
void send_neighbor_structure_to_RTI(int rti_socket);

/**
* Create a server to listen to incoming physical
* connections from remote federates. This function
Expand Down Expand Up @@ -2857,3 +2837,4 @@ parse_rti_code_t parse_rti_addr(const char* rti_addr) {
void set_federation_id(const char* fid) {
federation_metadata.federation_id = fid;
}
#endif
3 changes: 3 additions & 0 deletions core/federated/net_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* @section DESCRIPTION
* Utility functions for a federate in a federated execution.
*/

#ifdef FEDERATED
#include <assert.h>
#include <ctype.h>
#include <errno.h>
Expand Down Expand Up @@ -705,3 +707,4 @@ void extract_rti_addr_info(const char* rti_addr, rti_addr_info_t* rti_addr_info)
}
regfree(&regex_compiled);
}
#endif
2 changes: 2 additions & 0 deletions include/core/federated/clock-sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CLOCK_SYNC_H
#define CLOCK_SYNC_H

#include "platform.h"

/**
* Number of required clock sync T4 messages per synchronization
* interval. The offset to the clock will not be adjusted until
Expand Down
106 changes: 106 additions & 0 deletions include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef FEDERATE_H
#define FEDERATE_H

#include "tag.h"
#include "lf_types.h"
#include <stdbool.h>

#ifndef ADVANCE_MESSAGE_INTERVAL
#define ADVANCE_MESSAGE_INTERVAL MSEC(10)
#endif
Expand Down Expand Up @@ -239,6 +243,108 @@ typedef struct federation_metadata_t {
char* rti_user;
} federation_metadata_t;

/**
* Generated function that sends information about connections between this federate and
* other federates where messages are routed through the RTI. Currently, this
* only includes logical connections when the coordination is centralized. This
* information is needed for the RTI to perform the centralized coordination.
* @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h
*/
void send_neighbor_structure_to_RTI(int);

// Mutex lock held while performing socket write and close operations.
lf_mutex_t outbound_socket_mutex;
lf_cond_t port_status_changed;

/**
* Send a logical tag complete (LTC) message to the RTI
* unless an equal or later LTC has previously been sent.
* This function assumes the caller holds the mutex lock.
*
* @param tag_to_send The tag to send.
*/
void _lf_logical_tag_complete(tag_t);

/**
* Connect to the RTI at the specified host and port and return
* the socket descriptor for the connection. If this fails, the
* program exits. If it succeeds, it sets the _fed.socket_TCP_RTI global
* variable to refer to the socket for communicating with the RTI.
* @param hostname A hostname, such as "localhost".
* @param port_number A port number.
*/
void connect_to_rti(const char*, int);

/**
* Thread that listens for inputs from other federates.
* This thread listens for messages of type MSG_TYPE_P2P_MESSAGE,
* MSG_TYPE_P2P_TAGGED_MESSAGE, or MSG_TYPE_PORT_ABSENT (@see net_common.h) from the specified
* peer federate and calls the appropriate handling function for
* each message type. If an error occurs or an EOF is received
* from the peer, then this procedure sets the corresponding
* socket in _fed.sockets_for_inbound_p2p_connections
* to -1 and returns, terminating the thread.
* @param fed_id_ptr A pointer to a uint16_t containing federate ID being listened to.
* This procedure frees the memory pointed to before returning.
*/
void* listen_to_federates(void*);

/**
* Send a port absent message to federate with fed_ID, informing the
* remote federate that the current federate will not produce an event
* on this network port at the current logical time.
*
* @param additional_delay The offset applied to the timestamp
* using after. The additional delay will be greater or equal to zero
* if an after is used on the connection. If no after is given in the
* program, -1 is passed.
* @param port_ID The ID of the receiving port.
* @param fed_ID The fed ID of the receiving federate.
*/
void send_port_absent_to_federate(interval_t, unsigned short, unsigned short);

/**
* Send the specified timestamped message to the specified port in the
* specified federate via the RTI or directly to a federate depending on
* the given socket. The timestamp is calculated as current_logical_time +
* additional delay which is greater than or equal to zero.
* The port should be an input port of a reactor in
* the destination federate. This version does include the timestamp
* in the message. The caller can reuse or free the memory after this returns.
*
* If the socket connection to the remote federate or the RTI has been broken,
* then this returns 0 without sending. Otherwise, it returns 1.
*
* This method assumes that the caller does not hold the outbound_socket_mutex lock,
* which it acquires to perform the send.
*
* @note This function is similar to send_message() except that it
* sends timed messages and also contains logics related to time.
*
* @param additional_delay The offset applied to the timestamp
* using after. The additional delay will be greater or equal to zero
* if an after is used on the connection. If no after is given in the
* program, -1 is passed.
* @param message_type The type of the message being sent.
* Currently can be MSG_TYPE_TAGGED_MESSAGE for messages sent via
* RTI or MSG_TYPE_P2P_TAGGED_MESSAGE for messages sent between
* federates.
* @param port The ID of the destination port.
* @param federate The ID of the destination federate.
* @param next_destination_str The next destination in string format (RTI or federate)
* (used for reporting errors).
* @param length The message length.
* @param message The message.
* @return 1 if the message has been sent, 0 otherwise.
*/
int send_timed_message(interval_t,
int,
unsigned short,
unsigned short,
const char*,
size_t,
unsigned char*);

/**
* Synchronize the start with other federates via the RTI.
* This assumes that a connection to the RTI is already made
Expand Down
2 changes: 2 additions & 0 deletions include/core/federated/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef NET_COMMON_H
#define NET_COMMON_H

#ifndef PLATFORM_ARDUINO
lhstrh marked this conversation as resolved.
Show resolved Hide resolved
#include <pthread.h>
#endif

/**
* The timeout time in ns for TCP operations.
Expand Down
5 changes: 4 additions & 1 deletion include/core/federated/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef NET_UTIL_H
#define NET_UTIL_H

#ifndef PLATFORM_ARDUINO
arengarajan99 marked this conversation as resolved.
Show resolved Hide resolved
#include <sys/socket.h>
#include <sys/types.h>
#include <regex.h>
#endif

#include <sys/types.h>
#include <stdbool.h>

#include "../platform.h"
Expand Down
2 changes: 1 addition & 1 deletion lingua-franca-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
arduino-mbed-support
arduino-fed-support