-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathfederation.ts
1336 lines (1195 loc) · 58.4 KB
/
federation.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import {Socket, createConnection, SocketConnectOpts} from 'net'
import {EventEmitter} from 'events';
import {
Log, Tag, TimeValue, Origin, getCurrentPhysicalTime, Alarm,
Present, App, Action, TaggedEvent
} from './internal';
//---------------------------------------------------------------------//
// Federated Execution Constants and Enums //
//---------------------------------------------------------------------//
// FIXME: For now this constant is unused.
/**
 * Size, in bytes, of the buffer used for messages sent between federates.
 * The intent is that federates and the RTI agree on this size so that
 * message lengths generally match.
 * NOTE(review): nothing visible in this file reads this constant (see the
 * FIXME above) — confirm before relying on it.
 */
export const BUFFER_SIZE: number = 256;
/**
 * Time that elapses between a federate's successive attempts
 * to connect to the RTI (2 seconds).
 */
export const CONNECT_RETRY_INTERVAL: TimeValue = TimeValue.secs(2);
/**
 * Bound on the number of retries to connect to the RTI.
 * A federate will retry every CONNECT_RETRY_INTERVAL
 * this many times before giving up. E.g., 500 retries every
 * 2 seconds results in retrying for about 16 minutes.
 */
export const CONNECT_NUM_RETRIES: number = 500;
/**
 * Message types defined for communication between a federate and the
 * RTI (Run Time Infrastructure). Each value is the first byte of a message
 * on the wire and identifies how the remaining bytes are laid out.
 * In the C reactor target these message types are encoded as an unsigned char,
 * so to maintain compatibility in TypeScript the magnitude must not exceed 255.
 */
