reactor-c
C Runtime for Lingua Franca
|
Runtime infrastructure for the threaded version of the C target of Lingua Franca. More...
#include <assert.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include "lf_types.h"
#include "low_level_platform.h"
#include "reactor_threaded.h"
#include "reactor.h"
#include "scheduler.h"
#include "tag.h"
#include "environment.h"
#include "rti_local.h"
#include "reactor_common.h"
#include "watchdog.h"
Macros | |
#define | NUMBER_OF_WORKERS 1 |
#define | MAX_STALL_INTERVAL MSEC(1) |
Functions | |
void | _lf_increment_tag_barrier_locked (environment_t *env, tag_t future_tag) |
Version of _lf_increment_tag_barrier to call when the caller holds the mutex. This version does not acquire the mutex belonging to env. | |
void | _lf_increment_tag_barrier (environment_t *env, tag_t future_tag) |
void | _lf_decrement_tag_barrier_locked (environment_t *env) |
int | _lf_wait_on_tag_barrier (environment_t *env, tag_t proposed_tag) |
void | lf_set_present (lf_port_base_t *port) |
Mark the given port's is_present field as true. | |
bool | wait_until (instant_t wait_until_time, lf_cond_t *condition) |
Wait until physical time matches or exceeds the time of the specified tag. | |
tag_t | get_next_event_tag (environment_t *env) |
tag_t | send_next_event_tag (environment_t *env, tag_t tag, bool wait_for_reply) |
void | _lf_next_locked (environment_t *env) |
void | lf_request_stop (void) |
Request a stop to execution as soon as possible. | |
void | _lf_trigger_reaction (environment_t *env, reaction_t *reaction, int worker_number) |
Trigger the specified reaction on the specified worker in the specified environment. | |
void | _lf_initialize_start_tag (environment_t *env) |
bool | _lf_worker_handle_deadline_violation_for_reaction (environment_t *env, int worker_number, reaction_t *reaction) |
bool | _lf_worker_handle_STP_violation_for_reaction (environment_t *env, int worker_number, reaction_t *reaction) |
bool | _lf_worker_handle_violations (environment_t *env, int worker_number, reaction_t *reaction) |
void | _lf_worker_invoke_reaction (environment_t *env, int worker_number, reaction_t *reaction) |
void | _lf_worker_do_work (environment_t *env, int worker_number) |
The main looping logic of each LF worker thread. | |
void * | worker (void *arg) |
void | lf_print_snapshot (environment_t *env) |
Print a snapshot of the priority queues used during execution (for debugging). | |
void | determine_number_of_workers (void) |
Determine the number of workers. | |
int | lf_reactor_c_main (int argc, const char *argv[]) |
int | lf_notify_of_event (environment_t *env) |
Notify of new event by broadcasting on a condition variable. | |
int | lf_critical_section_enter (environment_t *env) |
Enter critical section by locking the global mutex. | |
int | lf_critical_section_exit (environment_t *env) |
Leave a critical section by unlocking the global mutex. | |
Variables | |
instant_t | start_time |
lf_mutex_t | global_mutex |
bool | lf_stop_requested = false |
True if stop has been requested so it doesn't get re-requested. | |
int | worker_thread_count = 0 |
Runtime infrastructure for the threaded version of the C target of Lingua Franca.
#define MAX_STALL_INTERVAL MSEC(1) |
The maximum amount of time a worker thread should stall before checking the reaction queue again. This is not currently used.
#define NUMBER_OF_WORKERS 1 |
void _lf_decrement_tag_barrier_locked | ( | environment_t * | env | ) |
Decrement the total number of pending barrier requests for the environment tag barrier. If the total number of requests reaches zero, this function resets the tag barrier to FOREVER_TAG and notifies all threads that are waiting on the barrier that the number of requests has reached zero.
This function assumes that the caller already holds the mutex lock on env.
env | The environment in which we are executing. |
void _lf_increment_tag_barrier | ( | environment_t * | env, |
tag_t | future_tag ) |
Raise a barrier to prevent the current tag for the specified environment from advancing to or beyond the value of the future_tag argument, if possible. If the current tag is already at or beyond future_tag, then prevent any further advances. This function will increment the total number of pending barrier requests. For each call to this function, there should always be a subsequent call to _lf_decrement_tag_barrier_locked() to release the barrier.
If there is already a barrier raised at a tag later than future_tag, this function will change the barrier to future_tag or the current tag, whichever is larger. If the existing barrier is earlier than future_tag, this function will not change the barrier. If there are no existing barriers and future_tag is in the past relative to the current tag, this function will raise a barrier to the current tag plus one microstep.
This function acquires the mutex on the specified environment.
env | Environment within which we are executing. |
future_tag | A desired tag for the barrier. This function will guarantee that current logical time will not go past future_tag if it is in the future. If future_tag is in the past (or equals to current logical time), the runtime will freeze advancement of logical time. |
void _lf_increment_tag_barrier_locked | ( | environment_t * | env, |
tag_t | future_tag ) |
Version of _lf_increment_tag_barrier to call when the caller holds the mutex. This version does not acquire the mutex belonging to env.
env | Environment within which we are executing. |
future_tag | A desired tag for the barrier. This function will guarantee that current logical time will not go past future_tag if it is in the future. If future_tag is in the past (or equals to current logical time), the runtime will freeze advancement of logical time. |
void _lf_initialize_start_tag | ( | environment_t * | env | ) |
Perform the necessary operations before tag (0,0) can be processed.
This includes injecting any reactions triggered at (0,0), initializing timers, and for the federated execution, waiting for a proper coordinated start.
This assumes the mutex lock is held by the caller.
env | Environment within which we are executing. |
void _lf_next_locked | ( | environment_t * | env | ) |
If there is at least one event in the event queue, then wait until physical time matches or exceeds the time of the least tag on the event queue; pop the next event(s) from the event queue that all have the same tag; extract from those events the reactions that are to be invoked at this logical time and insert them into the reaction queue. The event queue is sorted by time tag.
If there is no event in the queue and the keepalive command-line option was not given, and this is not a federated execution with centralized coordination, set the stop tag to the current tag. If keepalive was given, then wait for either lf_request_stop() to be called or an event appears in the event queue and then return.
Every time tag is advanced, it is checked against stop tag and if they are equal, shutdown reactions are triggered.
This does not acquire the mutex lock. It assumes the lock is already held.
env | Environment within which we are executing. |
void _lf_trigger_reaction | ( | environment_t * | env, |
reaction_t * | reaction, | ||
int | worker_number ) |
Trigger the specified reaction on the specified worker in the specified environment.
env | Environment in which we are executing. |
reaction | The reaction. |
worker_number | The ID of the worker that is making this call. 0 should be used if there is only one worker (e.g., when the program is using the single-threaded C runtime). -1 is used for an anonymous call in a context where a worker number does not make sense (e.g., the caller is not a worker thread). |
int _lf_wait_on_tag_barrier | ( | environment_t * | env, |
tag_t | proposed_tag ) |
If the proposed_tag is greater than or equal to a barrier tag that has been set by a call to _lf_increment_tag_barrier or _lf_increment_tag_barrier_locked, and if there are requestors still pending on that barrier, then wait until all requestors have been satisfied. This is used in federated execution when an incoming timed message has been partially read so that we know its tag, but the rest of message has not yet been read and hence the event has not yet appeared on the event queue. To prevent tardiness, this function blocks the advancement of time until to the proposed tag until the message has been put onto the event queue.
If the proposed_tag is greater than the stop tag, then use the stop tag instead.
This function assumes the mutex is already locked. Thus, it unlocks the mutex while it's waiting to allow the tag barrier to change.
env | Environment within which we are executing. |
proposed_tag | The tag that the runtime wants to advance to. |
void _lf_worker_do_work | ( | environment_t * | env, |
int | worker_number ) |
The main looping logic of each LF worker thread.
This function returns when the scheduler's lf_sched_get_ready_reaction() implementation returns NULL, indicating that there are no more reactions to execute.
This function assumes the caller does not hold the mutex lock on the environment.
env | Environment within which we are executing. |
worker_number | The number assigned to this worker thread |
bool _lf_worker_handle_deadline_violation_for_reaction | ( | environment_t * | env, |
int | worker_number, | ||
reaction_t * | reaction ) |
Handle deadline violation for 'reaction'. The mutex should NOT be locked when this function is called. It might acquire the mutex when scheduling the reactions that are triggered as a result of executing the deadline violation handler on the 'reaction', if it exists.
env | Environment within which we are executing. |
worker_number | The ID of the worker. |
reaction | The reaction whose deadline has been violated. |
bool _lf_worker_handle_STP_violation_for_reaction | ( | environment_t * | env, |
int | worker_number, | ||
reaction_t * | reaction ) |
Handle STP violation for 'reaction'. The mutex should NOT be locked when this function is called. It might acquire the mutex when scheduling the reactions that are triggered as a result of executing the STP violation handler on the 'reaction', if it exists.
env | Environment within which we are executing. |
worker_number | The ID of the worker. |
reaction | The reaction whose STP offset has been violated. |
bool _lf_worker_handle_violations | ( | environment_t * | env, |
int | worker_number, | ||
reaction_t * | reaction ) |
Handle violations for 'reaction'. Currently limited to deadline violations and STP violations. The mutex should NOT be locked when this function is called. It might acquire the mutex when scheduling the reactions that are triggered as a result of executing the deadline or STP violation handler(s) on the 'reaction', if they exist.
env | Environment within which we are executing. |
worker_number | The ID of the worker. |
reaction | The reaction. |
void _lf_worker_invoke_reaction | ( | environment_t * | env, |
int | worker_number, | ||
reaction_t * | reaction ) |
Invoke 'reaction' and schedule any resulting triggered reaction(s) on the reaction queue. The mutex should NOT be locked when this function is called. It might acquire the mutex when scheduling the reactions that are triggered as a result of executing 'reaction'.
env | Environment within which we are executing. |
worker_number | The ID of the worker. |
reaction | The reaction to invoke. |
void determine_number_of_workers | ( | void | ) |
Determine the number of workers.
tag_t get_next_event_tag | ( | environment_t * | env | ) |
Return the tag of the next event on the event queue. If the event queue is empty then return either FOREVER_TAG or, is a stop_time (timeout time) has been set, the stop time.
env | Environment within which we are executing. |
int lf_critical_section_enter | ( | environment_t * | env | ) |
Enter critical section by locking the global mutex.
Enter critical section within an environment.
env | Environment within which we are executing or GLOBAL_ENVIRONMENT. |
int lf_critical_section_exit | ( | environment_t * | env | ) |
Leave a critical section by unlocking the global mutex.
Leave a critical section within an environment.
env | Environment within which we are executing or GLOBAL_ENVIRONMENT. |
int lf_notify_of_event | ( | environment_t * | env | ) |
Notify of new event by broadcasting on a condition variable.
Notify of new event.
env | Environment within which we are executing. |
void lf_print_snapshot | ( | environment_t * | env | ) |
Print a snapshot of the priority queues used during execution (for debugging).
This function implementation will be empty if the NDEBUG macro is defined; that macro is normally defined for release builds.
env | The environment in which we are executing. |
int lf_reactor_c_main | ( | int | argc, |
const char * | argv[] ) |
The main loop of the LF program.
An unambiguous function name that can be called by external libraries.
Note: In target languages that use the C core library, there should be an unambiguous way to execute the LF program's main function that will not conflict with other main functions that might get resolved and linked at compile time.
void lf_request_stop | ( | void | ) |
Request a stop to execution as soon as possible.
In a non-federated execution with only a single enclave, this will occur one microstep later than the current tag. In a federated execution or when there is more than one enclave, it will likely occur at a later tag determined by the RTI so that all federates and enclaves stop at the same tag.
void lf_set_present | ( | lf_port_base_t * | port | ) |
Mark the given port's is_present field as true.
port | A pointer to the port struct as an lf_port_base_t* . |
tag_t send_next_event_tag | ( | environment_t * | env, |
tag_t | tag, | ||
bool | wait_for_reply ) |
In a federated execution with centralized coordination, this function returns a tag that is less than or equal to the specified tag when, as far as the federation is concerned, it is safe to commit to advancing to the returned tag. That is, all incoming network messages with tags less than the returned tag have been received. In unfederated execution or in federated execution with decentralized control, this function returns the specified tag immediately.
env | Environment within which we are executing. |
tag | The tag to which to advance. |
wait_for_reply | If true, wait for the RTI to respond. |
Wait until physical time matches or exceeds the time of the specified tag.
If -fast is given, there will be no wait.
If an event is put on the event queue during the wait, then the wait is interrupted and this function returns false. It also returns false if the timeout time is reached before the wait has completed. Note this this could return true even if the a new event was placed on the queue. This will occur if that event time matches or exceeds the specified time.
The mutex lock associated with the condition argument is assumed to be held by the calling thread. This mutex is released while waiting. If the wait time is too small to actually wait (less than MIN_SLEEP_DURATION), then this function immediately returns true and the mutex is not released.
env | Environment within which we are executing. |
wait_until_time | The time to wait until physical time matches it. |
condition | A condition variable that can interrupt the wait. The mutex associated with this condition variable will be released during the wait. |
void * worker | ( | void * | arg | ) |
Worker thread for the thread pool. Its argument is the environment within which is working The very first worker per environment/enclave is in charge of synchronizing with the other enclaves by getting a TAG to (0,0) this might block until upstream enclaves have finished tag (0,0). This is unlike federated scheduling where each federate will get a PTAG to (0,0) and use network control reactions to handle upstream dependencies
arg | Environment within which the worker should execute. |
lf_mutex_t global_mutex |
Global mutex, used for synchronizing across environments. Mainly used for token-management and tracing
bool lf_stop_requested = false |
True if stop has been requested so it doesn't get re-requested.
|
extern |
The start time read from the trace file.
int worker_thread_count = 0 |
For logging and debugging, each worker thread is numbered.