diff --git a/src/core/federation.ts b/src/core/federation.ts index f0eeea0c9..17f94618a 100644 --- a/src/core/federation.ts +++ b/src/core/federation.ts @@ -1,7 +1,7 @@ import type {Socket, SocketConnectOpts} from "net"; import {createConnection} from "net"; import {EventEmitter} from "events"; -import type {Action, FederateConfig} from "./internal"; +import type {Action, FederateConfig, Mutation, Reaction, Variable} from "./internal"; import { Log, Tag, @@ -11,7 +11,8 @@ import { Alarm, App, TaggedEvent, - FederatePortAction + FederatePortAction, + Reactor } from "./internal"; // ---------------------------------------------------------------------// @@ -282,6 +283,79 @@ function isANodeJSCodedError(e: Error): e is NodeJSCodedError { return typeof (e as NodeJSCodedError).code === "string"; } +/** + * A network reactor is a reactor handling network actions (NetworkReciever and NetworkSender). + */ +export class NetworkReactor extends Reactor { + // TPO level of this NetworkReactor + private readonly tpoLevel: number; + + // Fixme: How to use the appropriate type instead of 'unknown'? + private networkInputAction: FederatePortAction = new FederatePortAction(this, Origin.logical); + + private readonly portID?: number; + + constructor ( + parent: Reactor, + tpoLevel: number, + portID?: number + ) { + super(parent); + this.tpoLevel = tpoLevel; + if (portID !== undefined) { + this.portID = portID; + } + } + + public getTpoLevel(): number { + return this.tpoLevel; + } + + public getPortID(): number | undefined { + return this.portID; + } + + public registerNetworkInputAction(networkInputAction: FederatePortAction): void { + this.networkInputAction = networkInputAction; + } + + public getReactions():Array> { + return this._getReactions(); + } + + public getLastReactioOrMutation():Reaction | undefined { + return this._getLastReactionOrMutation(); + } + + public handlingMessage( + portID: number, + value: T + ):void { + this.networkInputAction + .asSchedulable(this._getKey(this.networkInputAction)) + .schedule(0, value); +} + + public handlingTimedMessage( + portID: number, + value: T, + intendedTag: Tag + ):void { + if (this.networkInputAction.origin === Origin.logical) { + this.networkInputAction + // FIXME: Is this a right way to trigger a federatePortAction in the NetworkReceiver reactor? + .asSchedulable(this._getKey(this.networkInputAction)) + .schedule(0, value, intendedTag); + } else { + // The schedule function for physical actions implements + // Tr = max(r, R + A) + this.networkInputAction + .asSchedulable(this._getKey(this.networkInputAction)) + .schedule(0, value); + } + } +} + /** * An RTIClient is used within a federate to abstract the socket * connection to the RTI and the RTI's binary protocol over the socket. @@ -313,20 +387,20 @@ class RTIClient extends EventEmitter { private readonly federatePortActionByID: Map> = new Map>(); - /** - * Establish the mapping between a federate port's action and its ID. - * @param federatePortID The federate port's ID. - * @param federatePort The federate port's action. - */ - public registerFederatePortAction( - federatePortID: number, - federatePortAction: Action - ): void { - this.federatePortActionByID.set( - federatePortID, - federatePortAction as Action - ); - } + // /** + // * Establish the mapping between a federate port's action and its ID. + // * @param federatePortID The federate port's ID. + // * @param federatePort The federate port's action. + // */ + // public registerFederatePortAction( + // federatePortID: number, + // federatePortAction: Action + // ): void { + // this.federatePortActionByID.set( + // federatePortID, + // federatePortAction as Action + // ); + // } /** * Constructor for an RTIClient @@ -860,9 +934,9 @@ class RTIClient extends EventEmitter { bufferIndex + 9, bufferIndex + 9 + messageLength ); - const destPortAction = - this.federatePortActionByID.get(destPortID); - this.emit("message", destPortAction, messageBuffer); + // const destPort = + // this.federatePortActionByID.get(destPortID); + this.emit("message", destPortID, messageBuffer); } bufferIndex += messageLength + 9; @@ -918,8 +992,8 @@ class RTIClient extends EventEmitter { bufferIndex + 21, bufferIndex + 21 + messageLength ); - const destPort = this.federatePortActionByID.get(destPortID); - this.emit("timedMessage", destPort, messageBuffer, tag); + // const destPort = this.federatePortActionByID.get(destPortID); + this.emit("timedMessage", destPortID, messageBuffer, tag); } bufferIndex += messageLength + 21; @@ -1119,6 +1193,18 @@ export class FederatedApp extends App { */ private readonly rtiClient: RTIClient; + /** + * An array of network receivers + */ + private readonly networkReceivers: NetworkReactor[] = []; + + /** + * An array of network senders + */ + private readonly networkSenders: NetworkReactor[] = []; + + private readonly portAbsentReactions = new Set>(); + /** * Stop request-related information * including the current state and the tag associated with the stop requested or stop granted. @@ -1142,7 +1228,7 @@ export class FederatedApp extends App { private readonly downstreamFedIDs: number[] = []; - private readonly outputControlReactionTriggers: Array> = []; + // private readonly outputControlReactionTriggers: Array> = []; /** * The default value, null, indicates there is no output depending on a physical action. @@ -1169,11 +1255,11 @@ export class FederatedApp extends App { this.minDelayFromPhysicalActionToFederateOutput = minDelay; } - public registerOutputControlReactionTrigger( - outputControlReactionTrigger: Action - ): void { - this.outputControlReactionTriggers.push(outputControlReactionTrigger); - } + // public registerOutputControlReactionTrigger( + // outputControlReactionTrigger: Action + // ): void { + // this.outputControlReactionTriggers.push(outputControlReactionTrigger); + // } /** * Getter for greatestTimeAdvanceGrant @@ -1210,7 +1296,7 @@ export class FederatedApp extends App { } } - //TODO: Add functions for updating MLAA and port statuses + // TODO: Add functions for updating MLAA and port statuses /** * Return whether the next event can be handled, or handling the next event @@ -1378,37 +1464,75 @@ export class FederatedApp extends App { } } + // /** + // * Register a federate port's action with the federate. It must be registered + // * so it is known by the rtiClient and may be scheduled when a message for the + // * port has been received via the RTI. + // * @param federatePortID The designated ID for the federate port. For compatability with the + // * C RTI, the ID must be expressable as a 16 bit unsigned short. The ID must be + // * unique among all port IDs on this federate and be a number between 0 and NUMBER_OF_PORTS - 1 + // * @param federatePort The federate port's action for registration. + // */ + // public registerFederatePortAction( + // federatePortID: number, + // federatePortAction: Action + // ): void { + // this.rtiClient.registerFederatePortAction( + // federatePortID, + // federatePortAction + // ); + // } + /** - * Register a federate port's action with the federate. It must be registered - * so it is known by the rtiClient and may be scheduled when a message for the - * port has been received via the RTI. - * @param federatePortID The designated ID for the federate port. For compatability with the - * C RTI, the ID must be expressable as a 16 bit unsigned short. The ID must be - * unique among all port IDs on this federate and be a number between 0 and NUMBER_OF_PORTS - 1 - * @param federatePort The federate port's action for registration. + * TODO: Add a description + * @param networkReciever */ - public registerFederatePortAction( - federatePortID: number, - federatePortAction: Action + public registerNetworkReciever( + networkReciever: NetworkReactor ): void { - this.rtiClient.registerFederatePortAction( - federatePortID, - federatePortAction - ); + this.networkReceivers.push(networkReciever); } - private _getFederatePortActionKey(federatePortAction: FederatePortAction): symbol | undefined { - if ( - (federatePortAction instanceof FederatePortAction) && - federatePortAction._isContainedByContainerOf(this) - ) { - const owner = federatePortAction.getContainer(); - if (owner !== null) { - return owner._getKey(federatePortAction, this._keyChain.get(owner)); + /** + * TODO: Add a description + * @param networkSender + */ + public registerNetworkSender( + networkSender: NetworkReactor + ): void { + this.networkSenders.push(networkSender); + } + + /** + * TODO: Add a description + */ + protected registerOutputControlReactions(): void { + for (const networkSender of this.networkSenders) { + const lastReactionOrMutation = networkSender.getLastReactioOrMutation(); + if (lastReactionOrMutation !== undefined) { + this.portAbsentReactions.add(lastReactionOrMutation); + } } - } } + protected enqueueOutputControlReactions(): void { + this.portAbsentReactions.forEach(reaction => { + this._reactionQ.push(reaction); + }); + } + + // private _getFederatePortActionKey(federatePortAction: FederatePortAction): symbol | undefined { + // if ( + // (federatePortAction instanceof FederatePortAction) && + // federatePortAction._isContainedByContainerOf(this) + // ) { + // const owner = federatePortAction.getContainer(); + // if (owner !== null) { + // return owner._getKey(federatePortAction, this._keyChain.get(owner)); + // } + // } + // } + /** * Send a message to a potentially remote federate's port via the RTI. This message * is untimed, and will be timestamped by the destination federate when it is received. @@ -1561,6 +1685,24 @@ export class FederatedApp extends App { this.rtiClient.closeRTIConnection(); } + /** + * + */ + _addEdgesForTpoLevels():void { + let networkReactors = this.networkReceivers.concat(this.networkSenders); + networkReactors.sort((a: NetworkReactor, b: NetworkReactor): number => { + return a.getTpoLevel() - b.getTpoLevel(); + }) + + for (let i = 0; i < networkReactors.length - 1; i++) { + for (const upstream of networkReactors[i].getReactions()) { + for (const downstream of networkReactors[i + 1].getReactions()) { + this._dependencyGraph.addEdge(upstream, downstream); + } + } + } + } + /** * @override * Register this federated app with the RTI and request a start time. @@ -1570,6 +1712,8 @@ export class FederatedApp extends App { * time message from the RTI. */ _start(): void { + this._addEdgesForTpoLevels(); + this._analyzeDependencies(); this._loadStartupReactions(); @@ -1612,7 +1756,7 @@ export class FederatedApp extends App { this.rtiClient.on( "message", - (destPortAction: Action, messageBuffer: Buffer) => { + (destPortID: number, messageBuffer: Buffer) => { // Schedule this federate port's action. // This message is untimed, so schedule it immediately. Log.debug(this, () => { @@ -1622,15 +1766,21 @@ export class FederatedApp extends App { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment const value: T = JSON.parse(messageBuffer.toString()); - destPortAction - .asSchedulable(this._getFederatePortActionKey(destPortAction)) - .schedule(0, value); + for (const candidate of this.networkReceivers) { + if (candidate.getPortID() === destPortID) { + candidate.handlingMessage(destPortID, value); + } + } + + // destPortAction + // .asSchedulable(this._getFederatePortActionKey(destPortAction)) + // .schedule(0, value); } ); this.rtiClient.on( "timedMessage", - (destPortAction: Action, messageBuffer: Buffer, tag: Tag) => { + (destPortID: number, messageBuffer: Buffer, tag: Tag) => { // Schedule this federate port's action. /** @@ -1659,18 +1809,24 @@ export class FederatedApp extends App { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment const value: T = JSON.parse(messageBuffer.toString()); - if (destPortAction.origin === Origin.logical) { - destPortAction - //FIXME: Is this a right way to trigger a federatePortAction in the NetworkReceiver reactor? - .asSchedulable(this._getFederatePortActionKey(destPortAction)) - .schedule(0, value, tag); - } else { - // The schedule function for physical actions implements - // Tr = max(r, R + A) - destPortAction - .asSchedulable(this._getFederatePortActionKey(destPortAction)) - .schedule(0, value); + for (let candidate of this.networkReceivers) { + if (candidate.getPortID() === destPortID) { + candidate.handlingTimedMessage(destPortID, value, tag); + } } + + // if (destPortAction.origin === Origin.logical) { + // destPortAction + // //FIXME: Is this a right way to trigger a federatePortAction in the NetworkReceiver reactor? + // .asSchedulable(this._getFederatePortActionKey(destPortAction)) + // .schedule(0, value, tag); + // } else { + // // The schedule function for physical actions implements + // // Tr = max(r, R + A) + // destPortAction + // .asSchedulable(this._getFederatePortActionKey(destPortAction)) + // .schedule(0, value); + // } } ); diff --git a/src/core/reactor.ts b/src/core/reactor.ts index cca261202..8a876b05f 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -152,7 +152,7 @@ export abstract class Reactor extends Component { * Note: declare this class member before any other ones as they may * attempt to access it. */ - protected readonly _keyChain = new Map(); + private readonly _keyChain = new Map(); /** * This graph has in it all the dependencies implied by this container's @@ -784,7 +784,6 @@ export abstract class Reactor extends Component { trigs: Variable[], args: [...ArgList], react: (this: ReactionSandbox, ...args: ArgList) => void, - level?: Number, deadline?: TimeValue, late: (this: ReactionSandbox, ...args: ArgList) => void = () => { Log.global.warn("Deadline violation occurred!"); @@ -1917,6 +1916,12 @@ export class App extends Reactor { ): void { this.app.sendRTIPortAbsent(additionalDelay, destFederateID, destPortID); } + + public registerOutputControlReactions( + + ): void { + this.app.registerOutputControlReactions(); + } })(this); /** @@ -2101,6 +2106,14 @@ export class App extends Reactor { ); } + protected registerOutputControlReactions( + + ): void { + throw new Error( + "Cannot call registerOutputControlReactions from an App. registerOutputControlReactions may be called only from a FederatedApp" + ); + } + /** * The current time, made available so actions may be scheduled relative to it. */ @@ -2145,7 +2158,7 @@ export class App extends Reactor { /** * Priority set that keeps track of reactions at the current Logical time. */ - private readonly _reactionQ = new ReactionQueue(); + protected readonly _reactionQ = new ReactionQueue(); /** * The physical time when execution began relative to January 1, 1970 00:00:00 UTC. @@ -2285,9 +2298,7 @@ export class App extends Reactor { Log.global.debug("Finished handling all events at current time."); } - // protected enqueueNetworkOutputControlReactions(): void { - // return undefined; - // } + protected enqueueOutputControlReactions(): void { } /** * Handle the next events on the event queue. @@ -2386,8 +2397,8 @@ export class App extends Reactor { nextEvent != null && this._currentTag.isSimultaneousWith(nextEvent.tag) ); - // // enqueue networkOutputControlReactions - // this.enqueueNetworkOutputControlReactions(); + // enqueue networkOutputControlReactions + this.enqueueOutputControlReactions(); // React to all the events loaded onto the reaction queue. this._react(); @@ -2670,7 +2681,7 @@ export class App extends Reactor { Log.info(this, () => `>>> Start of execution: ${this._currentTag}`); Log.info(this, () => Log.hr); // enqueue networkOutputControlReactions - // this.enqueueNetworkOutputControlReactions(); + this.enqueueOutputControlReactions(); // Handle the reactions that were loaded onto the reaction queue. this._react();