enum RTIMessageTypes {
/**
 * Byte identifying a rejection of the previously received message.
 * The reason for the rejection is included as an additional byte
 * (uchar). The encodings of the rejection reasons are not defined
 * in this enum.
 */
MSG_TYPE_REJECT = 0,
/**
 * Byte identifying a message from a federate to an RTI containing
 * the federation ID and the federate ID. The message contains, in
 * this order:
 * * One byte equal to MSG_TYPE_FED_IDS.
 * * Two bytes (ushort) giving the federate ID.
 * * One byte (uchar) giving the length N of the federation ID.
 * * N bytes containing the federation ID.
 * Each federate needs to have a unique ID between 0 and
 * NUMBER_OF_FEDERATES-1.
 * Each federate, when starting up, should send this message
 * to the RTI. This is its first message to the RTI.
 * The RTI will respond with either MSG_TYPE_REJECT, MSG_TYPE_ACK, or MSG_TYPE_UDP_PORT.
 * If the federate is a C target LF program, the generated federate
 * code does this by calling synchronize_with_other_federates(),
 * passing to it its federate ID.
 */
MSG_TYPE_FED_IDS = 1,
/**
 * Byte identifying a timestamp message, whose payload is 64 bits long.
 * Each federate sends its starting physical time as a message of this
 * type, and the RTI broadcasts to all the federates the starting logical
 * time as a message of this type.
 */
MSG_TYPE_TIMESTAMP = 2,
/**
 * Byte identifying a message to forward to another federate.
 * The next two bytes will be the ID of the destination port.
 * The next two bytes are the destination federate ID.
 * The four bytes after that will be the length of the message.
 * The remaining bytes are the message.
 * NOTE: This is currently not used. All messages are tagged, even
 * on physical connections, because if "after" is used, the message
 * may preserve the logical timestamp rather than using the physical time.
 */
MSG_TYPE_MESSAGE = 3,
/**
 * Byte identifying that the federate is ending its execution.
 * This message carries no payload.
 */
MSG_TYPE_RESIGN = 4,
/**
 * Byte identifying a timestamped message to forward to another federate.
 * The next two bytes will be the ID of the destination reactor port.
 * The next two bytes are the destination federate ID.
 * The four bytes after that will be the length of the message.
 * The next eight bytes will be the timestamp of the message.
 * The next four bytes will be the microstep of the message.
 * The remaining bytes are the message.
 *
 * With centralized coordination, all such messages flow through the RTI.
 * With decentralized coordination, tagged messages are sent peer-to-peer
 * between federates and are marked with MSG_TYPE_P2P_TAGGED_MESSAGE
 * (a message type that is not defined in this enum).
 */
MSG_TYPE_TAGGED_MESSAGE = 5,
/**
 * Byte identifying a next event tag (NET) message sent from a federate
 * in centralized coordination.
 * The next eight bytes will be the timestamp.
 * The next four bytes will be the microstep.
 * This message from a federate tells the RTI the tag of the earliest event
 * on that federate's event queue. In other words, absent any further inputs
 * from other federates, this will be the least tag of the next set of
 * reactions on that federate. If the event queue is empty and a timeout
 * time has been specified, then the timeout time will be sent. If there is
 * no timeout time, then FOREVER will be sent. Note that this message should
 * not be sent if there are physical actions and the earliest event on the event
 * queue has a tag that is ahead of physical time (or the queue is empty).
 * In that case, send TAN instead (TAN is not defined in this enum).
 */
MSG_TYPE_NEXT_EVENT_TAG = 6,
/**
 * Byte identifying a time advance grant (TAG) sent by the RTI to a federate
 * in centralized coordination. This message is a promise by the RTI to the federate
 * that no later message sent to the federate will have a tag earlier than or
 * equal to the tag carried by this TAG message.
 * The next eight bytes will be the timestamp.
 * The next four bytes will be the microstep.
 */
MSG_TYPE_TAG_ADVANCE_GRANT = 7,
/**
 * Byte identifying a provisional time advance grant (PTAG) sent by the RTI to a federate
 * in centralized coordination. This message is a promise by the RTI to the federate
 * that no later message sent to the federate will have a tag earlier than the tag
 * carried by this PTAG message.
 * The next eight bytes will be the timestamp.
 * The next four bytes will be the microstep.
 */
MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT = 8,
/**
 * Byte identifying a logical tag complete (LTC) message sent by a federate
 * to the RTI.
 * The next eight bytes will be the timestamp of the completed tag.
 * The next four bytes will be the microstep of the completed tag.
 */
MSG_TYPE_LOGICAL_TAG_COMPLETE = 9,
// For more information on the algorithm for stop request protocol, please see following link:
// https://github.com/lf-lang/lingua-franca/wiki/Federated-Execution-Protocol#overview-of-the-algorithm
/**
 * Byte identifying a stop request. This message is first sent to the RTI by a federate
 * that would like to stop execution at the specified tag. The RTI will forward
 * the MSG_TYPE_STOP_REQUEST to all other federates. Those federates will either agree to
 * the requested tag or propose a larger tag. The RTI will collect all proposed
 * tags and broadcast the largest of those to all federates. All federates
 * will then be expected to stop at the granted tag.
 *
 * The next 8 bytes will be the timestamp.
 * The next 4 bytes will be the microstep.
 *
 * NOTE: The RTI may reply with a larger tag than the one specified in this message.
 * It has to be that way because if any federate can send a MSG_TYPE_STOP_REQUEST message
 * that specifies the stop time on all other federates, then every federate
 * depends on every other federate and time cannot be advanced.
 * Hence, the actual stop time may be nondeterministic.
 *
 * If, on the other hand, the federate requesting the stop is upstream of every
 * other federate, then it should be possible to respect its requested stop tag.
 */
MSG_TYPE_STOP_REQUEST = 10,
/**
 * Byte indicating a federate's reply to a MSG_TYPE_STOP_REQUEST that was sent
 * by the RTI. The payload is a proposed stop tag that is at least as large
 * as the one sent to the federate in a MSG_TYPE_STOP_REQUEST message.
 *
 * The next 8 bytes will be the timestamp.
 * The next 4 bytes will be the microstep.
 */
MSG_TYPE_STOP_REQUEST_REPLY = 11,
/**
 * Byte sent by the RTI indicating that the stop request from some federate
 * has been granted. The payload is the tag at which all federates have
 * agreed that they can stop.
 * The next 8 bytes will be the time at which the federates will stop.
 * The next 4 bytes will be the microstep at which the federates will stop.
 */
MSG_TYPE_STOP_GRANTED = 12,
/**
 * A message that informs the RTI about connections between this federate and
 * other federates where messages are routed through the RTI. Currently, this
 * only includes logical connections when the coordination is centralized. This
 * information is needed for the RTI to perform the centralized coordination.
 *
 * @note Only information about the immediate neighbors is required. The RTI can
 * transitively obtain the structure of the federation based on each federate's
 * immediate neighbor information.
 *
 * The next 4 bytes is the number of upstream federates.
 * The next 4 bytes is the number of downstream federates.
 *
 * Depending on the first four bytes, the next bytes are pairs of (fed ID (2
 * bytes), delay (8 bytes)) for this federate's connection to upstream federates
 * (by direct connection). The delay is the minimum "after" delay of all
 * connections from the upstream federate.
 *
 * Depending on the second four bytes, the next bytes are fed IDs (2
 * bytes each), of this federate's downstream federates (by direct connection).
 *
 * @note The upstream and downstream connections are transmitted on the same
 * message to prevent (at least to some degree) the scenario where the RTI has
 * information about one, but not the other (which is a critical error).
 */
MSG_TYPE_NEIGHBOR_STRUCTURE = 24,
/**
 * Byte identifying an acknowledgment of the previously received MSG_TYPE_FED_IDS message
 * sent by the RTI to the federate
 * with a payload indicating the UDP port to use for clock synchronization.
 * The next four bytes will be the port number for the UDP server, or
 * 0 or USHRT_MAX if there is no UDP server. 0 means that initial clock synchronization
 * is enabled, whereas USHRT_MAX means that no synchronization should be performed at all.
 */
MSG_TYPE_UDP_PORT = 254,
/**
 * Byte identifying an acknowledgment of the previously received message.
 * This message carries no payload.
 */
MSG_TYPE_ACK = 255
}
//---------------------------------------------------------------------//
// Federated Execution Classes //
//---------------------------------------------------------------------//
// FIXME: add "FederatedApp" and other class names here
// to the prohibited list of LF names.
/**
 * Shape of an Error that additionally carries a string `code`
 * (for example 'ECONNREFUSED'). Node.js doesn't export a type for
 * such errors, so this interface serves as a local stand-in.
 */
interface NodeJSCodedError extends Error {
    code: string;
}
/**
 * Custom type guard for a NodeJSCodedError.
 * @param e The Error to be tested as being a NodeJSCodedError.
 * @returns true exactly when `e.code` is a string, in which case the
 * compiler narrows `e` to NodeJSCodedError.
 */
