reactor-c
C Runtime for Lingua Franca
Loading...
Searching...
No Matches
federate.h File Reference

Data structures and functions used and defined in federate.c. More...

#include <stdbool.h>
#include "tag.h"
#include "lf_types.h"
#include "environment.h"
#include "low_level_platform.h"

Go to the source code of this file.

Data Structures

struct  federate_instance_t
 
struct  federation_metadata_t
 

Macros

#define ADVANCE_MESSAGE_INTERVAL   MSEC(10)
 

Typedefs

typedef struct federate_instance_t federate_instance_t
 
typedef struct federation_metadata_t federation_metadata_t
 
typedef enum parse_rti_code_t parse_rti_code_t
 

Enumerations

enum  parse_rti_code_t {
  SUCCESS , INVALID_PORT , INVALID_HOST , INVALID_USER ,
  FAILED_TO_PARSE
}
 

Functions

void lf_connect_to_federate (uint16_t)
 Connect to the federate with the specified id.
 
void lf_connect_to_rti (const char *hostname, int port_number)
 Connect to the RTI at the specified host and port.
 
void lf_create_server (int specified_port)
 Create a server to listen to incoming P2P connections.
 
void lf_enqueue_port_absent_reactions (environment_t *env)
 Enqueue port absent reactions.
 
void * lf_handle_p2p_connections_from_federates (void *)
 Thread to accept connections from other federates.
 
void lf_latest_tag_confirmed (tag_t)
 Send a latest tag confirmed (LTC) signal to the RTI.
 
parse_rti_code_t lf_parse_rti_addr (const char *rti_addr)
 Parse the address of the RTI and store them into the global federation_metadata struct.
 
void lf_reset_status_fields_on_input_port_triggers ()
 Reset the status fields on network input ports to unknown or absent.
 
int lf_send_message (int message_type, unsigned short port, unsigned short federate, const char *next_destination_str, size_t length, unsigned char *message)
 Send a message to another federate.
 
void lf_send_neighbor_structure_to_RTI (int)
 Send information about connections to the RTI.
 
tag_t lf_send_next_event_tag (environment_t *env, tag_t tag, bool wait_for_reply)
 Send a next event tag (NET) signal.
 
void lf_send_port_absent_to_federate (environment_t *env, interval_t additional_delay, unsigned short port_ID, unsigned short fed_ID)
 Send a port absent message.
 
int lf_send_stop_request_to_rti (tag_t stop_tag)
 Send a MSG_TYPE_STOP_REQUEST message to the RTI.
 
int lf_send_tagged_message (environment_t *env, interval_t additional_delay, int message_type, unsigned short port, unsigned short federate, const char *next_destination_str, size_t length, unsigned char *message)
 Send a tagged message to the specified port of the specified federate.
 
void lf_set_federation_id (const char *fid)
 Set the federation_id of this federate.
 
void lf_stall_advance_level_federation (environment_t *env, size_t level)
 Wait until inputs statuses are known up to and including the specified level.
 
void lf_stall_advance_level_federation_locked (size_t level)
 Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex lock.
 
void lf_synchronize_with_other_federates ()
 Synchronize the start with other federates via the RTI.
 
bool lf_update_max_level (tag_t tag, bool is_provisional)
 Update the max level allowed to advance (MLAA).
 

Variables

lf_mutex_t lf_outbound_socket_mutex
 
lf_cond_t lf_port_status_changed
 

Detailed Description

Data structures and functions used and defined in federate.c.

Author
Soroush Bateni
Peter Donovan
Edward A. Lee
Anirudh Rengarajsm

Macro Definition Documentation

◆ ADVANCE_MESSAGE_INTERVAL

#define ADVANCE_MESSAGE_INTERVAL   MSEC(10)

Typedef Documentation

◆ federate_instance_t

typedef struct federate_instance_t federate_instance_t

Structure that a federate instance uses to keep track of its own state.

◆ federation_metadata_t

typedef struct federation_metadata_t federation_metadata_t

