From 6c2bf0fead1c3180e83b0ae81b0bd172760ce526 Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sun, 11 Feb 2024 18:22:19 -0700 Subject: [PATCH] Recover parts of original federated tests --- .github/workflows/c-tests-with-rust-rti.yml | 2 +- .../launcher/FedLauncherGenerator.java | 15 +-- test/RustRti/src/federated/Absent.lf | 46 +++++++ .../src/federated/BroadcastFeedback.lf | 33 +++++ .../BroadcastFeedbackWithHierarchy.lf | 40 ++++++ test/RustRti/src/federated/ChainWithDelay.lf | 20 +++ test/RustRti/src/federated/DistributedBank.lf | 24 ++++ .../federated/DistributedBankToMultiport.lf | 33 +++++ .../RustRti/src/federated/DistributedCount.lf | 41 ++++++ .../src/federated/DistributedDoublePort.lf | 52 ++++++++ .../src/federated/DistributedInterleaved.lf | 44 +++++++ .../src/federated/DistributedLoopedAction.lf | 62 +++++++++ .../DistributedLoopedPhysicalAction.lf | 84 +++++++++++++ .../src/federated/DistributedMultiport.lf | 48 +++++++ .../federated/DistributedMultiportToBank.lf | 41 ++++++ .../federated/DistributedMultiportToken.lf | 46 +++++++ .../src/federated/DistributedNetworkOrder.lf | 75 +++++++++++ .../DistributedPhysicalActionUpstream.lf | 60 +++++++++ .../DistributedPhysicalActionUpstreamLong.lf | 88 +++++++++++++ test/RustRti/src/federated/DistributedStop.lf | 118 ++++++++++++++++++ .../src/federated/DistributedStopZero.lf | 84 +++++++++++++ .../federated/EnclaveFederatedRequestStop.lf | 39 ++++++ .../src/federated/FederatedFilePkgReader.lf | 57 +++++++++ .../src/federated/FederatedFileReader.lf | 66 ++++++++++ test/RustRti/src/federated/FeedbackDelay.lf | 85 +++++++++++++ test/RustRti/src/federated/FeedbackDelay3.lf | 41 ++++++ test/RustRti/src/federated/FeedbackDelay5.lf | 57 +++++++++ .../src/federated/FeedbackDelaySimple.lf | 41 ++++++ .../src/federated/InheritanceFederated.lf | 23 ++++ .../federated/LoopDistributedCentralized.lf | 48 +++++++ .../federated/LoopDistributedCentralized2.lf | 75 +++++++++++ ...oopDistributedCentralizedPhysicalAction.lf | 74 +++++++++++ .../LoopDistributedCentralizedPrecedence.lf | 56 +++++++++ ...stributedCentralizedPrecedenceHierarchy.lf | 73 +++++++++++ .../src/federated/ParallelDestinations.lf | 23 ++++ test/RustRti/src/federated/ParallelSources.lf | 24 ++++ .../src/federated/ParallelSourcesMultiport.lf | 34 +++++ .../src/federated/SpuriousDependency.lf | 63 ++++++++++ .../src/federated/TopLevelArtifacts.lf | 44 +++++++ test/RustRti/src/lib/Count.lf | 11 ++ test/RustRti/src/lib/FileLevelPreamble.lf | 12 ++ test/RustRti/src/lib/FileReader.txt | 1 + test/RustRti/src/lib/GenDelay.lf | 21 ++++ test/RustRti/src/lib/Imported.lf | 14 +++ test/RustRti/src/lib/ImportedAgain.lf | 15 +++ test/RustRti/src/lib/ImportedComposition.lf | 22 ++++ test/RustRti/src/lib/InternalDelay.lf | 15 +++ test/RustRti/src/lib/LoopedActionSender.lf | 36 ++++++ test/RustRti/src/lib/PassThrough.lf | 11 ++ test/RustRti/src/lib/Test.lf | 15 +++ test/RustRti/src/lib/TestCount.lf | 34 +++++ test/RustRti/src/lib/TestCountMultiport.lf | 41 ++++++ 52 files changed, 2216 insertions(+), 11 deletions(-) create mode 100644 test/RustRti/src/federated/Absent.lf create mode 100644 test/RustRti/src/federated/BroadcastFeedback.lf create mode 100644 test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf create mode 100644 test/RustRti/src/federated/ChainWithDelay.lf create mode 100644 test/RustRti/src/federated/DistributedBank.lf create mode 100644 test/RustRti/src/federated/DistributedBankToMultiport.lf create mode 100644 test/RustRti/src/federated/DistributedCount.lf create mode 100644 test/RustRti/src/federated/DistributedDoublePort.lf create mode 100644 test/RustRti/src/federated/DistributedInterleaved.lf create mode 100644 test/RustRti/src/federated/DistributedLoopedAction.lf create mode 100644 test/RustRti/src/federated/DistributedLoopedPhysicalAction.lf create mode 100644 test/RustRti/src/federated/DistributedMultiport.lf create mode 100644 test/RustRti/src/federated/DistributedMultiportToBank.lf create mode 100644 test/RustRti/src/federated/DistributedMultiportToken.lf create mode 100644 test/RustRti/src/federated/DistributedNetworkOrder.lf create mode 100644 test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf create mode 100644 test/RustRti/src/federated/DistributedPhysicalActionUpstreamLong.lf create mode 100644 test/RustRti/src/federated/DistributedStop.lf create mode 100644 test/RustRti/src/federated/DistributedStopZero.lf create mode 100644 test/RustRti/src/federated/EnclaveFederatedRequestStop.lf create mode 100644 test/RustRti/src/federated/FederatedFilePkgReader.lf create mode 100644 test/RustRti/src/federated/FederatedFileReader.lf create mode 100644 test/RustRti/src/federated/FeedbackDelay.lf create mode 100644 test/RustRti/src/federated/FeedbackDelay3.lf create mode 100644 test/RustRti/src/federated/FeedbackDelay5.lf create mode 100644 test/RustRti/src/federated/FeedbackDelaySimple.lf create mode 100644 test/RustRti/src/federated/InheritanceFederated.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralized.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralized2.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf create mode 100644 test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf create mode 100644 test/RustRti/src/federated/ParallelDestinations.lf create mode 100644 test/RustRti/src/federated/ParallelSources.lf create mode 100644 test/RustRti/src/federated/ParallelSourcesMultiport.lf create mode 100644 test/RustRti/src/federated/SpuriousDependency.lf create mode 100644 test/RustRti/src/federated/TopLevelArtifacts.lf create mode 100644 test/RustRti/src/lib/Count.lf create mode 100644 test/RustRti/src/lib/FileLevelPreamble.lf create mode 100644 test/RustRti/src/lib/FileReader.txt create mode 100644 test/RustRti/src/lib/GenDelay.lf create mode 100644 test/RustRti/src/lib/Imported.lf create mode 100644 test/RustRti/src/lib/ImportedAgain.lf create mode 100644 test/RustRti/src/lib/ImportedComposition.lf create mode 100644 test/RustRti/src/lib/InternalDelay.lf create mode 100644 test/RustRti/src/lib/LoopedActionSender.lf create mode 100644 test/RustRti/src/lib/PassThrough.lf create mode 100644 test/RustRti/src/lib/Test.lf create mode 100644 test/RustRti/src/lib/TestCount.lf create mode 100644 test/RustRti/src/lib/TestCountMultiport.lf diff --git a/.github/workflows/c-tests-with-rust-rti.yml b/.github/workflows/c-tests-with-rust-rti.yml index 00a73d5eff..bf096a3efe 100644 --- a/.github/workflows/c-tests-with-rust-rti.yml +++ b/.github/workflows/c-tests-with-rust-rti.yml @@ -14,7 +14,7 @@ jobs: matrix: platform: ${{ (inputs.all-platforms && fromJSON('["ubuntu-latest"]')) || fromJSON('["ubuntu-latest"]') }} runs-on: ${{ matrix.platform }} - timeout-minutes: 10 + timeout-minutes: 20 steps: - name: Check out lingua-franca repository diff --git a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java index 757cb4487a..175f610826 100644 --- a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java +++ b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java @@ -319,7 +319,7 @@ public void doGenerateForRustRTI(List federates, RtiConfig rti // Launch the RTI in the foreground. if (host.equals("localhost") || host.equals("0.0.0.0")) { // FIXME: the paths below will not work on Windows - shCode.append(getLaunchCodeForRustRti()).append("\n"); + shCode.append(getLaunchCodeForRustRti(Integer.toString(federates.size()))).append("\n"); } else { // Start the RTI on the remote machine - Not supported yet for Rust RTI. } @@ -329,10 +329,7 @@ public void doGenerateForRustRTI(List federates, RtiConfig rti for (FederateInstance federate : federates) { var buildConfig = getBuildConfig(federate, fileConfig, messageReporter); if (federate.isRemote) { - Path fedRelSrcGenPath = - fileConfig.getOutPath().relativize(fileConfig.getSrcGenPath()).resolve(federate.name); - if (distCode.isEmpty()) distCode.append(distHeader).append("\n"); - String logFileName = String.format("log/%s_%s.log", fileConfig.name, federate.name); + if (distCode.isEmpty()) distCode.append(distHeader).append("\n"); distCode.append(getDistCode(rtiConfig.getDirectory(), federate)).append("\n"); shCode .append(getFedRemoteLaunchCode(rtiConfig.getDirectory(), federate, federateIndex++)) @@ -543,7 +540,8 @@ private String getLaunchCode(String rtiLaunchCode) { "sleep 1"); } - private String getLaunchCodeForRustRti() { + private String getLaunchCodeForRustRti(String numberOfFederates) { + String launchCodeWithoutLogging = new String("cargo run -- -i ${FEDERATION_ID} -n "+ numberOfFederates + " -c init &"); return String.join( "\n", "echo \"#### Launching the Rust runtime infrastructure (RTI).\"", @@ -561,10 +559,7 @@ private String getLaunchCodeForRustRti() { " FIRST_RUST_RTI_PATH=${FIRST_RUST_RTI_REMOTE_PATH[0]%/*}", " cd ${FIRST_RUST_RTI_PATH}; cd ../", "fi", - "cargo run -- -i ${FEDERATION_ID} \\", - "-n 2 \\", - "-c init \\", - "&", + launchCodeWithoutLogging, "# Store the PID of the RTI", "RTI=$!", "# Wait for the RTI to boot up before", diff --git a/test/RustRti/src/federated/Absent.lf b/test/RustRti/src/federated/Absent.lf new file mode 100644 index 0000000000..7130210cf3 --- /dev/null +++ b/test/RustRti/src/federated/Absent.lf @@ -0,0 +1,46 @@ +target C { + tracing: true, + timeout: 100 ms +} + +reactor Sender { + output out1: int + output out2: int + timer t(0, 20 ms) + state c: int = 1 + + reaction(t) -> out1, out2 {= + if (self->c % 2 != 0) { + lf_set(out1, self->c); + } else { + lf_set(out2, self->c); + } + self->c++; + =} +} + +reactor Receiver { + input in1: int + input in2: int + + reaction(in1) {= + lf_print("Received %d on in1", in1->value); + if (in1->value % 2 == 0) { + lf_print_error_and_exit("********* Expected an odd integer!"); + } + =} + + reaction(in2) {= + lf_print("Received %d on in2", in2->value); + if (in2->value % 2 != 0) { + lf_print_error_and_exit("********* Expected an even integer!"); + } + =} +} + +federated reactor(d: time = 1 ms) { + s = new Sender() + r = new Receiver() + s.out1 -> r.in1 + s.out2 -> r.in2 +} diff --git a/test/RustRti/src/federated/BroadcastFeedback.lf b/test/RustRti/src/federated/BroadcastFeedback.lf new file mode 100644 index 0000000000..66a93c275b --- /dev/null +++ b/test/RustRti/src/federated/BroadcastFeedback.lf @@ -0,0 +1,33 @@ +/** This tests an output that is broadcast back to a multiport input of a bank. */ +target C { + timeout: 1 sec, + build-type: RelWithDebInfo +} + +reactor SenderAndReceiver { + output out: int + input[2] in: int + state received: bool = false + + reaction(startup) -> out {= + lf_set(out, 42); + =} + + reaction(in) {= + if (in[0]->is_present && in[1]->is_present && in[0]->value == 42 && in[1]->value == 42) { + lf_print("SUCCESS"); + self->received = true; + } + =} + + reaction(shutdown) {= + if (!self->received == true) { + lf_print_error_and_exit("Failed to receive broadcast"); + } + =} +} + +federated reactor { + s = new[2] SenderAndReceiver() + (s.out)+ -> s.in +} diff --git a/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf b/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf new file mode 100644 index 0000000000..114e42cfd7 --- /dev/null +++ b/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf @@ -0,0 +1,40 @@ +/** This tests an output that is broadcast back to a multiport input of a bank. */ +target C { + timeout: 1 sec +} + +reactor SenderAndReceiver { + output out: int + input[2] in: int + state received: bool = false + + r = new Receiver() + in -> r.in + + reaction(startup) -> out {= + lf_set(out, 42); + =} +} + +reactor Receiver { + input[2] in: int + state received: bool = false + + reaction(in) {= + if (in[0]->is_present && in[1]->is_present && in[0]->value == 42 && in[1]->value == 42) { + lf_print("SUCCESS"); + self->received = true; + } + =} + + reaction(shutdown) {= + if (!self->received == true) { + lf_print_error_and_exit("Failed to receive broadcast"); + } + =} +} + +federated reactor { + s = new[2] SenderAndReceiver() + (s.out)+ -> s.in +} diff --git a/test/RustRti/src/federated/ChainWithDelay.lf b/test/RustRti/src/federated/ChainWithDelay.lf new file mode 100644 index 0000000000..dea606bf51 --- /dev/null +++ b/test/RustRti/src/federated/ChainWithDelay.lf @@ -0,0 +1,20 @@ +/** + * Demonstration that monotonic NET hypothesis is invalid. + * + * @author Edward A. Lee + */ +target C { + timeout: 3 msec +} + +import Count from "../lib/Count.lf" +import InternalDelay from "../lib/InternalDelay.lf" +import TestCount from "../lib/TestCount.lf" + +federated reactor { + c = new Count(period = 1 msec) + i = new InternalDelay(delay = 500 usec) + t = new TestCount(num_inputs=3) + c.out -> i.in + i.out -> t.in +} diff --git a/test/RustRti/src/federated/DistributedBank.lf b/test/RustRti/src/federated/DistributedBank.lf new file mode 100644 index 0000000000..65a6f871c2 --- /dev/null +++ b/test/RustRti/src/federated/DistributedBank.lf @@ -0,0 +1,24 @@ +// Check bank of federates. +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Node(bank_index: int = 0) { + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) {= + lf_print("Hello world %d.", self->count++); + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("Timer reactions did not execute."); + } + =} +} + +federated reactor DistributedBank { + n = new[2] Node() +} diff --git a/test/RustRti/src/federated/DistributedBankToMultiport.lf b/test/RustRti/src/federated/DistributedBankToMultiport.lf new file mode 100644 index 0000000000..d73b0959fd --- /dev/null +++ b/test/RustRti/src/federated/DistributedBankToMultiport.lf @@ -0,0 +1,33 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 3 sec +} + +import Count from "../lib/Count.lf" + +reactor Destination { + input[2] in: int + state count: int = 1 + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + lf_print("Received %d.", in[i]->value); + if (self->count != in[i]->value) { + lf_print_error_and_exit("Expected %d.", self->count); + } + } + self->count++; + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor { + s = new[2] Count() + d = new Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedCount.lf b/test/RustRti/src/federated/DistributedCount.lf new file mode 100644 index 0000000000..fb1c86904e --- /dev/null +++ b/test/RustRti/src/federated/DistributedCount.lf @@ -0,0 +1,41 @@ +/** + * Test a particularly simple form of a distributed deterministic system where a federation that + * receives timestamped messages has only those messages as triggers. Therefore, no additional + * coordination of the advancement of time (HLA or Ptides) is needed. + * @author Edward A. Lee + */ +target C { + timeout: 5 sec, + coordination: centralized +} + +import Count from "../lib/Count.lf" + +reactor Print { + input in: int + state c: int = 1 + + reaction(in) {= + interval_t elapsed_time = lf_time_logical_elapsed(); + lf_print("At time " PRINTF_TIME ", received %d", elapsed_time, in->value); + if (in->value != self->c) { + lf_print_error_and_exit("Expected to receive %d.", self->c); + } + if (elapsed_time != MSEC(200) + SEC(1) * (self->c - 1) ) { + lf_print_error_and_exit("Expected received time to be " PRINTF_TIME ".", MSEC(200) * self->c); + } + self->c++; + =} + + reaction(shutdown) {= + if (self->c != 6) { + lf_print_error_and_exit("Expected to receive 5 items."); + } + =} +} + +federated reactor DistributedCount(offset: time = 200 msec) { + c = new Count() + p = new Print() + c.out -> p.in after offset +} diff --git a/test/RustRti/src/federated/DistributedDoublePort.lf b/test/RustRti/src/federated/DistributedDoublePort.lf new file mode 100644 index 0000000000..ec0a6d0b1d --- /dev/null +++ b/test/RustRti/src/federated/DistributedDoublePort.lf @@ -0,0 +1,52 @@ +/** + * Test the case for when two upstream federates send messages to a downstream federate on two + * different ports. One message should carry a microstep delay relative to the other message. + * + * @author Soroush Bateni + */ +target C { + timeout: 900 msec, + coordination: centralized +} + +import Count from "../lib/Count.lf" + +reactor CountMicrostep { + state count: int = 1 + output out: int + logical action act: int + timer t(0, 1 sec) + + reaction(t) -> act {= + lf_schedule_int(act, 0, self->count++); + =} + + reaction(act) -> out {= + lf_set(out, act->value); + =} +} + +reactor Print { + input in: int + input in2: int + + reaction(in, in2) {= + interval_t elapsed_time = lf_time_logical_elapsed(); + lf_print("At tag " PRINTF_TAG ", received in = %d and in2 = %d.", elapsed_time, lf_tag().microstep, in->value, in2->value); + if (in->is_present && in2->is_present) { + lf_print_error_and_exit("ERROR: invalid logical simultaneity."); + } + =} + + reaction(shutdown) {= + lf_print("SUCCESS: messages were at least one microstep apart."); + =} +} + +federated reactor DistributedDoublePort { + c = new Count() + cm = new CountMicrostep() + p = new Print() + c.out -> p.in // Indicating a 'logical' connection. + cm.out -> p.in2 +} diff --git a/test/RustRti/src/federated/DistributedInterleaved.lf b/test/RustRti/src/federated/DistributedInterleaved.lf new file mode 100644 index 0000000000..dc212daf17 --- /dev/null +++ b/test/RustRti/src/federated/DistributedInterleaved.lf @@ -0,0 +1,44 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 3 sec +} + +reactor Count(offset: time = 0, period: time = 1 sec) { + state count: int = 1 + output[4] out: int + timer t(offset, period) + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count++); + } + =} +} + +reactor Destination { + input[2] in: int + state count: int = 0 + + reaction(in) {= + lf_print("Received %d.", in[0]->value); + lf_print("Received %d.", in[1]->value); + // Because the connection is interleaved, the difference between the + // two inputs should be 2, not 1. + if (in[1]->value - in[0]->value != 2) { + lf_print_error_and_exit("Expected a difference of two."); + } + self->count++; + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor { + s = new Count() + d = new[2] Destination() + s.out -> interleaved(d.in) +} diff --git a/test/RustRti/src/federated/DistributedLoopedAction.lf b/test/RustRti/src/federated/DistributedLoopedAction.lf new file mode 100644 index 0000000000..88418f84d1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedLoopedAction.lf @@ -0,0 +1,62 @@ +/** + * Test a sender-receiver network system that relies on microsteps being taken into account. + * + * @author Soroush Bateni + */ +target C { + logging: LOG, + timeout: 1 sec +} + +import Sender from "../lib/LoopedActionSender.lf" + +reactor Receiver(take_a_break_after: int = 10, break_interval: time = 400 msec) { + input in: int + state received_messages: int = 0 + state total_received_messages: int = 0 + state breaks: int = 0 + timer t(0, 10 msec) // This will impact the performance + + // but forces the logical time to advance Comment this line for a more sensible log output. + reaction(in) {= + lf_print("At tag " PRINTF_TAG " received value %d.", + lf_time_logical_elapsed(), + lf_tag().microstep, + in->value); + self->total_received_messages++; + if (in->value != self->received_messages++) { + lf_print_error("Expected %d", self->received_messages - 1); + // exit(1); + } + if (lf_time_logical_elapsed() != self->breaks * self->break_interval) { + lf_print_error("Received messages at an incorrect time: " PRINTF_TIME, lf_time_logical_elapsed()); + // exit(2); + } + + if (self->received_messages == self->take_a_break_after) { + // Sender is taking a break; + self->breaks++; + self->received_messages = 0; + } + =} + + reaction(t) {= + // Do nothing + =} + + reaction(shutdown) {= + if (self->breaks != 3 || + (self->total_received_messages != ((SEC(1)/self->break_interval)+1) * self->take_a_break_after) + ) { + lf_print_error_and_exit("Did not receive enough messages."); + } + printf("SUCCESS: Successfully received all messages from the sender.\n"); + =} +} + +federated reactor DistributedLoopedAction { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedLoopedPhysicalAction.lf b/test/RustRti/src/federated/DistributedLoopedPhysicalAction.lf new file mode 100644 index 0000000000..b30ce210e3 --- /dev/null +++ b/test/RustRti/src/federated/DistributedLoopedPhysicalAction.lf @@ -0,0 +1,84 @@ +/** + * Test a sender-receiver network system that is similar to DistributedLoopedAction, but it uses a + * physical action rather than a logical action. This also demonstrates the advance-message-interval + * coordination option. This specifies the time period between Time Advance Notice (TAN) messages + * sent to the RTI (a form of null message that must be sent because of the physical action). The + * presence of this option also silences a warning about having a physical action that triggers an + * output. + * + * @author Soroush Bateni + */ +target C { + timeout: 1 sec, + tracing: true, + // Silences warning. + coordination-options: { + advance-message-interval: 10 msec + } +} + +reactor Sender(take_a_break_after: int = 10, break_interval: time = 550 msec) { + output out: int + physical action act + state sent_messages: int = 0 + + reaction(startup, act) -> act, out {= + // Send a message on out + lf_set(out, self->sent_messages); + self->sent_messages++; + if (self->sent_messages < self->take_a_break_after) { + lf_schedule(act, 0); + } else { + // Take a break + self->sent_messages = 0; + lf_schedule(act, self->break_interval); + } + =} +} + +reactor Receiver(take_a_break_after: int = 10, break_interval: time = 550 msec) { + input in: int + state received_messages: int = 0 + state total_received_messages: int = 0 + state breaks: int = 0 + timer t(0, 10 msec) // This will impact the performance + + // but forces the logical time to advance Comment this line for a more sensible log output. + reaction(in) {= + tag_t current_tag = lf_tag(); + lf_print("At tag " PRINTF_TAG " received %d.", + current_tag.time - lf_time_start(), + current_tag.microstep, + in->value); + self->total_received_messages++; + if (in->value != self->received_messages++) { + lf_print_error_and_exit("Expected %d.", self->received_messages - 1); + } + + if (self->received_messages == self->take_a_break_after) { + // Sender is taking a break; + self->breaks++; + self->received_messages = 0; + } + =} + + reaction(t) {= + // Do nothing + =} + + reaction(shutdown) {= + if (self->breaks < 2 || + (self->total_received_messages < ((SEC(1)/self->break_interval)+1) * self->take_a_break_after) + ) { + lf_print_error_and_exit("Test failed. Breaks: %d, Messages: %d.", self->breaks, self->total_received_messages); + } + lf_print("SUCCESS: Successfully received all messages from the sender."); + =} +} + +federated reactor DistributedLoopedPhysicalAction { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedMultiport.lf b/test/RustRti/src/federated/DistributedMultiport.lf new file mode 100644 index 0000000000..44a04c4654 --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiport.lf @@ -0,0 +1,48 @@ +// Check multiport connections between federates. +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Source(width: int = 2) { + output[width] out: int + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count++); + } + =} +} + +reactor Destination(width: int = 3) { + input[width] in: int + state count: int = 0 + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (in[i]->is_present) { + tag_t now = lf_tag(); + lf_print("Received %d at channel %d at tag " PRINTF_TAG, in[i]->value, i, + now.time - lf_time_start(), now.microstep + ); + if (in[i]->value != self->count++) { + lf_print_error_and_exit("Expected %d.", self->count - 1); + } + } + } + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor DistributedMultiport { + s = new Source(width=4) + d = new Destination(width=4) + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedMultiportToBank.lf b/test/RustRti/src/federated/DistributedMultiportToBank.lf new file mode 100644 index 0000000000..d8171de51e --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiportToBank.lf @@ -0,0 +1,41 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 1 sec +} + +reactor Source { + output[2] out: int + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count); + } + self->count++; + =} +} + +reactor Destination { + input in: int + state count: int = 0 + + reaction(in) {= + lf_print("Received %d.", in->value); + if (self->count++ != in->value) { + lf_print_error_and_exit("Expected %d.", self->count - 1); + } + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor DistributedMultiportToBank { + s = new Source() + d = new[2] Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedMultiportToken.lf b/test/RustRti/src/federated/DistributedMultiportToken.lf new file mode 100644 index 0000000000..547fe651d9 --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiportToken.lf @@ -0,0 +1,46 @@ +// Check multiport connections between federates where the message is carried by a Token (in this +// case, with an array of char). +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Source { + output[4] out: char* + timer t(0, 200 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + // With NULL, 0 arguments, snprintf tells us how many bytes are needed. + // Add one for the null terminator. + int length = snprintf(NULL, 0, "Hello %d", self->count) + 1; + // Dynamically allocate memory for the output. + SET_NEW_ARRAY(out[i], length); + // Populate the output string and increment the count. + snprintf(out[i]->value, length, "Hello %d", self->count++); + lf_print("MessageGenerator: At time " PRINTF_TIME ", send message: %s.", + lf_time_logical_elapsed(), + out[i]->value + ); + } + =} +} + +reactor Destination { + input[4] in: char* + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (in[i]->is_present) { + lf_print("Received %s.", in[i]->value); + } + } + =} +} + +federated reactor DistributedMultiportToken { + s = new Source() + d = new Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedNetworkOrder.lf b/test/RustRti/src/federated/DistributedNetworkOrder.lf new file mode 100644 index 0000000000..b1413c11b1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedNetworkOrder.lf @@ -0,0 +1,75 @@ +/** + * This is a test for send_timed_message, which is an internal API. + * + * This test sends a second message at time 5 msec that has the same intended tag as a message that + * it had previously sent at time 0 msec. This results in a warning, but the message microstep is + * incremented and correctly received one microstep later. + * + * @author Soroush Bateni + */ +target C { + timeout: 1 sec, + build-type: RelWithDebInfo // Release with debug info +} + +preamble {= + #ifdef __cplusplus + extern "C" { + #endif + #include "federate.h" + #ifdef __cplusplus + } + #endif +=} + +reactor Sender { + output out: int + timer t(0, 1 msec) + + reaction(t) -> out {= + int payload = 1; + if (lf_time_logical_elapsed() == 0LL) { + lf_send_tagged_message(self->base.environment, MSEC(10), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int), + (unsigned char*)&payload); + } else if (lf_time_logical_elapsed() == MSEC(5)) { + payload = 2; + lf_send_tagged_message(self->base.environment, MSEC(5), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int), + (unsigned char*)&payload); + } + =} +} + +reactor Receiver { + input in: int + state success: int = 0 + + reaction(in) {= + tag_t current_tag = lf_tag(); + if (current_tag.time == (lf_time_start() + MSEC(10))) { + if (current_tag.microstep == 0 && in->value == 1) { + self->success++; + } else if (current_tag.microstep == 1 && in->value == 2) { + self->success++; + } + } + printf("Received %d at tag " PRINTF_TAG ".\n", + in->value, + lf_time_logical_elapsed(), + lf_tag().microstep); + =} + + reaction(shutdown) {= + if (self->success != 2) { + fprintf(stderr, "ERROR: Failed to receive messages.\n"); + exit(1); + } + printf("SUCCESS.\n"); + =} +} + +federated reactor DistributedNetworkOrder { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf b/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf new file mode 100644 index 0000000000..3a85c9b3d1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf @@ -0,0 +1,60 @@ +/** + * Test that a rapidly produced physical action in an upstream federate can be properly handled in + * federated execution. + */ +target C { + timeout: 10 secs, + coordination-options: { + advance-message-interval: 30 msec + } +} + +import PassThrough from "../lib/PassThrough.lf" +import TestCount from "../lib/TestCount.lf" + +preamble {= + extern int _counter; + void callback(void *a); + void* take_time(void* a); +=} + +reactor WithPhysicalAction { + preamble {= + int _counter = 1; + void callback(void *a) { + lf_schedule_int(a, 0, _counter++); + } + // Simulate time passing before a callback occurs. + void* take_time(void* a) { + while (_counter < 15) { + instant_t sleep_time = MSEC(10); + lf_sleep(sleep_time); + callback(a); + } + return NULL; + } + =} + + output out: int + state thread_id: lf_thread_t = 0 + physical action act(0): int + + reaction(startup) -> act {= + // start new thread, provide callback + lf_thread_create(&self->thread_id, &take_time, act); + =} + + reaction(act) -> out {= + lf_set(out, act->value); + =} +} + +federated reactor { + a = new WithPhysicalAction() + m1 = new PassThrough() + m2 = new PassThrough() + test = new TestCount(num_inputs=14) + a.out -> m1.in + m1.out -> m2.in + m2.out -> test.in +} diff --git a/test/RustRti/src/federated/DistributedPhysicalActionUpstreamLong.lf b/test/RustRti/src/federated/DistributedPhysicalActionUpstreamLong.lf new file mode 100644 index 0000000000..99154b1ad2 --- /dev/null +++ b/test/RustRti/src/federated/DistributedPhysicalActionUpstreamLong.lf @@ -0,0 +1,88 @@ +/** + * Test that a rapidly produced physical action in an upstream federate can be properly handled in a + * long chain of federates. + */ +target C { + timeout: 1 sec, + coordination-options: { + advance-message-interval: 500 usec + } +} + +import PassThrough from "../lib/PassThrough.lf" +import TestCount from "../lib/TestCount.lf" + +preamble {= + extern int _counter; + void callback(void *a); + void* take_time(void* a); +=} + +reactor WithPhysicalAction { + preamble {= + int _counter = 1; + void callback(void *a) { + lf_schedule_int(a, 0, _counter++); + } + // Simulate time passing before a callback occurs. + void* take_time(void* a) { + while (_counter < 20) { + instant_t sleep_time = USEC(50); + lf_sleep(sleep_time); + callback(a); + } + return NULL; + } + =} + output out: int + state thread_id: lf_thread_t = 0 + physical action act(0): int + + reaction(startup) -> act {= + // start new thread, provide callback + lf_thread_create(&self->thread_id, &take_time, act); + =} + + reaction(act) -> out {= + lf_set(out, act->value); + =} +} + +federated reactor { + a = new WithPhysicalAction() + test = new TestCount(num_inputs=19) + + passThroughs1 = new PassThrough() + passThroughs2 = new PassThrough() + passThroughs3 = new PassThrough() + passThroughs4 = new PassThrough() + passThroughs5 = new PassThrough() + passThroughs6 = new PassThrough() + passThroughs7 = new PassThrough() + passThroughs8 = new PassThrough() + passThroughs9 = new PassThrough() + passThroughs10 = new PassThrough() + + a.out, + passThroughs1.out, + passThroughs2.out, + passThroughs3.out, + passThroughs4.out, + passThroughs5.out, + passThroughs6.out, + passThroughs7.out, + passThroughs8.out, + passThroughs9.out, + passThroughs10.out -> + passThroughs1.in, + passThroughs2.in, + passThroughs3.in, + passThroughs4.in, + passThroughs5.in, + passThroughs6.in, + passThroughs7.in, + passThroughs8.in, + passThroughs9.in, + passThroughs10.in, + test.in +} diff --git a/test/RustRti/src/federated/DistributedStop.lf b/test/RustRti/src/federated/DistributedStop.lf new file mode 100644 index 0000000000..6e8796d90b --- /dev/null +++ b/test/RustRti/src/federated/DistributedStop.lf @@ -0,0 +1,118 @@ +/** + * Test for lf_request_stop() in federated execution with centralized coordination. + * + * @author Soroush Bateni + */ +target C + +reactor Sender { + output out: int + timer t(0, 1 usec) + logical action act + state reaction_invoked_correctly: bool = false + + reaction(t, act) -> out, act {= + lf_print("Sending 42 at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_set(out, 42); + if (lf_tag().microstep == 0) { + // Instead of having a separate reaction + // for 'act' like Stop.lf, we trigger the + // same reaction to test lf_request_stop() being + // called multiple times + lf_schedule(act, 0); + } + if (lf_time_logical_elapsed() == USEC(1)) { + // Call lf_request_stop() both at (1 usec, 0) and + // (1 usec, 1) + lf_print("Requesting stop at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + } + + tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u }; + if (lf_tag_compare(lf_tag(), _1usec1) == 0) { + // The reaction was invoked at (1 usec, 1) as expected + self->reaction_invoked_correctly = true; + } else if (lf_tag_compare(lf_tag(), _1usec1) > 0) { + // The reaction should not have been invoked at tags larger than (1 usec, 1) + lf_print_error_and_exit("ERROR: Invoked reaction(t, act) at tag bigger than shutdown."); + } + =} + + reaction(shutdown) {= + if (lf_time_logical_elapsed() != USEC(1) || + lf_tag().microstep != 1) { + lf_print_error_and_exit("ERROR: Sender failed to stop the federation in time. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } else if (self->reaction_invoked_correctly == false) { + lf_print_error_and_exit("ERROR: Sender reaction(t, act) was not invoked at (1 usec, 1). " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } + lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +reactor Receiver( + // Used in the decentralized variant of the test + stp_offset: time = 10 msec) { + input in: int + state reaction_invoked_correctly: bool = false + + reaction(in) {= + lf_print("Received %d at " PRINTF_TAG ".", + in->value, + lf_time_logical_elapsed(), + lf_tag().microstep); + if (lf_time_logical_elapsed() == USEC(1)) { + lf_print("Requesting stop at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + // The receiver should receive a message at tag + // (1 usec, 1) and trigger this reaction + self->reaction_invoked_correctly = true; + } + + tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u }; + if (lf_tag_compare(lf_tag(), _1usec1) > 0) { + self->reaction_invoked_correctly = false; + } + =} + + reaction(shutdown) {= + // Sender should have requested stop earlier than the receiver. + // Therefore, the shutdown events must occur at (1000, 0) on the + // receiver. + if (lf_time_logical_elapsed() != USEC(1) || + lf_tag().microstep != 1) { + lf_print_error_and_exit("Receiver failed to stop the federation at the right time. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } else if (self->reaction_invoked_correctly == false) { + lf_print_error_and_exit("Receiver reaction(in) was not invoked the correct number of times. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } + lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +federated reactor DistributedStop { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedStopZero.lf b/test/RustRti/src/federated/DistributedStopZero.lf new file mode 100644 index 0000000000..876bd6f7f4 --- /dev/null +++ b/test/RustRti/src/federated/DistributedStopZero.lf @@ -0,0 +1,84 @@ +/** + * Test for lf_request_stop() in federated execution with centralized coordination at tag (0,0). + * + * @author Soroush Bateni + */ +target C + +reactor Sender { + output out: int + timer t(0, 1 usec) + + reaction(t) -> out {= + printf("Sending 42 at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_set(out, 42); + + tag_t zero = (tag_t) { .time = lf_time_start(), .microstep = 0u }; + if (lf_tag_compare(lf_tag(), zero) == 0) { + // Request stop at (0,0) + printf("Requesting stop at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + } + =} + + reaction(shutdown) {= + if (lf_time_logical_elapsed() != USEC(0) || + lf_tag().microstep != 1) { + fprintf(stderr, "ERROR: Sender failed to stop the federation in time. " + "Stopping at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + exit(1); + } + printf("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +reactor Receiver { + input in: int + + reaction(in) {= + printf("Received %d at " PRINTF_TAG ".\n", + in->value, + lf_time_logical_elapsed(), + lf_tag().microstep); + tag_t zero = (tag_t) { .time = lf_time_start(), .microstep = 0u }; + if (lf_tag_compare(lf_tag(), zero) == 0) { + // Request stop at (0,0) + printf("Requesting stop at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + } + =} + + reaction(shutdown) {= + // Sender should have requested stop earlier than the receiver. + // Therefore, the shutdown events must occur at (0, 0) on the + // receiver. + if (lf_time_logical_elapsed() != USEC(0) || + lf_tag().microstep != 1) { + fprintf(stderr, "ERROR: Receiver failed to stop the federation in time. " + "Stopping at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + exit(1); + } + printf("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +federated reactor { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf b/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf new file mode 100644 index 0000000000..0bed9e03d1 --- /dev/null +++ b/test/RustRti/src/federated/EnclaveFederatedRequestStop.lf @@ -0,0 +1,39 @@ +/** + * Test that enclaves within federates all stop at the time requested by the first enclave to + * request a stop. Note that the test has no timeout because any finite timeout can, in theory, + * cause the test to fail. The first federate to request a stop does no at 50 ms, so the program + * should terminate quickly if all goes well. + */ +target C + +reactor Stop( + // Zero value here means "don't stop". + stop_time: time = 0) { + preamble {= + #include "platform.h" // Defines PRINTF_TIME + =} + timer t(stop_time) + + reaction(t) {= + if (self->stop_time > 0) lf_request_stop(); + =} + + reaction(shutdown) {= + lf_print("Stopped at tag (" PRINTF_TIME ", %d)", lf_time_logical_elapsed(), lf_tag().microstep); + if (lf_time_logical_elapsed() != 50000000LL || lf_tag().microstep != 1) { + lf_print_error_and_exit("Expected stop tag to be (50ms, 1)."); + } + =} +} + +reactor Fed(least_stop_time: time = 0) { + @enclave + s1 = new Stop() + @enclave + s2 = new Stop(stop_time=least_stop_time) +} + +federated reactor { + f1 = new Fed() + f2 = new Fed(least_stop_time = 50 ms) +} diff --git a/test/RustRti/src/federated/FederatedFilePkgReader.lf b/test/RustRti/src/federated/FederatedFilePkgReader.lf new file mode 100644 index 0000000000..cf79291acf --- /dev/null +++ b/test/RustRti/src/federated/FederatedFilePkgReader.lf @@ -0,0 +1,57 @@ +/** Test reading a file at a location relative to the source file. */ +target C { + timeout: 0 s +} + +reactor Source { + output out: char* // Use char*, not string, so memory is freed. + + reaction(startup) -> out {= + char* file_path = + LF_PACKAGE_DIRECTORY + LF_FILE_SEPARATOR "src" + LF_FILE_SEPARATOR "lib" + LF_FILE_SEPARATOR "FileReader.txt"; + + FILE* file = fopen(file_path, "rb"); + if (file == NULL) lf_print_error_and_exit("Error opening file at path %s.", file_path); + + // Determine the file size + fseek(file, 0, SEEK_END); + long file_size = ftell(file); + fseek(file, 0, SEEK_SET); + + // Allocate memory for the buffer + char* buffer = (char *) malloc(file_size + 1); + if (buffer == NULL) lf_print_error_and_exit("Out of memory."); + + // Read the file into the buffer + fread(buffer, file_size, 1, file); + buffer[file_size] = '\0'; + fclose(file); + + // For federated version, have to use lf_set_array so array size is know + // to the serializer. + lf_set_array(out, buffer, file_size + 1); + =} +} + +reactor Check { + preamble {= + #include + =} + input in: char* + + reaction(in) {= + printf("Received: %s\n", in->value); + if (strcmp("Hello World", in->value) != 0) { + lf_print_error_and_exit("Expected 'Hello World'"); + } + =} +} + +federated reactor { + s = new Source() + c = new Check() + s.out -> c.in +} diff --git a/test/RustRti/src/federated/FederatedFileReader.lf b/test/RustRti/src/federated/FederatedFileReader.lf new file mode 100644 index 0000000000..617d34c3c8 --- /dev/null +++ b/test/RustRti/src/federated/FederatedFileReader.lf @@ -0,0 +1,66 @@ +/** Test reading a file at a location relative to the source file. */ +target C { + logging: DEBUG, + timeout: 0 s +} + +reactor Source { + output out: char* // Use char*, not string, so memory is freed. + + reaction(startup) -> out {= + char* file_path = + LF_SOURCE_DIRECTORY + LF_FILE_SEPARATOR ".." + LF_FILE_SEPARATOR "lib" + LF_FILE_SEPARATOR "FileReader.txt"; + + FILE* file = fopen(file_path, "rb"); + if (file == NULL) lf_print_error_and_exit("Error opening file at path %s.", file_path); + + // Determine the file size + fseek(file, 0, SEEK_END); + long file_size = ftell(file); + fseek(file, 0, SEEK_SET); + + // Allocate memory for the buffer + char* buffer = (char *) malloc(file_size + 1); + if (buffer == NULL) lf_print_error_and_exit("Out of memory."); + + // Read the file into the buffer + fread(buffer, file_size, 1, file); + buffer[file_size] = '\0'; + fclose(file); + + // For federated version, have to use lf_set_array so array size is know + // to the serializer. + lf_set_array(out, buffer, file_size + 1); + =} +} + +reactor Check { + preamble {= + #include + =} + input in: char* + state received: bool = false + + reaction(in) {= + printf("Received: %s\n", in->value); + self->received = true; + if (strcmp("Hello World", in->value) != 0) { + lf_print_error_and_exit("Expected 'Hello World'"); + } + =} + + reaction(shutdown) {= + if (!self->received) { + lf_print_error_and_exit("No input received."); + } + =} +} + +federated reactor { + s = new Source() + c = new Check() + s.out -> c.in +} diff --git a/test/RustRti/src/federated/FeedbackDelay.lf b/test/RustRti/src/federated/FeedbackDelay.lf new file mode 100644 index 0000000000..88b15945b2 --- /dev/null +++ b/test/RustRti/src/federated/FeedbackDelay.lf @@ -0,0 +1,85 @@ +/** + * This test has two coupled cycles. In this variant, one is a zero-delay cycle (ZDC) and the other + * is not, having a microstep delay. In this variant, the microstep delay is on a connection + * entering the ZDC. + */ +target C { + timeout: 1 sec +} + +reactor PhysicalPlant { + input control: double + output sensor: double + timer t(0, 100 ms) + state last_sensor_time: time = 0 + state previous_sensor_time: time = 0 + state count: int = 0 + + reaction(t) -> sensor {= + lf_set(sensor, 42); + self->previous_sensor_time = self->last_sensor_time; + self->last_sensor_time = lf_time_physical(); + =} + + reaction(control) {= + self->count++; + lf_print("Control input: %f", control->value); + instant_t control_time = lf_time_physical(); + lf_print("Latency: " PRINTF_TIME ".", control_time - self->previous_sensor_time); + lf_print("Logical time: " PRINTF_TIME ".", lf_time_logical_elapsed()); + =} + + reaction(shutdown) {= + if (self->count != 10) { + lf_print_error_and_exit("Received only %d inputs.", self->count); + } + =} +} + +reactor Controller { + input sensor: double + output control: double + + state latest_control: double = 0.0 + state first: bool = true + + output request_for_planning: double + input planning: double + + reaction(planning) {= + self->latest_control = planning->value; + tag_t now = lf_tag(); + lf_print("Controller received planning value %f at tag " PRINTF_TAG, + self->latest_control, now.time - lf_time_start(), now.microstep + ); + =} + + reaction(sensor) -> control, request_for_planning {= + if (!self->first) { + lf_set(control, self->latest_control); + } + self->first = false; + lf_set(request_for_planning, sensor->value); + =} +} + +reactor Planner { + input request: double + output response: double + + reaction(request) -> response {= + lf_sleep(MSEC(10)); + lf_set(response, request->value); + =} +} + +federated reactor { + p = new PhysicalPlant() + c = new Controller() + pl = new Planner() + + p.sensor -> c.sensor + c.request_for_planning -> pl.request + pl.response -> c.planning after 0 + c.control -> p.control +} diff --git a/test/RustRti/src/federated/FeedbackDelay3.lf b/test/RustRti/src/federated/FeedbackDelay3.lf new file mode 100644 index 0000000000..e7d47d9340 --- /dev/null +++ b/test/RustRti/src/federated/FeedbackDelay3.lf @@ -0,0 +1,41 @@ +/** This test has two coupled cycles. In this variant, both are a zero-delay cycles (ZDC). */ +target C { + timeout: 1 sec, + tracing: true +} + +import PhysicalPlant, Planner from "FeedbackDelay.lf" + +reactor Controller { + input sensor: double + output control: double + + state latest_control: double = 0.0 + state first: bool = true + + output request_for_planning: double + input planning: double + + reaction(sensor) -> control, request_for_planning {= + if (!self->first) { + lf_set(control, self->latest_control); + } + self->first = false; + lf_set(request_for_planning, sensor->value); + =} + + reaction(planning) {= + self->latest_control = planning->value; + =} +} + +federated reactor { + p = new PhysicalPlant() + c = new Controller() + pl = new Planner() + + p.sensor -> c.sensor + c.request_for_planning -> pl.request + pl.response -> c.planning + c.control -> p.control +} diff --git a/test/RustRti/src/federated/FeedbackDelay5.lf b/test/RustRti/src/federated/FeedbackDelay5.lf new file mode 100644 index 0000000000..cd7edcd051 --- /dev/null +++ b/test/RustRti/src/federated/FeedbackDelay5.lf @@ -0,0 +1,57 @@ +/** + * This test has two coupled cycles. In this variant, both are zero-delay cycles (ZDC), but one of + * the cycles has two superposed cycles, one of which is zero delay and the other of which is not. + */ +target C { + timeout: 900 ms +} + +import PhysicalPlant from "FeedbackDelay.lf" + +reactor Controller { + input in: double + input sensor: double + output control: double + + state latest_control: double = 0.0 + + output request_for_planning: double + input planning: double + + reaction(in, planning) {= + self->latest_control = planning->value; + =} + + reaction(sensor) -> control, request_for_planning {= + lf_set(control, self->latest_control); + lf_set(request_for_planning, sensor->value); + =} +} + +reactor Planner { + input request: double + output response: double + output out: double + timer t(0, 100 ms) + + reaction(t) -> out {= + lf_set(out, 0); + =} + + reaction(request) -> response {= + lf_sleep(MSEC(10)); + lf_set(response, request->value); + =} +} + +federated reactor { + p = new PhysicalPlant() + c = new Controller() + pl = new Planner() + + p.sensor -> c.sensor + c.request_for_planning -> pl.request + pl.response -> c.planning after 0 + c.control -> p.control + pl.out -> c.in +} diff --git a/test/RustRti/src/federated/FeedbackDelaySimple.lf b/test/RustRti/src/federated/FeedbackDelaySimple.lf new file mode 100644 index 0000000000..655fbe0762 --- /dev/null +++ b/test/RustRti/src/federated/FeedbackDelaySimple.lf @@ -0,0 +1,41 @@ +target C { + timeout: 1 sec +} + +reactor Loop { + input in: int + output out: int + timer t(0, 100 msec) + state count: int = 1 + + reaction(in) {= + lf_print("Received %d.", in->value); + if (in->value != self->count) { + lf_print_error_and_exit( + "Expected %d. Got %d.", + self->count, + in->value + ); + } + self->count++; + =} + + reaction(t) -> out {= + lf_set(out, self->count); + =} + + reaction(shutdown) {= + if (self->count != 11) { + lf_print_error_and_exit( + "Expected 11 messages. Got %d.", + self->count + ); + } + =} +} + +federated reactor { + l = new Loop() + + l.out -> l.in after 0 +} diff --git a/test/RustRti/src/federated/InheritanceFederated.lf b/test/RustRti/src/federated/InheritanceFederated.lf new file mode 100644 index 0000000000..90098b29bb --- /dev/null +++ b/test/RustRti/src/federated/InheritanceFederated.lf @@ -0,0 +1,23 @@ +// Test for inheritance in a federated program. +// Compilation without errors is success. +// Based on https://github.com/lf-lang/lingua-franca/issues/1733. +target C { + timeout: 1 ms +} + +reactor A { + reaction(startup) {= + printf("Hello\n"); + =} +} + +reactor B { + a = new A() +} + +reactor C extends B { +} + +federated reactor { + c = new C() +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralized.lf b/test/RustRti/src/federated/LoopDistributedCentralized.lf new file mode 100644 index 0000000000..968ac2784e --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralized.lf @@ -0,0 +1,48 @@ +/** + * This tests a feedback loop with physical actions and centralized coordination. + * + * @author Edward A. Lee + */ +target C { + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 4 sec, + logging: DEBUG +} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + physical action a(delay) + state count: int = 0 + + timer t(0, 1 sec) + + reaction(t) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +federated reactor LoopDistributedCentralized(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralized2.lf b/test/RustRti/src/federated/LoopDistributedCentralized2.lf new file mode 100644 index 0000000000..25de5873e2 --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralized2.lf @@ -0,0 +1,75 @@ +/** + * This tests a feedback loop with physical actions and centralized coordination. + * + * @author Edward A. Lee + */ +target C { + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 4 sec +} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + physical action a(delay) + state count: int = 0 + + timer t(0, 1 sec) + + reaction(t) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +reactor Looper2(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + physical action a(delay) + state count: int = 0 + + timer t(0, 1 sec) + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(t) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper2(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf new file mode 100644 index 0000000000..ac783f07cc --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralizedPhysicalAction.lf @@ -0,0 +1,74 @@ +/** + * This tests a feedback loop with physical actions and centralized coordination. + * + * @author Edward A. Lee + */ +target C { + flags: "-Wall", + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 5 sec, + logging: warn +} + +preamble {= + #include // Defines sleep() + extern bool stop; + void* ping(void* actionref); +=} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + preamble {= + bool stop = false; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop) { + lf_print("Scheduling action."); + lf_schedule(actionref, 0); + sleep(1); + } + return NULL; + } + =} + input in: int + output out: int + physical action a(delay) + state count: int = 0 + + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + lf_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + + reaction(a) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf new file mode 100644 index 0000000000..51a10faac2 --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedence.lf @@ -0,0 +1,56 @@ +/** + * This tests that the precedence order of reaction invocation is kept when a feedback loop is + * present in centralized coordination. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ +target C { + flags: "-Wall", + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 5 sec +} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + state count: int = 0 + state received_count: int = 0 + timer t(0, 1 sec) + + reaction(t) -> out {= + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + self->received_count = self->count; + =} + + reaction(t) {= + if (self->received_count != self->count) { + lf_print_error_and_exit("reaction(t) was invoked before reaction(in). Precedence order was not kept."); + } + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 6 * self->incr) { + lf_print_error_and_exit("Failed to receive all six expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf new file mode 100644 index 0000000000..82adfca699 --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedCentralizedPrecedenceHierarchy.lf @@ -0,0 +1,73 @@ +/** + * This tests that the precedence order of reaction invocation is kept in the hierarchy of reactors + * when a feedback loop is present in centralized coordination. + * + * @author Edward A. Lee + * @author Soroush Bateni + */ +target C { + flags: "-Wall", + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 5 sec +} + +reactor Contained(incr: int = 1) { + timer t(0, 1 sec) + input in: int + state count: int = 0 + state received_count: int = 0 + + reaction(t) {= + self->count += self->incr; + =} + + reaction(in) {= + self->received_count = self->count; + =} + + reaction(t) {= + if (self->received_count != self->count) { + lf_print_error_and_exit("reaction(t) was invoked before reaction(in). Precedence order was not kept."); + } + =} +} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + input in: int + output out: int + state count: int = 0 + timer t(0, 1 sec) + + c = new Contained(incr=incr) + in -> c.in + + reaction(t) -> out {= + lf_print("Sending network output %d", self->count); + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 6 * self->incr) { + lf_print_error_and_exit("Failed to receive all six expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/ParallelDestinations.lf b/test/RustRti/src/federated/ParallelDestinations.lf new file mode 100644 index 0000000000..a4a4c026db --- /dev/null +++ b/test/RustRti/src/federated/ParallelDestinations.lf @@ -0,0 +1,23 @@ +/** Test parallel connections for federated execution. */ +target C { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Source { + output[2] out: int + c1 = new Count() + c2 = new Count() + + c1.out, c2.out -> out +} + +federated reactor { + s = new Source() + t1 = new TestCount(num_inputs=3) + t2 = new TestCount(num_inputs=3) + + s.out -> t1.in, t2.in +} diff --git a/test/RustRti/src/federated/ParallelSources.lf b/test/RustRti/src/federated/ParallelSources.lf new file mode 100644 index 0000000000..0bedc87d68 --- /dev/null +++ b/test/RustRti/src/federated/ParallelSources.lf @@ -0,0 +1,24 @@ +/** Test parallel connections for federated execution. */ +target C { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Destination { + input[2] in: int + + t1 = new TestCount(num_inputs=3) + t2 = new TestCount(num_inputs=3) + + in -> t1.in, t2.in +} + +federated reactor { + c1 = new Count() + c2 = new Count() + d = new Destination() + + c1.out, c2.out -> d.in +} diff --git a/test/RustRti/src/federated/ParallelSourcesMultiport.lf b/test/RustRti/src/federated/ParallelSourcesMultiport.lf new file mode 100644 index 0000000000..026c223463 --- /dev/null +++ b/test/RustRti/src/federated/ParallelSourcesMultiport.lf @@ -0,0 +1,34 @@ +/** Test parallel connections for federated execution. */ +target C { + timeout: 2 sec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Source { + output[2] out: int + c1 = new Count() + c2 = new Count() + + c1.out, c2.out -> out +} + +reactor Destination1 { + input[3] in: int + + t1 = new TestCount(num_inputs=3) + t2 = new TestCount(num_inputs=3) + t3 = new TestCount(num_inputs=3) + + in -> t1.in, t2.in, t3.in +} + +federated reactor { + s1 = new Source() + s2 = new Source() + d1 = new Destination1() + t4 = new TestCount(num_inputs=3) + + s1.out, s2.out -> d1.in, t4.in +} diff --git a/test/RustRti/src/federated/SpuriousDependency.lf b/test/RustRti/src/federated/SpuriousDependency.lf new file mode 100644 index 0000000000..b810d5288f --- /dev/null +++ b/test/RustRti/src/federated/SpuriousDependency.lf @@ -0,0 +1,63 @@ +/** + * This checks that a federated program does not deadlock when it is ambiguous, given the structure + * of a federate, whether it is permissible to require certain network sender/receiver reactions to + * precede others in the execution of a given tag. + */ +target C { + timeout: 1 sec +} + +reactor Passthrough(id: int = 0) { + input in: int + output out: int + + reaction(in) -> out {= + lf_print("Hello from passthrough %d", self->id); + lf_set(out, in->value); + =} +} + +reactor Twisty { + input in0: int + input in1: int + output out0: int + output out1: int + p0 = new Passthrough(id=0) + p1 = new Passthrough(id=1) + in0 -> p0.in + p0.out -> out0 + in1 -> p1.in + p1.out -> out1 +} + +reactor Check { + input in: int + + state count: int = 0 + + reaction(in) {= + lf_print("count is now %d", ++self->count); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + if (self->count != 1) { + lf_print_error_and_exit("Failed to receive expected input."); + } + =} +} + +federated reactor { + t0 = new Twisty() + t1 = new Twisty() + check = new Check() + t0.out1 -> t1.in0 + t1.out1 -> t0.in0 + state count: int = 0 + + t1.out0 -> check.in + + reaction(startup) -> t0.in1 {= + lf_set(t0.in1, 0); + =} +} diff --git a/test/RustRti/src/federated/TopLevelArtifacts.lf b/test/RustRti/src/federated/TopLevelArtifacts.lf new file mode 100644 index 0000000000..d73ea35967 --- /dev/null +++ b/test/RustRti/src/federated/TopLevelArtifacts.lf @@ -0,0 +1,44 @@ +/** + * Test whether top-level reactions, actions, and ports are handled appropriately. + * + * Currently, these artifacts are replicated on all federates. + * + * @note This just tests for the correctness of the code generation. These top-level artifacts might + * be disallowed in the future. + */ +target C { + timeout: 1 msec +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +federated reactor { + state successes: int = 0 + timer t(0, 1 sec) + logical action act(0) + + c = new Count() + tc = new TestCount() + c.out -> tc.in + + reaction(startup) {= + self->successes++; + =} + + reaction(t) -> act {= + self->successes++; + lf_schedule(act, 0); + =} + + reaction(act) {= + self->successes++; + =} + + reaction(shutdown) {= + if (self->successes != 3) { + lf_print_error_and_exit("Failed to properly execute top-level reactions"); + } + lf_print("SUCCESS!"); + =} +} diff --git a/test/RustRti/src/lib/Count.lf b/test/RustRti/src/lib/Count.lf new file mode 100644 index 0000000000..ee3953b021 --- /dev/null +++ b/test/RustRti/src/lib/Count.lf @@ -0,0 +1,11 @@ +target C + +reactor Count(offset: time = 0, period: time = 1 sec) { + state count: int = 1 + output out: int + timer t(offset, period) + + reaction(t) -> out {= + lf_set(out, self->count++); + =} +} diff --git a/test/RustRti/src/lib/FileLevelPreamble.lf b/test/RustRti/src/lib/FileLevelPreamble.lf new file mode 100644 index 0000000000..11067d5e63 --- /dev/null +++ b/test/RustRti/src/lib/FileLevelPreamble.lf @@ -0,0 +1,12 @@ +/** Test for ensuring that file-level preambles are inherited when a file is imported. */ +target C + +preamble {= + #define FOO 2 +=} + +reactor FileLevelPreamble { + reaction(startup) {= + printf("FOO: %d\n", FOO); + =} +} diff --git a/test/RustRti/src/lib/FileReader.txt b/test/RustRti/src/lib/FileReader.txt new file mode 100644 index 0000000000..5e1c309dae --- /dev/null +++ b/test/RustRti/src/lib/FileReader.txt @@ -0,0 +1 @@ +Hello World \ No newline at end of file diff --git a/test/RustRti/src/lib/GenDelay.lf b/test/RustRti/src/lib/GenDelay.lf new file mode 100644 index 0000000000..8f21c3de1b --- /dev/null +++ b/test/RustRti/src/lib/GenDelay.lf @@ -0,0 +1,21 @@ +target C + +preamble {= + typedef int message_t; +=} + +reactor Source { + output out: message_t + + reaction(startup) -> out {= + lf_set(out, 42); + =} +} + +reactor Sink { + input in: message_t + + reaction(in) {= + lf_print("Received %d at time %lld", in->value, lf_time_logical_elapsed()); + =} +} diff --git a/test/RustRti/src/lib/Imported.lf b/test/RustRti/src/lib/Imported.lf new file mode 100644 index 0000000000..85d0a2b493 --- /dev/null +++ b/test/RustRti/src/lib/Imported.lf @@ -0,0 +1,14 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +import ImportedAgain from "./ImportedAgain.lf" + +reactor Imported { + input x: int + a = new ImportedAgain() + + reaction(x) -> a.x {= + lf_set(a.x, x->value); + =} +} diff --git a/test/RustRti/src/lib/ImportedAgain.lf b/test/RustRti/src/lib/ImportedAgain.lf new file mode 100644 index 0000000000..6870526b95 --- /dev/null +++ b/test/RustRti/src/lib/ImportedAgain.lf @@ -0,0 +1,15 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +reactor ImportedAgain { + input x: int + + reaction(x) {= + printf("Received: %d.\n", x->value); + if (x->value != 42) { + printf("ERROR: Expected input to be 42. Got: %d.\n", x->value); + exit(1); + } + =} +} diff --git a/test/RustRti/src/lib/ImportedComposition.lf b/test/RustRti/src/lib/ImportedComposition.lf new file mode 100644 index 0000000000..e5524f3d22 --- /dev/null +++ b/test/RustRti/src/lib/ImportedComposition.lf @@ -0,0 +1,22 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +reactor Gain { + input x: int + output y: int + + reaction(x) -> y {= + lf_set(y, x->value * 2); + =} +} + +reactor ImportedComposition { + input x: int + output y: int + g1 = new Gain() + g2 = new Gain() + x -> g1.x after 10 msec + g1.y -> g2.x after 30 msec + g2.y -> y after 15 msec +} diff --git a/test/RustRti/src/lib/InternalDelay.lf b/test/RustRti/src/lib/InternalDelay.lf new file mode 100644 index 0000000000..fb7124a4ec --- /dev/null +++ b/test/RustRti/src/lib/InternalDelay.lf @@ -0,0 +1,15 @@ +target C + +reactor InternalDelay(delay: time = 10 msec) { + input in: int + output out: int + logical action d: int + + reaction(in) -> d {= + lf_schedule_int(d, self->delay, in->value); + =} + + reaction(d) -> out {= + lf_set(out, d->value); + =} +} diff --git a/test/RustRti/src/lib/LoopedActionSender.lf b/test/RustRti/src/lib/LoopedActionSender.lf new file mode 100644 index 0000000000..e9ea36f40a --- /dev/null +++ b/test/RustRti/src/lib/LoopedActionSender.lf @@ -0,0 +1,36 @@ +/** + * A sender reactor that outputs integers in superdense time. + * + * @author Soroush Bateni + */ +target C + +/** + * @param take_a_break_after: Indicates how many messages are sent in consecutive superdense time + * @param break_interval: Determines how long the reactor should take a break after sending + * take_a_break_after messages. + */ +reactor Sender(take_a_break_after: int = 10, break_interval: time = 400 msec) { + output out: int + logical action act + state sent_messages: int = 0 + + reaction(startup, act) -> act, out {= + // Send a message on out + /* printf("At tag (%lld, %u) sending value %d.\n", + lf_time_logical_elapsed(), + lf_tag().microstep, + self->sent_messages + ); */ + lf_set(out, self->sent_messages); + lf_print("Sender sent %d.", self->sent_messages); + self->sent_messages++; + if (self->sent_messages < self->take_a_break_after) { + lf_schedule(act, 0); + } else { + // Take a break + self->sent_messages=0; + lf_schedule(act, self->break_interval); + } + =} +} diff --git a/test/RustRti/src/lib/PassThrough.lf b/test/RustRti/src/lib/PassThrough.lf new file mode 100644 index 0000000000..389905489a --- /dev/null +++ b/test/RustRti/src/lib/PassThrough.lf @@ -0,0 +1,11 @@ +/** Forward the integer input on `in` to the output port `out`. */ +target C + +reactor PassThrough { + input in: int + output out: int + + reaction(in) -> out {= + lf_set(out, in->value); + =} +} diff --git a/test/RustRti/src/lib/Test.lf b/test/RustRti/src/lib/Test.lf new file mode 100644 index 0000000000..69e4f79b2c --- /dev/null +++ b/test/RustRti/src/lib/Test.lf @@ -0,0 +1,15 @@ +target C + +reactor TestDouble(expected: double[] = {1.0, 1.0, 1.0, 1.0}) { + input in: double + state count: int = 0 + + reaction(in) {= + printf("Received: %f\n", in->value); + if (in->value != self->expected[self->count]) { + printf("ERROR: Expected %f.\n", self->expected[self->count]); + exit(1); + } + self->count++; + =} +} diff --git a/test/RustRti/src/lib/TestCount.lf b/test/RustRti/src/lib/TestCount.lf new file mode 100644 index 0000000000..e4fbb82b02 --- /dev/null +++ b/test/RustRti/src/lib/TestCount.lf @@ -0,0 +1,34 @@ +/** + * Test that a counting sequence of inputs starts with the specified start parameter value, + * increments by the specified stride, and receives the specified number of inputs. + * + * @param start The starting value for the expected inputs. Default is 1. + * @param stride The increment for the inputs. Default is 1. + * @param num_inputs The number of inputs expected. Default is 1. + */ +target C + +reactor TestCount(start: int = 1, stride: int = 1, num_inputs: int = 1) { + state count: int = start + state inputs_received: int = 0 + input in: int + + reaction(in) {= + lf_print("Received %d.", in->value); + if (in->value != self->count) { + lf_print_error_and_exit("Expected %d.", self->count); + } + self->count += self->stride; + self->inputs_received++; + =} + + reaction(shutdown) {= + lf_print("Shutdown invoked."); + if (self->inputs_received != self->num_inputs) { + lf_print_error_and_exit("Expected to receive %d inputs, but got %d.", + self->num_inputs, + self->inputs_received + ); + } + =} +} diff --git a/test/RustRti/src/lib/TestCountMultiport.lf b/test/RustRti/src/lib/TestCountMultiport.lf new file mode 100644 index 0000000000..a0b0db294d --- /dev/null +++ b/test/RustRti/src/lib/TestCountMultiport.lf @@ -0,0 +1,41 @@ +/** + * Test that a counting sequence of inputs starts with the specified start parameter value, + * increments by the specified stride, and receives the specified number of inputs. This version has + * a multiport input, and each input is expected to be present and incremented over the previous + * input. + * + * @param start The starting value for the expected inputs. Default is 1. + * @param stride The increment for the inputs. Default is 1. + * @param num_inputs The number of inputs expected on each channel. Default is 1. + */ +target C + +reactor TestCountMultiport(start: int = 1, stride: int = 1, num_inputs: int = 1, width: int = 2) { + state count: int = start + state inputs_received: int = 0 + input[width] in: int + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (!in[i]->is_present) { + lf_print_error_and_exit("No input on channel %d.", i); + } + lf_print("Received %d on channel %d.", in[i]->value, i); + if (in[i]->value != self->count) { + lf_print_error_and_exit("Expected %d.", self->count); + } + self->count += self->stride; + } + self->inputs_received++; + =} + + reaction(shutdown) {= + lf_print("Shutdown invoked."); + if (self->inputs_received != self->num_inputs) { + lf_print_error_and_exit("Expected to receive %d inputs, but only got %d.", + self->num_inputs, + self->inputs_received + ); + } + =} +}