function isANodeJSCodedError(e: Error): e is NodeJSCodedError {
    // Read the property through a widened view so it can be inspected
    // without assuming it exists or has the right type.
    const maybeCode: unknown = (e as Partial<NodeJSCodedError>).code;
    return typeof maybeCode === 'string';
}
/**
* An RTIClient is used within a federate to abstract the socket
* connection to the RTI and the RTI's binary protocol over the socket.
* RTIClient exposes functions for federate-level operations like
* establishing a connection to the RTI or sending a message.
* RTIClient is an EventEmitter, and asynchronously emits events including:
* 'connected', 'startTime', 'message', 'timedMessage', 'timeAdvanceGrant',
* 'provisionalTimeAdvanceGrant', and 'stopRequest'. The federatedApp is
* responsible for handling the events to ensure a correct execution.
*/
class RTIClient extends EventEmitter {
// ID of federation that this federate will join.
private federationID:string;
// ID of this federate.
private id:number;
// The socket this federate uses to communicate with the RTI (null until connectToRTI is called).
private socket: Socket | null = null;
// The mapping between a federate port ID and the federate port action
// scheduled upon reception of a message designated for that federate port.
/**
* A mapping from port IDs to FederatePortAction instances. Unfortunately, the data type of the action has to be `any`,
* meaning that the type checker cannot check whether uses of the action are type safe.
* In an alternative design, type information might be preserved. TODO(marten): Look into this.
*/
private federatePortActionByID: Map<number, Action<any>> = new Map<number, Action<any>>();
/**
 * Record which action belongs to a given federate port ID, replacing any
 * action previously stored under the same ID.
 * @param federatePortID The federate port's ID.
 * @param federatePortAction The action associated with that federate port.
 */
public registerFederatePortAction<T extends Present>(federatePortID: number, federatePortAction: Action<T>) {
    const actionsByPortID = this.federatePortActionByID;
    actionsByPortID.set(federatePortID, federatePortAction);
}
/**
 * Constructor for an RTIClient. Only stores the identifiers; no socket
 * is opened here.
 * @param federationID The ID of the federation this federate will join.
 * @param id The ID of the federate this client communicates
 * on behalf of.
 */
public constructor (federationID: string, id: number) {
    super();
    this.id = id;
    this.federationID = federationID;
}
// If the last data sent to handleSocketData contained an incomplete
// or chunked message, that data is copied over to chunkedBuffer so it can
// be saved until the next time handleSocketData is called. If no data has been
// saved, chunkedBuffer is null.
private chunkedBuffer : Buffer | null = null;
// The number of attempts made by this federate to connect to the RTI.
private connectionAttempts = 0;
/**
 * Create a socket connection to the RTI and register this federate's
 * ID with the RTI. If the connection is refused, retry every
 * CONNECT_RETRY_INTERVAL until CONNECT_NUM_RETRIES attempts have been made.
 *
 * Once connected, a MSG_TYPE_FED_IDS message is written to the socket
 * (type byte, 2-byte little-endian federate ID, 1-byte federation ID
 * length, federation ID bytes) and a 'connected' event is emitted.
 *
 * @param port The RTI's remote port number.
 * @param host The RTI's remote host name.
 * @throws RangeError (from the connect callback) if the UTF-8 encoding of
 * the federation ID is longer than 255 bytes, since its length must fit in
 * a single byte.
 */
public connectToRTI(port: number, host: string) {
    // Create an IPv4 socket for TCP (not UDP) communication over IP (0)
    const options: SocketConnectOpts = {
        "port": port,
        "family": 4, // IPv4,
        "localAddress": "0.0.0.0", // All interfaces, 0.0.0.0.
        "host": host
    }

    // Keep a local handle so that every handler below is attached to the
    // socket created by THIS call, even if a later retry replaces this.socket.
    const socket: Socket = createConnection(options, () => {
        // This function is a listener to the 'connection' socket
        // event.

        // Only set up an event handler for close if the connection is
        // created. Otherwise this handler will go off on every reconnection
        // attempt.
        socket.on('close', () => {
            Log.info(this, () => {return 'RTI socket has closed.'});
        });

        Log.debug(this, () => {return `Federate ID: ${this.id} connected to RTI.`});

        // Immediately send a federate ID message after connecting.
        // The length byte must count the BYTES that follow on the wire, not
        // UTF-16 code units, so encode the ID first and measure the result.
        const federationIDBytes = Buffer.from(this.federationID, 'utf-8');
        const header = Buffer.alloc(4);
        header.writeUInt8(RTIMessageTypes.MSG_TYPE_FED_IDS, 0);
        header.writeUInt16LE(this.id, 1);
        header.writeUInt8(federationIDBytes.length, 3);
        try {
            Log.debug(this, () => {return `Sending a FED ID message (ID: ${this.federationID}) to the RTI.`});
            socket.write(Buffer.concat([header, federationIDBytes]));
        } catch (e) {
            Log.error(this, () => {return `${e}`});
        }

        // Finally, emit a connected event.
        this.emit('connected');
    });
    this.socket = socket;

    socket.on('data', this.handleSocketData.bind(this));

    // If the socket reports a connection refused error,
    // suppress the message and try to reconnect.
    socket.on('error', (err: Error ) => {
        if (isANodeJSCodedError(err) && err.code === 'ECONNREFUSED' ) {
            Log.info(this, () => {
                return `Failed to connect to RTI with error: ${err}.`
            })
            if (this.connectionAttempts < CONNECT_NUM_RETRIES) {
                Log.info(this, () => {return `Retrying RTI connection in ${CONNECT_RETRY_INTERVAL}.`})
                this.connectionAttempts++;
                // Schedule another call to connectToRTI after the retry interval.
                let a = new Alarm();
                a.set(this.connectToRTI.bind(this, port, host), CONNECT_RETRY_INTERVAL)
            } else {
                Log.error(this, () => {return `Could not connect to RTI after ${CONNECT_NUM_RETRIES} attempts.`})
            }
        } else {
            // Any other socket error is only logged; no retry is attempted.
            Log.error(this, () => {return err.toString()})
        }
    });
}
/**
 * Tear down the RTI Client's socket connection to the RTI.
 * Does nothing beyond logging if no socket has been created.
 */
public closeRTIConnection() {
    Log.debug( this, () => 'Closing RTI connection by destroying and unrefing socket.');
    const rtiSocket = this.socket;
    if (rtiSocket !== null) {
        rtiSocket.destroy();
        rtiSocket.unref(); // Allow the program to exit
    }
}
/**
 * Send the RTI a MSG_TYPE_NEIGHBOR_STRUCTURE message describing this
 * federate's directly connected neighbors.
 *
 * Layout written: 1 type byte, 4-byte upstream count, 4-byte downstream
 * count (both little-endian), then one 10-byte entry per upstream federate
 * (2-byte ID followed by the delay's binary form) and one 2-byte ID per
 * downstream federate.
 *
 * @param upstreamFedIDs IDs of the upstream federates.
 * @param upstreamFedDelays Delay for each upstream federate, indexed in
 * parallel with upstreamFedIDs.
 * NOTE(review): each entry reserves 8 bytes for the delay, so
 * TimeValue.toBinary() is presumably 8 bytes long — confirm.
 * @param downstreamFedIDs IDs of the downstream federates.
 */
public sendNeighborStructure(upstreamFedIDs: number[], upstreamFedDelays: TimeValue[], downstreamFedIDs: number[]) {
    const upstreamCount = upstreamFedIDs.length;
    const downstreamCount = downstreamFedIDs.length;
    const packet = Buffer.alloc(9 + upstreamCount * 10 + downstreamCount * 2);

    packet.writeUInt8(RTIMessageTypes.MSG_TYPE_NEIGHBOR_STRUCTURE, 0);
    packet.writeUInt32LE(upstreamCount, 1);
    packet.writeUInt32LE(downstreamCount, 5);

    // Upstream entries start right after the 9-byte header.
    for (let k = 0; k < upstreamCount; k++) {
        const entryOffset = 9 + k * 10;
        packet.writeUInt16LE(upstreamFedIDs[k], entryOffset);
        upstreamFedDelays[k].toBinary().copy(packet, entryOffset + 2);
    }

    // Downstream IDs follow the last upstream entry.
    const downstreamStart = 9 + upstreamCount * 10;
    for (let k = 0; k < downstreamCount; k++) {
        packet.writeUInt16LE(downstreamFedIDs[k], downstreamStart + k * 2);
    }

    try {
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
 * Send the RTI a MSG_TYPE_UDP_PORT message carrying a UDP port number.
 * The message is 3 bytes: the type byte followed by the port as a
 * 16-bit unsigned integer.
 * @param udpPort The UDP port number to report (0..65535).
 */
public sendUDPPortNumToRTI(udpPort: number) {
    let msg = Buffer.alloc(3);
    msg.writeUInt8(RTIMessageTypes.MSG_TYPE_UDP_PORT, 0);
    // Little-endian, matching every other multi-byte field this client
    // writes. Values such as 0 and 65535 encode identically in either
    // byte order.
    msg.writeUInt16LE(udpPort, 1);
    try {
        this.socket?.write(msg);
    } catch (e) {
        Log.error(this, () => {return `${e}`});
    }
}
/**
 * Send the given TimeValue to the RTI in a MSG_TYPE_TIMESTAMP message
 * (1 type byte followed by the time's binary form in a 9-byte message).
 * The value should be the current physical time of the federate; the
 * RTI's answer is the designated start time, which is reported through a
 * 'startTime' event. May only be called after the federate emits a
 * 'connected' event.
 * @param myPhysicalTime The physical time at this federate.
 */
public requestStartTimeFromRTI(myPhysicalTime: TimeValue) {
    const packet = Buffer.alloc(9);
    packet.writeUInt8(RTIMessageTypes.MSG_TYPE_TIMESTAMP, 0);
    myPhysicalTime.toBinary().copy(packet, 1);
    try {
        Log.debug(this, () => `Sending RTI start time: ${myPhysicalTime}`);
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
 * Send an RTI (untimed) message to a remote federate.
 * The message is a 9-byte header (type byte, 2-byte port ID, 2-byte
 * federate ID, 4-byte payload length, all little-endian) followed by
 * the payload.
 * @param data The value to send. It is serialized with JSON.stringify
 * and encoded as UTF-8, so the payload may be of arbitrary length.
 * @param destFederateID The federate ID of the federate
 * to which this message should be sent.
 * @param destPortID The port ID for the port on the destination
 * federate to which this message should be sent.
 */
public sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
    const payload = Buffer.from(JSON.stringify(data), "utf-8");

    const header = Buffer.alloc(9);
    header.writeUInt8(RTIMessageTypes.MSG_TYPE_MESSAGE, 0);
    header.writeUInt16LE(destPortID, 1);
    header.writeUInt16LE(destFederateID, 3);
    header.writeUInt32LE(payload.length, 5);

    // Header immediately followed by the serialized payload.
    const packet = Buffer.concat([header, payload]);
    try {
        Log.debug(this, () => `Sending RTI (untimed) message to `
            + `federate ID: ${destFederateID} and port ID: ${destPortID}.`);
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
 * Send an RTI timed (tagged) message to a remote federate.
 * The message is a 21-byte header (type byte, 2-byte port ID, 2-byte
 * federate ID, 4-byte payload length, then 12 bytes reserved for the
 * time) followed by the payload.
 * @param data The value to send. It is serialized with JSON.stringify
 * and encoded as UTF-8, so the payload may be of arbitrary length.
 * @param destFederateID The federate ID of the federate
 * to which this message should be sent.
 * @param destPortID The port ID for the port on the destination
 * federate to which this message should be sent.
 * @param time The time of the message as a Buffer; its bytes are copied
 * into the header starting at offset 9, and at most 12 of them are kept.
 * NOTE(review): the previous documentation described this as a 64-bit
 * little-endian unsigned integer, while the header reserves 12 bytes
 * (timestamp plus microstep) — confirm what callers pass.
 */
public sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number, time: Buffer) {
    const payload = Buffer.from(JSON.stringify(data), "utf-8");

    const header = Buffer.alloc(21);
    header.writeUInt8(RTIMessageTypes.MSG_TYPE_TAGGED_MESSAGE, 0);
    header.writeUInt16LE(destPortID, 1);
    header.writeUInt16LE(destFederateID, 3);
    header.writeUInt32LE(payload.length, 5);
    // copy() stops at the end of the header, so bytes 9..20 receive the
    // leading bytes of `time` and anything not covered stays zero.
    // FIXME: Add microstep properly.
    time.copy(header, 9);

    const packet = Buffer.concat([header, payload]);
    try {
        Log.debug(this, () => `Sending RTI (timed) message to `
            + `federate ID: ${destFederateID}, port ID: ${destPortID} `
            + `, time: ${time.toString('hex')}.`);
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
 * Send the RTI a MSG_TYPE_LOGICAL_TAG_COMPLETE message. This should be
 * called when the federate has completed all events for a given
 * logical time. The message is 13 bytes: the type byte followed by the
 * bytes of completeTime (anything not covered stays zero).
 * @param completeTime The logical time that is complete, encoded in
 * a Buffer.
 * NOTE(review): documented so far as a 64-bit little-endian unsigned
 * integer, while the message reserves 12 bytes after the type byte —
 * confirm what callers pass.
 */
public sendRTILogicalTimeComplete(completeTime: Buffer) {
    const packet = Buffer.alloc(13);
    packet.writeUInt8(RTIMessageTypes.MSG_TYPE_LOGICAL_TAG_COMPLETE, 0);
    // FIXME: Add microstep properly.
    completeTime.copy(packet, 1);
    try {
        Log.debug(this, () => "Sending RTI logical time complete: " + completeTime.toString('hex'));
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
 * Send the RTI a resign message, which consists of the single
 * MSG_TYPE_RESIGN byte. This should be called when the federate is
 * shutting down.
 */
public sendRTIResign() {
    const packet = Buffer.alloc(1);
    packet.writeUInt8(RTIMessageTypes.MSG_TYPE_RESIGN, 0);
    try {
        Log.debug(this, () => "Sending RTI resign.");
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
 * Send the RTI a MSG_TYPE_NEXT_EVENT_TAG message. This should be called
 * when the federate would like to advance logical time, but has not yet
 * received a sufficiently large time advance grant. The message is
 * 13 bytes: the type byte followed by the bytes of nextTag.
 * @param nextTag The tag of the message encoded in a Buffer; at most
 * 12 of its bytes fit after the type byte.
 */
public sendRTINextEventTag(nextTag: Buffer) {
    const packet = Buffer.alloc(13);
    packet.writeUInt8(RTIMessageTypes.MSG_TYPE_NEXT_EVENT_TAG, 0);
    nextTag.copy(packet, 1);
    try {
        Log.debug(this, () => "Sending RTI Next Event Time.");
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
 * Send the RTI a MSG_TYPE_STOP_REQUEST message. The message is 13 bytes:
 * the type byte followed by the bytes of stopTag.
 * @param stopTag The requested stop tag encoded in a Buffer; at most
 * 12 of its bytes fit after the type byte.
 */
public sendRTIStopRequest(stopTag: Buffer) {
    const packet = Buffer.alloc(13);
    packet.writeUInt8(RTIMessageTypes.MSG_TYPE_STOP_REQUEST, 0);
    stopTag.copy(packet, 1);
    try {
        Log.debug(this, () => "Sending RTI Stop Request.");
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
 * Send the RTI a MSG_TYPE_STOP_REQUEST_REPLY message. The message is
 * 13 bytes: the type byte followed by the bytes of stopTag.
 * @param stopTag The proposed stop tag encoded in a Buffer; at most
 * 12 of its bytes fit after the type byte.
 */
public sendRTIStopRequestReply(stopTag: Buffer) {
    const packet = Buffer.alloc(13);
    packet.writeUInt8(RTIMessageTypes.MSG_TYPE_STOP_REQUEST_REPLY, 0);
    stopTag.copy(packet, 1);
    try {
        Log.debug(this, () => "Sending RTI Stop Request Reply.");
        this.socket?.write(packet);
    } catch (err) {
        Log.error(this, () => `${err}`);
    }
}
/**
* The handler for the socket's data event.
* The data Buffer given to the handler may contain 0 or more complete messages.
* Iterate through the complete messages, and if the last message is incomplete
* save it as thiz.chunkedBuffer so it can be prepended onto the
* data when handleSocketData is called again.
* @param assembledData The Buffer of data received by the socket. It may
* contain 0 or more complete messages.
*/
private handleSocketData(data: Buffer) {
let thiz = this;
if (data.length < 1) {
throw new Error( `Received a message from the RTI with 0 length.`);
}
// Used to track the current location within the data Buffer.
let bufferIndex = 0;
// Append the new data to leftover data from chunkedBuffer (if any)
// The result is assembledData.
let assembledData: Buffer;
if (thiz.chunkedBuffer) {
assembledData = Buffer.alloc(thiz.chunkedBuffer.length + data.length);
thiz.chunkedBuffer.copy(assembledData, 0, 0, thiz.chunkedBuffer.length);
data.copy(assembledData, thiz.chunkedBuffer.length);
thiz.chunkedBuffer = null;
} else {
assembledData = data;
}
Log.debug(thiz, () => {return `Assembled data is: ${assembledData.toString('hex')}`});
while (bufferIndex < assembledData.length) {
let messageTypeByte = assembledData[bufferIndex]
switch (messageTypeByte) {
case RTIMessageTypes.MSG_TYPE_FED_IDS: {
// MessageType: 1 byte.
// Federate ID: 2 bytes long.
// Should never be received by a federate.
Log.error(thiz, () => {return "Received MSG_TYPE_FED_IDS message from the RTI."});
throw new Error('Received a MSG_TYPE_FED_IDS message from the RTI. '
+ 'MSG_TYPE_FED_IDS messages may only be sent by federates');
break;
}
case RTIMessageTypes.MSG_TYPE_TIMESTAMP: {
// MessageType: 1 byte.
// Timestamp: 8 bytes.
let incomplete = assembledData.length < 9 + bufferIndex;
if (incomplete) {
thiz.chunkedBuffer = Buffer.alloc(assembledData.length - bufferIndex);
assembledData.copy(thiz.chunkedBuffer, 0, bufferIndex)
} else {
let timeBuffer = Buffer.alloc(8);
assembledData.copy(timeBuffer, 0, bufferIndex + 1, bufferIndex + 9 );
let startTime = TimeValue.fromBinary(timeBuffer);
Log.debug(thiz, () => { return "Received MSG_TYPE_TIMESTAMP buffer from the RTI " +
`with startTime: ${timeBuffer.toString('hex')}`;
})
Log.debug(thiz, () => { return "Received MSG_TYPE_TIMESTAMP message from the RTI " +
`with startTime: ${startTime}`;
})
thiz.emit('startTime', startTime);
}
bufferIndex += 9;
break;
}
case RTIMessageTypes.MSG_TYPE_MESSAGE: {
// MessageType: 1 byte.
// Message: The next two bytes will be the ID of the destination port
// The next two bytes are the destination federate ID (which can be ignored).
// The next four bytes after that will be the length of the message
// The remaining bytes are the message.
let incomplete = assembledData.length < 9 + bufferIndex;
if (incomplete) {
thiz.chunkedBuffer = Buffer.alloc(assembledData.length - bufferIndex);
assembledData.copy(thiz.chunkedBuffer, 0, bufferIndex)
bufferIndex += 9;
} else {
let destPortID = assembledData.readUInt16LE(bufferIndex + 1);
let messageLength = assembledData.readUInt32LE(bufferIndex + 5);
// Once the message length is parsed, we can determine whether
// the body of the message has been chunked.
let isChunked = messageLength > (assembledData.length - (bufferIndex + 9));
if (isChunked) {
// Copy the unprocessed remainder of assembledData into chunkedBuffer
thiz.chunkedBuffer = Buffer.alloc(assembledData.length - bufferIndex);
assembledData.copy(thiz.chunkedBuffer, 0, bufferIndex);
} else {
// Finish processing the complete message.
let messageBuffer = Buffer.alloc(messageLength);
assembledData.copy(messageBuffer, 0, bufferIndex + 9, bufferIndex + 9 + messageLength);
let destPortAction = thiz.federatePortActionByID.get(destPortID);
thiz.emit('message', destPortAction, messageBuffer);
}
bufferIndex += messageLength + 9;
}
break;
}
case RTIMessageTypes.MSG_TYPE_TAGGED_MESSAGE: {
// MessageType: 1 byte.
// The next two bytes will be the ID of the destination port.
// The next two bytes are the destination federate ID.
// The next four bytes after that will be the length of the message
// The next eight bytes will be the timestamp.
// The next four bytes will be the microstep of the message.
// The remaining bytes are the message.
let incomplete = assembledData.length < 21 + bufferIndex;
if (incomplete) {
thiz.chunkedBuffer = Buffer.alloc(assembledData.length - bufferIndex);
assembledData.copy(thiz.chunkedBuffer, 0, bufferIndex)
bufferIndex += 21;
} else {
let destPortID = assembledData.readUInt16LE(bufferIndex + 1);
let messageLength = assembledData.readUInt32LE(bufferIndex + 5);
let tagBuffer = Buffer.alloc(12);
assembledData.copy(tagBuffer, 0, bufferIndex + 9, bufferIndex + 21);
let tag = Tag.fromBinary(tagBuffer);
Log.debug(thiz, () => {return `Received an RTI MSG_TYPE_TAGGED_MESSAGE: Tag Buffer: ${tag}`});
// FIXME: Process microstep properly.
let isChunked = messageLength > (assembledData.length - (bufferIndex + 21));
if (isChunked) {
// Copy the unprocessed remainder of assembledData into chunkedBuffer
thiz.chunkedBuffer = Buffer.alloc(assembledData.length - bufferIndex);
assembledData.copy(thiz.chunkedBuffer, 0, bufferIndex)
} else {
// Finish processing the complete message.
let messageBuffer = Buffer.alloc(messageLength);
assembledData.copy(messageBuffer, 0, bufferIndex + 21, bufferIndex + 21 + messageLength);
let destPort = thiz.federatePortActionByID.get(destPortID);
thiz.emit('timedMessage', destPort, messageBuffer, tag);
}
bufferIndex += messageLength + 21;
break;
}
}
// FIXME: It's unclear what should happen if a federate gets this
// message.
case RTIMessageTypes.MSG_TYPE_RESIGN: {
// MessageType: 1 byte.
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_RESIGN.'});
Log.error(thiz, () => {return 'FIXME: No functionality has '
+ 'been implemented yet for a federate receiving a MSG_TYPE_RESIGN message from '
+ 'the RTI'});
bufferIndex += 1;
break;
}
case RTIMessageTypes.MSG_TYPE_NEXT_EVENT_TAG: {
// MessageType: 1 byte.
// Timestamp: 8 bytes.
// Microstep: 4 bytes.
Log.error(thiz, () => {return 'Received an RTI MSG_TYPE_NEXT_EVENT_TAG. This message type '
+ 'should not be received by a federate'});
bufferIndex += 13;
break;
}
case RTIMessageTypes.MSG_TYPE_TAG_ADVANCE_GRANT: {
// MessageType: 1 byte.
// Timestamp: 8 bytes.
// Microstep: 4 bytes.
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_TAG_ADVANCE_GRANT'});
let tagBuffer = Buffer.alloc(12);
assembledData.copy(tagBuffer, 0, bufferIndex + 1, bufferIndex + 13);
let tag = Tag.fromBinary(tagBuffer);
thiz.emit('timeAdvanceGrant', tag);
bufferIndex += 13;
break;
}
case RTIMessageTypes.MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT: {
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT'});
let tagBuffer = Buffer.alloc(12);
assembledData.copy(tagBuffer, 0, bufferIndex + 1, bufferIndex + 13);
let tag = Tag.fromBinary(tagBuffer);
Log.debug(thiz, () => {return `PTAG value: ${tag}`});
thiz.emit('provisionalTimeAdvanceGrant', tag);
bufferIndex += 13;
break;
}
case RTIMessageTypes.MSG_TYPE_LOGICAL_TAG_COMPLETE: {
// Logical Time Complete: The next eight bytes will be the timestamp.
Log.error(thiz, () => {return 'Received an RTI MSG_TYPE_LOGICAL_TAG_COMPLETE. This message type '
+ 'should not be received by a federate'});
bufferIndex += 13;
break;
}
case RTIMessageTypes.MSG_TYPE_STOP_REQUEST: {
// The next 8 bytes will be the timestamp.
// The next 4 bytes will be the microstep.
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_STOP_REQUEST'});
let tagBuffer = Buffer.alloc(12);
assembledData.copy(tagBuffer, 0, bufferIndex + 1, bufferIndex + 13);
let tag = Tag.fromBinary(tagBuffer);
thiz.emit('stopRequest', tag);
bufferIndex += 13;
break;
}
case RTIMessageTypes.MSG_TYPE_STOP_GRANTED: {
// The next 8 bytes will be the time at which the federates will stop.
// The next 4 bytes will be the microstep at which the federates will stop.
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_STOP_GRANTED'});
let tagBuffer = Buffer.alloc(12);
assembledData.copy(tagBuffer, 0, bufferIndex + 1, bufferIndex + 13);
let tag = Tag.fromBinary(tagBuffer);
thiz.emit(`stopRequestGranted`, tag);
bufferIndex += 13;
break;
}
case RTIMessageTypes.MSG_TYPE_ACK: {
Log.debug(thiz, () => {return 'Received an RTI MSG_TYPE_ACK'});
bufferIndex += 1;
break;
}
case RTIMessageTypes.MSG_TYPE_REJECT: {
let rejectionReason = assembledData.readUInt8(bufferIndex + 1);
Log.error(thiz, () => {return 'Received an RTI MSG_TYPE_REJECT. Rejection reason: ' + rejectionReason});
bufferIndex += 2;
break;
}
default: {
throw new Error(`Unrecognized message type in message from the RTI: ${assembledData.toString('hex')}.`)
}
}
}
Log.debug(thiz, () => {return 'exiting handleSocketData'})
}
}
/**
 * Progress of a stop request exchanged with the RTI.
 */
enum StopRequestState {
    /** No stop request has been sent; this is the state a FederatedApp starts in. */
    NOT_SENT = 0,
    /** A stop request has been sent but has not been granted yet. */
    SENT = 1,
    /** The stop request has been granted. */
    GRANTED = 2
}
/**
 * Immutable record pairing the state of a stop request with the tag that
 * belongs to it (the tag of the stop requested or of the stop granted).
 * The tag is null when no tag is associated with the state.
 */
class StopRequestInfo {
    /**
     * @param state Current state of the stop request.
     * @param tag Tag associated with the stop requested or stop granted, or null.
     */
    constructor(
        readonly state: StopRequestState,
        readonly tag: Tag | null
    ) {}
}
/**
* A federated app is an app containing federates as its top level reactors.
* A federate is a component in a distributed reactor execution in which
* reactors from the same (abstract) model run in distinct networked processes.
* A federated app contains the federates designated to run in a particular
* process. The federated program is coordinated by the RTI (Run Time Infrastructure).
* Like an app, a federated app is the top level reactor for a particular process,
* but a federated app must follow the direction of the RTI for beginning execution,
* advancing time, and exchanging messages with other federates.
*
* Note: There is no special class for a federate. A federate is the name for a top
* level reactor of a federated app.
*/
export class FederatedApp extends App {
/**
* A federate's rtiClient establishes the federate's connection to
* the RTI (Run Time Infrastructure). When socket events occur,
* the rtiClient processes socket-level data into events it emits at the
* Federate's level of abstraction.
*/
private rtiClient: RTIClient;
/**
* If a federated app uses logical connections, its execution
* with respect to time advancement must be synchronized with the RTI.
* If this variable is true, logical time in this federate
* cannot advance beyond the time given in the greatest Time Advance Grant
* sent from the RTI.
*/
private rtiSynchronized: boolean = false;
/**
* Stop request-related information
* including the current state and the tag associated with the stop requested or stop granted.
* Starts out as NOT_SENT with no tag.
*/
private stopRequestInfo: StopRequestInfo = new StopRequestInfo(StopRequestState.NOT_SENT, null);
/**
* The largest time advance grant received so far from the RTI,
* or null if no time advance grant has been received yet.
* An RTI synchronized Federate cannot advance its logical time
* beyond this value.
*/
private greatestTimeAdvanceGrant: Tag | null = null;
/**
* IDs of the upstream federates registered through addUpstreamFederate(),
* in registration order.
*/
private upstreamFedIDs: number[] = [];
/**
* Delays registered through addUpstreamFederate(). Each entry is pushed together
* with the entry of upstreamFedIDs at the same index.
*/
private upstreamFedDelays: TimeValue[] = [];
/**
* IDs of the downstream federates registered through addDownstreamFederate(),
* in registration order.
*/
private downstreamFedIDs: number[] = [];
/**
* The default value, null, indicates there is no output depending on a physical action.
* Assigned through setMinDelayFromPhysicalActionToFederateOutput().
*/
private minDelayFromPhysicalActionToFederateOutput: TimeValue | null = null;
/**
 * Register an upstream federate and its delay.
 * The ID and the delay are appended to two parallel lists, so both end up
 * at the same index. Also sets the _isLastTAGProvisional flag to true.
 * @param fedID ID of the upstream federate.
 * @param fedDelay Delay recorded for that upstream federate.
 */
public addUpstreamFederate(fedID: number, fedDelay: TimeValue): void {
    const { upstreamFedIDs: ids, upstreamFedDelays: delays } = this;
    ids.push(fedID);
    delays.push(fedDelay);
    this._isLastTAGProvisional = true;
}
/**
 * Register a downstream federate by appending its ID to the list of
 * downstream federate IDs.
 * @param fedID ID of the downstream federate.
 */
public addDownstreamFederate(fedID: number): void {
    const registered = this.downstreamFedIDs;
    registered.push(fedID);
}
/**
 * Record the minimum delay from a physical action to a federate output,
 * replacing the default of null (no output depends on a physical action).
 * @param minDelay The minimum delay to store.
 */
public setMinDelayFromPhysicalActionToFederateOutput(minDelay: TimeValue): void {
    const delay: TimeValue | null = minDelay;
    this.minDelayFromPhysicalActionToFederateOutput = delay;
}
/**
 * Report whether this federated app is flagged as RTI synchronized.
 * @returns The current value of rtiSynchronized.
 */
public _isRTISynchronized(): boolean {
    const { rtiSynchronized } = this;
    return rtiSynchronized;
}
/**
 * Read the stored greatest time advance grant.
 * @returns The value of greatestTimeAdvanceGrant, which is null until one has been stored.
 */
public _getGreatestTimeAdvanceGrant(): Tag | null {
    const { greatestTimeAdvanceGrant } = this;
    return greatestTimeAdvanceGrant;
}
/**
 * @override
 * Ask the RTI for a stop instead of shutting this federated app down directly.
 *
 * - If a stop request has already been sent (or granted), the call is ignored.
 * - Otherwise, if no end of execution is set, or the current tag is still
 *   smaller than the end of execution, a stop request (MSG_STOP_REQUEST) is
 *   sent for the tag one microstep after the current tag.
 * - Otherwise the call is ignored, because the end of execution is already
 *   set and is not later than the current tag.
 *
 * The final shutdown should be done by calling super._shutdown.
 *
 * NOTE: the original author flagged that setting the greatest time advance
 * grant in relation to this method needs to be modified or removed.
 */
protected _shutdown(): void {
    // Ignore the federate's _shutdown call if a stop is already requested.
    if (this.stopRequestInfo.state !== StopRequestState.NOT_SENT) {
        Log.global.debug("Ignoring FederatedApp._shutdown() as stop is already requested to RTI.");
        return;
    }
    const endTag = this._getEndOfExecution();
    // Read the current tag once so the comparison, the request and the log all agree.
    const currentTag = this.util.getCurrentTag();
    if (endTag === undefined || currentTag.isSmallerThan(endTag)) {
        this.sendRTIStopRequest(currentTag.getMicroStepsLater(1));
        return;
    }
    // The trailing space keeps the two halves of the message from running together.
    Log.global.debug(`Ignoring FederatedApp._shutdown() since EndOfExecution is already set earlier than current tag. ` +
        `currentTag: ${currentTag} endTag: ${endTag}`);
}
/**
* Return whether the next event can be handled, or handling the next event
* has to be postponed to a later time.
*
* If this federated app has not received a sufficiently large time advance
* grant (TAG) from the RTI for the next event, send it a Next Event Time
* (NET) message and return. _next() will be called when a new greatest TAG
* is received. The NET message is not sent if the connection to the RTI is
* closed. FIXME: what happens in that case? Will next be called?
* @param nextEvent
*/
protected _canProceed(nextEvent: TaggedEvent<Present>) {
let tagBarrier = null;
// Set tag barrier using the tag when stop is requested but not granted yet.
// Otherwise, set the tagBarrier using the greatest TAG.
if (this.stopRequestInfo.state === StopRequestState.SENT) {
tagBarrier = this.stopRequestInfo.tag;
} else {
tagBarrier = this._getGreatestTimeAdvanceGrant();
}
if (this._isRTISynchronized() || tagBarrier !== null) {
if (tagBarrier === null || tagBarrier.isSmallerThan(nextEvent.tag)) {
if (this.minDelayFromPhysicalActionToFederateOutput !== null &&