◆ parse_rti_code_t

Enumeration Type Documentation

◆ parse_rti_code_t

Enumerator
SUCCESS 
INVALID_PORT 
INVALID_HOST 
INVALID_USER 
FAILED_TO_PARSE 

Function Documentation

◆ lf_connect_to_federate()

void lf_connect_to_federate ( uint16_t )

Connect to the federate with the specified id.

The established connection will then be used in functions such as lf_send_tagged_message() to send messages directly to the specified federate. This function first sends an MSG_TYPE_ADDRESS_QUERY message to the RTI to obtain the IP address and port number of the specified federate. It then attempts to establish a socket connection to the specified federate. If this fails, the program exits. If it succeeds, it sets element [id] of the _fed.sockets_for_outbound_p2p_connections global array to refer to the socket for communicating directly with the federate.

Parameters
remote_federate_idThe ID of the remote federate.

◆ lf_connect_to_rti()

void lf_connect_to_rti ( const char * hostname,
int port_number )

Connect to the RTI at the specified host and port.

This will return the socket descriptor for the connection. If port_number is 0, then start at DEFAULT_PORT and increment the port number on each attempt. If an attempt fails, wait CONNECT_RETRY_INTERVAL and try again. If it fails after CONNECT_TIMEOUT, the program exits. If it succeeds, it sets the _fed.socket_TCP_RTI global variable to refer to the socket for communicating with the RTI.

Parameters
hostnameA hostname, such as "localhost".
port_numberA port number or 0 to start with the default.

◆ lf_create_server()

void lf_create_server ( int specified_port)

Create a server to listen to incoming P2P connections.

Such connections are used for physical connections or any connection if using decentralized coordination. This function only handles the creation of the server socket. The bound port for the server socket is then sent to the RTI by sending an MSG_TYPE_ADDRESS_ADVERTISEMENT message (

See also
net_common.h). This function expects no response from the RTI.

If a port is specified by the user, that will be used. Otherwise, a random port will be assigned. If the bind fails, it will retry after PORT_BIND_RETRY_INTERVAL until it has tried PORT_BIND_RETRY_LIMIT times. Then it will fail.

Parameters
specified_portThe port specified by the user or 0 to use a random port.

◆ lf_enqueue_port_absent_reactions()

void lf_enqueue_port_absent_reactions ( environment_t * env)

Enqueue port absent reactions.

These reactions will send a MSG_TYPE_PORT_ABSENT message to downstream federates if a given network output port is not present.

Parameters
envThe environment of the federate

◆ lf_handle_p2p_connections_from_federates()

void * lf_handle_p2p_connections_from_federates ( void * )

Thread to accept connections from other federates.

This thread accepts connections from federates that send messages directly to this one (not through the RTI). This thread starts a thread for each accepted socket connection to read messages and, once it has opened all expected sockets, exits.

Parameters
ignoredNo argument needed for this thread.

◆ lf_latest_tag_confirmed()

void lf_latest_tag_confirmed ( tag_t )

Send a latest tag confirmed (LTC) signal to the RTI.

This avoids the send if an equal or later LTC has previously been sent.

This function assumes the caller holds the mutex lock on the top-level environment.

Parameters
tag_to_sendThe tag to send.

◆ lf_parse_rti_addr()

parse_rti_code_t lf_parse_rti_addr ( const char * rti_addr)

Parse the address of the RTI and store them into the global federation_metadata struct.

Returns
a parse_rti_code_t indicating the result of the parse.

◆ lf_reset_status_fields_on_input_port_triggers()

void lf_reset_status_fields_on_input_port_triggers ( )

Reset the status fields on network input ports to unknown or absent.

This will reset to absent if the last_known_status_tag field of the port is greater than or equal to the current tag of the top-level environment. This should be overriden to present if an event gets scheduled. Otherwise, set the status to unknown.

Note
This function must be called at the beginning of each logical time.

◆ lf_send_message()

int lf_send_message ( int message_type,
unsigned short port,
unsigned short federate,
const char * next_destination_str,
size_t length,
unsigned char * message )

Send a message to another federate.

This function is used for physical connections between federates. If the socket connection to the remote federate or the RTI has been broken, then this returns -1 without sending. Otherwise, it returns 0.

This method assumes that the caller does not hold the lf_outbound_socket_mutex lock, which it acquires to perform the send.

Parameters
message_typeThe type of the message being sent (currently only MSG_TYPE_P2P_MESSAGE).
portThe ID of the destination port.
federateThe ID of the destination federate.
next_destination_strThe name of the next destination in string format (for reporting).
lengthThe message length.
messageThe message.
Returns
0 if the message has been sent, -1 otherwise.

◆ lf_send_neighbor_structure_to_RTI()

void lf_send_neighbor_structure_to_RTI ( int )

Send information about connections to the RTI.

This is a generated function that sends information about connections between this federate and other federates where messages are routed through the RTI. Currently, this only includes logical connections when the coordination is centralized. This information is needed for the RTI to perform the centralized coordination.

See also
MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h

◆ lf_send_next_event_tag()

tag_t lf_send_next_event_tag ( environment_t * env,
tag_t tag,
bool wait_for_reply )

Send a next event tag (NET) signal.

If this federate depends on upstream federates or sends data to downstream federates, then send to the RTI a NET, which will give the tag of the earliest event on the event queue, or, if the queue is empty, the timeout time, or, if there is no timeout, FOREVER.

If there are network outputs that depend on physical actions, then insert a dummy event to ensure this federate advances its tag so that downstream federates can make progress.

A NET is a promise saying that, absent network inputs, this federate will not produce an output message with tag earlier than the NET value.

If there are upstream federates, then after sending a NET, this will block until either the RTI grants the advance to the requested time or the wait for the response from the RTI is interrupted by a change in the event queue (e.g., a physical action triggered or a network message arrived). If there are no upstream federates, then it will not wait for a TAG (which won't be forthcoming anyway) and returns the earliest tag on the event queue.

If the federate has neither upstream nor downstream federates, then this returns the specified tag immediately without sending anything to the RTI.

If there is at least one physical action somewhere in the federate that can trigger an output to a downstream federate, then the NET is required to be less than the current physical time. If physical time is less than the earliest event in the event queue (or the event queue is empty), then this function will insert a dummy event with a tag equal to the current physical time (and a microstep of 0). This will enforce advancement of tag for this federate and causes a NET message to be sent repeatedly as physical time advances with the time interval between messages controlled by the target parameter coordination-options: {advance-message-interval timevalue}. It will stop creating dummy events if and when its event queue has an event with a timestamp less than physical time.

If wait_for_reply is false, then this function will simply send the specified tag and return that tag immediately. This is useful when a federate is shutting down and will not be sending any more messages at all.

In all cases, this returns either the specified tag or another tag when it is safe to advance logical time to the returned tag. The returned tag may be less than the specified tag if there are upstream federates and either the RTI responds with a lesser tag or the wait for a response from the RTI is interrupted by a change in the event queue.

This function is used in centralized coordination only.

This function assumes the caller holds the mutex lock.

Parameters
envThe environment of the federate
tagThe tag.
wait_for_replyIf true, wait for a reply.

◆ lf_send_port_absent_to_federate()

void lf_send_port_absent_to_federate ( environment_t * env,
interval_t additional_delay,
unsigned short port_ID,
unsigned short fed_ID )

Send a port absent message.

This informs the remote federate that it will not receive a message with tag less than the current tag of the specified environment delayed by the additional_delay.

Parameters
envThe environment from which to get the current tag.
additional_delayThe after delay of the connection or NEVER if none.
port_IDThe ID of the receiving port.
fed_IDThe fed ID of the receiving federate.

◆ lf_send_stop_request_to_rti()

int lf_send_stop_request_to_rti ( tag_t stop_tag)

Send a MSG_TYPE_STOP_REQUEST message to the RTI.

The payload is the specified tag plus one microstep. If this federate has previously received a stop request from the RTI, then do not send the message and return 1. Return -1 if the socket is disconnected. Otherwise, return 0.

Returns
0 if the message is sent.

◆ lf_send_tagged_message()

int lf_send_tagged_message ( environment_t * env,
interval_t additional_delay,
int message_type,
unsigned short port,
unsigned short federate,
const char * next_destination_str,
size_t length,
unsigned char * message )

Send a tagged message to the specified port of the specified federate.

The tag will be the current tag of the specified environment delayed by the specified additional_delay. If the delayed tag falls after the timeout time, then the message is not sent and -1 is returned. The caller can reuse or free the memory storing the message after this returns.

If the message fails to send (e.g. the socket connection is broken), then the response depends on the message_type. For MSG_TYPE_TAGGED_MESSAGE, the message is supposed to go via the RTI, and failure to communicate with the RTI is a critical failure. In this case, the program will exit with an error message. If the message type is MSG_TYPE_P2P_TAGGED_MESSAGE, then the failure is not critical. It may be due to the remote federate having exited, for example, because its safe-to-process offset led it to believe that there were no messages forthcoming. In this case, on failure to send the message, this function returns -11.

This method assumes that the caller does not hold the lf_outbound_socket_mutex lock, which it acquires to perform the send.

Parameters
envThe environment from which to get the current tag.
additional_delayThe after delay on the connection or NEVER is there is none.
message_typeThe type of the message being sent. Currently can be MSG_TYPE_TAGGED_MESSAGE for messages sent via the RTI or MSG_TYPE_P2P_TAGGED_MESSAGE for messages sent directly between federates.
portThe ID of the destination port.
federateThe ID of the destination federate.
next_destination_strThe next destination in string format (RTI or federate) (used for reporting errors).
lengthThe message length.
messageThe message.
Returns
0 if the message has been sent, 1 otherwise.

◆ lf_set_federation_id()

void lf_set_federation_id ( const char * fid)

Set the federation_id of this federate.

Parameters
fidThe federation ID.

◆ lf_stall_advance_level_federation()

void lf_stall_advance_level_federation ( environment_t * env,
size_t level )

Wait until inputs statuses are known up to and including the specified level.

Specifically, wait until the specified level is less that the max level allowed to advance (MLAA).

Parameters
envThe environment (which should always be the top-level environment).
levelThe level to which we would like to advance.

◆ lf_stall_advance_level_federation_locked()

void lf_stall_advance_level_federation_locked ( size_t level)

Version of lf_stall_advance_level_federation() that assumes the caller holds the mutex lock.

Parameters
levelThe level to which we would like to advance.

◆ lf_synchronize_with_other_federates()

void lf_synchronize_with_other_federates ( )

Synchronize the start with other federates via the RTI.

This assumes that a connection to the RTI is already made and _lf_rti_socket_TCP is valid. It then sends the current logical time to the RTI and waits for the RTI to respond with a specified time. It starts a thread to listen for messages from the RTI.

◆ lf_update_max_level()

bool lf_update_max_level ( tag_t tag,
bool is_provisional )

Update the max level allowed to advance (MLAA).

If the specified tag is greater than the current_tag of the top-level environment (or equal and is_provisional is false), then set the MLAA to INT_MAX and return. This removes any barriers on execution at the current tag due to network inputs. Otherwise, set the MLAA to the minimum level over all (non-physical) network input ports where the status of the input port is not known at that current_tag.

This function assumes that the caller holds the mutex.

Parameters
tagThe latest TAG or PTAG received by this federate.
is_provisionalWhether the tag was provisional.
Returns
True if the MLAA changed.

Variable Documentation

◆ lf_outbound_socket_mutex

lf_mutex_t lf_outbound_socket_mutex
extern

Mutex lock held while performing socket write and close operations.

◆ lf_port_status_changed

lf_cond_t lf_port_status_changed
extern

Condition variable for blocking on unkonwn federate input ports.