Removal of the "waiting for tag" field #266

Merged Sep 2, 2023 (2 commits)

@petervdonovan
Contributor

petervdonovan commented Aug 29, 2023

I think that this fixes a bug which was reported over email.

When a federate sends a NET for a certain tag, and it has upstream federates, we incorrectly assumed that the federate needs to receive a TAG or PTAG in order to allow it to proceed to that NET.

This assumption is incorrect because after sending the NET, the federate might receive a message from an upstream federate; that message can put an earlier event on the event queue, which lowers the federate's NET.

The bug that resulted from the incorrect assumption is that the federate would not proceed to execute the event enabled by the message it received after sending the NET. This is because the federate does not realize that it has anything to do; it still thinks that it cannot do anything until it receives a TAG or PTAG that enables it to process the NET. Therefore, the federation deadlocks.

The fix is to compute a new next event tag whenever the event queue changes.
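
A minimal sketch of that idea in C, under stated assumptions: the names `sketch_federate_t`, `sketch_insert_event`, and `sketch_refresh_net` are hypothetical and are not the runtime's API, and tags are reduced to a single integer time with no microsteps. It only illustrates deriving the NET from the event queue on every change instead of holding it fixed until a TAG or PTAG arrives.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not the runtime's data structures. */
typedef int64_t sketch_time_t;
#define SKETCH_NO_EVENT INT64_MAX /* empty queue, or no NET sent yet */

typedef struct {
  sketch_time_t queue_head;    /* earliest event on the event queue */
  sketch_time_t last_sent_net; /* NET most recently reported to the RTI */
} sketch_federate_t;

/* Record a newly scheduled event, e.g. one created by an upstream message. */
void sketch_insert_event(sketch_federate_t* f, sketch_time_t t) {
  assert(t >= 0 && t != SKETCH_NO_EVENT);
  if (t < f->queue_head) {
    f->queue_head = t;
  }
}

/* Recompute the NET from the queue rather than trusting a "waiting" flag.
 * Returns true when the NET changed and must be reported to the RTI again. */
bool sketch_refresh_net(sketch_federate_t* f) {
  if (f->queue_head == f->last_sent_net) {
    return false; /* nothing changed; keep waiting for a TAG or PTAG */
  }
  f->last_sent_net = f->queue_head;
  return true;
}
```

For example, if a NET of 1000 has been sent and an upstream message then schedules an event at 400, `sketch_refresh_net` reports that a new NET (400) must be sent; a later event at 700 changes nothing.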

For my own sake, I also point out that this issue appears to have been in the code for about 2 years, so I do not think it was introduced by the changes I made in the last few months.

The original minimal reproduction is as follows:

/**
 * This program models a redundant fault tolerant system where a primary node, if and when it fails,
 * is replaced by a backup node. The protocol is described in this paper:
 *
 * Bjarne Johansson; Mats Rågberger; Alessandro V. Papadopoulos; Thomas Nolte, "Consistency Before
 * Availability: Network Reference Point based Failure Detection for Controller Redundancy," paper
 * draft 8/15/23.
 *
 * The key idea in this protocol is that when a backup fails to detect the heartbeats of a primary
 * node, it becomes primary only if it has access to Network Reference Point (NRP), which is a point
 * in the network. This way, if the network becomes partitioned, only a backup that is on the side
 * of the partition that still has access to the NRP can become a primary. If a primary loses access
 * to the NRP, then it relinquishes its primary role because it is now on the wrong side of a
 * network partition. A backup on the right side of the partition will take over.
 *
 * @author Edward A. Lee
 * @author Marjan Sirjani
 */
target C {
  timeout: 10 s,
  logging: debug
}

preamble {=
  #ifndef NRF_FD
  #define NRF_FD
  #include "platform.h" // Defines PRINTF_TIME
  enum message_type {
    heartbeat,
    pingNRP,
    pingNRP_response,
    request_new_NRP,
    new_NRP
  };
  typedef struct message_t {
    enum message_type type;
    int source;
    int destination;
  } message_t;
  #endif // NRF_FD
=}

reactor Node(
    id: int = 0,
    heartbeat_period: time = 1 s,
    max_missed_heartbeats: int = 2,
    fails_at_time: time = 0,      // For testing. 0 for no failure.
    ping_timeout: time = 500 ms,  // Time until ping is deemed to have failed.
    // Time until new NRP request is deemed to have failed.
    nrp_timeout: time = 500 ms) {
  // There are two network interfaces:
  @side("east")
  input in1: message_t
  @side("east")
  input in2: message_t
  output out1: message_t
  output out2: message_t

  timer node_fails(fails_at_time)

  state heartbeats_missed_1: int = 0
  state heartbeats_missed_2: int = 0

  state NRP_network: int = 1
  state NRP_switch_id: int = 1
  state NRP_pending: bool = true
  state become_primary_on_ping_response: bool = false
  logical action new_NRP_request_timed_out(nrp_timeout)

  state primary: int = 0  // The known primary node.

  state ping_pending: bool = false
  logical action ping_timed_out(ping_timeout)

  initial mode Waiting {
    reaction(startup) -> out1, out2 {=
      // NOTE: The paper does not specify how to select the initial NRP.
      // Here, we send with destination 0, which the switches interpret as first to respond.
      // First to respond will be id 1 at startup; then wait for a reply before
      // actually becoming the primary or backup.
      message_t ping_message = {pingNRP, self->id, 0};
      lf_set(out1, ping_message);
    =}

    reaction(in1, in2) -> out1, out2, reset(Backup), reset(Primary) {=
      // Got a response to the ping from one or both switches.
      // NOTE: The paper calls for user intervention to select which is primary.
      // Here, we just choose id 1 to be primary.
      self->primary = 1;
      if (self->id == 1) {
        // Become primary.
        lf_set_mode(Primary);
        if (in1->is_present && in1->value.type == pingNRP_response) {
          lf_print(PRINTF_TIME ": Primary node %d received ping response on network 1. Making switch %d the NRP.", lf_time_logical_elapsed(), self->id, in1->value.source);
          self->NRP_network = 1;
          self->NRP_switch_id = in1->value.source;
          self->NRP_pending = false;
          // Notify the backup of the NRP.
          message_t message = {new_NRP, in1->value.source, 0};
          lf_set(out1, message);
        } else if (in2->is_present && in2->value.type == pingNRP_response) {
          lf_print(PRINTF_TIME ": Primary node %d received ping response on network 2. Making switch %d the NRP.", lf_time_logical_elapsed(), self->id, in2->value.source);
          self->NRP_network = 2;
          self->NRP_switch_id = in2->value.source;
          self->NRP_pending = false;
          // Notify the backup of the NRP.
          message_t message = {new_NRP, in2->value.source, 0};
          lf_set(out2, message);
        }
      } else {
        lf_set_mode(Backup);
      }
    =}
  }

  mode Backup {
    timer t(heartbeat_period, heartbeat_period)
    // FIXME: Need SENDIMHERETOPRIMARY with "longer interval"
    reaction(reset) {= lf_print(PRINTF_TIME ": ---- Node %d becomes backup.", lf_time_logical_elapsed(), self->id); =}

    reaction(node_fails) -> reset(Failed) {=
      if(lf_time_logical_elapsed() > 0LL) lf_set_mode(Failed);
    =}

    reaction(in1) -> reset(Primary) {=
      if (in1->value.type == heartbeat) {
        lf_print(PRINTF_TIME ": Backup node %d received heartbeat from node %d on network 1.", lf_time_logical_elapsed(), self->id, in1->value.source);
        self->heartbeats_missed_1 = 0;
      } else if (in1->value.type == pingNRP_response && in1->value.destination == self->id) {
        // Got a response from the NRP to a ping we sent after a partial or complete timeout.
        lf_print(PRINTF_TIME ": Backup node %d received ping response on network 1 from NRP on switch %d.", lf_time_logical_elapsed(), self->id, in1->value.source);
        // If there was a timeout on both networks that was not simultaneous, then
        // we tried pinging the NRP before becoming primary.
        if (self->become_primary_on_ping_response) {
          lf_set_mode(Primary);
          self->become_primary_on_ping_response = false;
        }
        self->ping_pending = false;
      } else if (in1->value.type == new_NRP) {
        // FIXME: Ping the new NRP and send confirmation back to primary.
        self->NRP_network = 1;
        self->NRP_switch_id = in1->value.source;
        self->NRP_pending = false;
      }
    =}

    reaction(in2) -> reset(Primary) {=
      if (in2->value.type == heartbeat) {
        lf_print(PRINTF_TIME ": Backup node %d received heartbeat from node %d on network 2.", lf_time_logical_elapsed(), self->id, in2->value.source);
        self->heartbeats_missed_2 = 0;
      } else if (in2->value.type == pingNRP_response && in2->value.destination == self->id) {
        // Got a response from the NRP to a ping we sent after a partial timeout.
        lf_print(PRINTF_TIME ": Backup node %d received ping response on network 2 from NRP on switch %d.", lf_time_logical_elapsed(), self->id, in2->value.source);
        self->ping_pending = false;
        // If there was a timeout on both networks that was not simultaneous, then
        // we tried pinging the NRP before becoming primary.
        if (self->become_primary_on_ping_response) {
          lf_set_mode(Primary);
          self->become_primary_on_ping_response = false;
        }
      } else if (in2->value.type == new_NRP) {
        // FIXME: Ping the new NRP and send confirmation back to primary.
        self->NRP_network = 2;
        self->NRP_switch_id = in2->value.source;
        self->NRP_pending = false;
      }
    =}

    reaction(t) -> reset(Primary), out1, out2, ping_timed_out {=
      if (self->heartbeats_missed_1 > self->max_missed_heartbeats
          && self->heartbeats_missed_2 > self->max_missed_heartbeats) {
        // Simultaneous heartbeat misses.
        // In the paper, this is tmoAllNotSimul.
        // For the tmoAllSimul optimization in the paper, we assume that if
        // self->heartbeats_missed_1 == self->heartbeats_missed_2, then most likely, it is
        // the primary that failed, and not the network, so can immediately become the primary.
        // Otherwise, it is possible that one network failed, and then the other failed, in which
        // case, we may have a partitioned network.
        lf_print(PRINTF_TIME ": **** Backup node %d detects missing heartbeats on both networks.", lf_time_logical_elapsed(), self->id);
        if (self->heartbeats_missed_1 == self->heartbeats_missed_2) {
          lf_print(PRINTF_TIME ": **** Missing heartbeats on both networks were simultaneous. Assume the primary failed.", lf_time_logical_elapsed());
          lf_set_mode(Primary);
        } else {
          // Ping the NRP because if we can't access it, we are on the wrong side of
          // a network partition and could end up with two primaries.
          message_t message = {pingNRP, self->id, self->NRP_switch_id};
          if (self->NRP_network == 1) {
            lf_set(out1, message);
          } else {
            lf_set(out2, message);
          }
          // Wait for a response before becoming primary.
          self->become_primary_on_ping_response = true;
          lf_schedule(ping_timed_out, 0);
        }
        self->heartbeats_missed_1 = 0; // Prevent detecting again immediately.
        self->heartbeats_missed_2 = 0;
      } else if (self->heartbeats_missed_1 > self->max_missed_heartbeats
          || self->heartbeats_missed_2 > self->max_missed_heartbeats) {
        // Heartbeat missed on one network but not yet on the other.
        // Ping the NRP to make sure we retain access to it so that we can be an effective backup.
        // This corresponds to tmoSomeNotAll in the paper.
        lf_print(PRINTF_TIME ": **** Backup node %d detects missing heartbeats on one network.", lf_time_logical_elapsed(), self->id);
        // Ping the NRP.
        message_t message = {pingNRP, self->id, self->NRP_switch_id};
        if (!self->ping_pending && !self->NRP_pending) {
          if (self->NRP_network == 1) {
            lf_set(out1, message);
          } else {
            lf_set(out2, message);
          }
          lf_print(PRINTF_TIME ": Backup node %d pings NRP on network %d, switch %d", lf_time_logical_elapsed(), self->id, self->NRP_network, self->NRP_switch_id);
          self->ping_pending = true;
          lf_schedule(ping_timed_out, 0);
        }
      }
      // Increment the counters so if they are not reset to 0 by the next time,
      // we detect the missed heartbeat.
      self->heartbeats_missed_1++;
      self->heartbeats_missed_2++;
    =}

    reaction(ping_timed_out) -> out1, out2, new_NRP_request_timed_out {=
      if (self->ping_pending) {
        // Ping timed out.
        lf_print(PRINTF_TIME ": Backup node %d gets no response from ping.", lf_time_logical_elapsed(), self->id);
        if (!self->NRP_pending) {
          // Send request for new NRP on the other network.
          lf_print(PRINTF_TIME ": Backup node %d requests new NRP.", lf_time_logical_elapsed(), self->id);
          message_t message = {request_new_NRP, self->id, self->primary};
          if (self->NRP_network == 1) {
            // Use network 2.
            lf_set(out2, message);
          } else {
            lf_set(out1, message);
          }
          self->NRP_pending = true;
          lf_schedule(new_NRP_request_timed_out, 0);
        }
        self->ping_pending = false;
      }
    =}

    reaction(new_NRP_request_timed_out) {=
      if (self->NRP_pending) {
        self->NRP_pending = false;
        lf_print(PRINTF_TIME ": Backup node %d new NRP request timed out. Will not function as backup.", lf_time_logical_elapsed(), self->id);
        if (self->become_primary_on_ping_response) {
          lf_print(PRINTF_TIME ": Network is likely partitioned. Remaining as (non-functional) backup.", lf_time_logical_elapsed());
          self->become_primary_on_ping_response = false;
        }
      }
    =}
  }

  mode Primary {
    timer heartbeat(0, heartbeat_period)
    reaction(reset) {= lf_print(PRINTF_TIME ": ---- Node %d becomes primary.", lf_time_logical_elapsed(), self->id); =}

    reaction(node_fails) -> reset(Failed) {=
      if(lf_time_logical_elapsed() > 0LL) lf_set_mode(Failed);
    =}

    reaction(heartbeat) -> out1, out2 {=
      lf_print(PRINTF_TIME ": Primary node %d sends heartbeat on both networks.", lf_time_logical_elapsed(), self->id);
      message_t message = {heartbeat, self->id, 0};
      lf_set(out1, message);
      lf_set(out2, message);
    =}

    reaction(in1) -> out1 {=
      if (in1->value.type == request_new_NRP) {
        // Find a new candidate NRP on network 1.
        lf_print(PRINTF_TIME ": Primary node %d looking for new NRP on network 1.", lf_time_logical_elapsed(), self->id);
        message_t message = {pingNRP, self->id, 0};
        lf_set(out1, message);
        self->NRP_pending = true;
      } else if (in1->value.type == pingNRP_response) {
        lf_print(PRINTF_TIME ": Primary node %d received ping response on network 1. NRP is %d.", lf_time_logical_elapsed(), self->id, in1->value.source);
        self->NRP_network = 1;
        self->NRP_switch_id = in1->value.source;
        if (self->NRP_pending) {
          self->NRP_pending = false;
          // Notify backup of new NRP. source field encodes the switch id.
          lf_print(PRINTF_TIME ": Primary node %d notifies backup of new NRP %d.", lf_time_logical_elapsed(), self->id, in1->value.source);
          message_t message = {new_NRP, in1->value.source, 0};
          lf_set(out1, message);
          // FIXME: Wait for confirmation of new NRP with backup.
        }
      }
    =}

    reaction(in2) -> out2 {=
      if (in2->value.type == request_new_NRP) {
        // Find a new candidate NRP on network 2.
        lf_print(PRINTF_TIME ": Primary node %d looking for new NRP on network 2.", lf_time_logical_elapsed(), self->id);
        message_t message = {pingNRP, self->id, 0};
        lf_set(out2, message);
        self->NRP_pending = true;
      } else if (in2->value.type == pingNRP_response) {
        lf_print(PRINTF_TIME ": Primary node %d received ping response on network 2. NRP is %d.", lf_time_logical_elapsed(), self->id, in2->value.source);
        self->NRP_network = 2;
        self->NRP_switch_id = in2->value.source;
        if (self->NRP_pending) {
          self->NRP_pending = false;
          // Notify backup of new NRP. source field encodes the switch id.
          lf_print(PRINTF_TIME ": Primary node %d notifies backup of new NRP %d.", lf_time_logical_elapsed(), self->id, in2->value.source);
          message_t message = {new_NRP, in2->value.source, 0};
          lf_set(out2, message);
          // FIXME: Wait for confirmation of new NRP with backup.
        }
      }
    =}
  }

  mode Failed {
    reaction(reset) {=
      lf_print(PRINTF_TIME ": #### Node %d fails.", lf_time_logical_elapsed(), self->id);
    =}
  }
}

federated reactor(heartbeat_period: time = 1 s, delay: time = 1 ms) {
  node1 = new Node(heartbeat_period=heartbeat_period, id=1)
  node2 = new Node(heartbeat_period=heartbeat_period, id=2)

  node1.out1 -> node2.in1 after delay
  node2.out1 -> node1.in1 after delay

  node2.out2 -> node1.in2 after delay
  node1.out2 -> node2.in2 after delay
}
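
The decision made by the backup's timer reaction in the program above can be summarized with a small C sketch. The enum and function names are hypothetical and introduced only for illustration; the thresholds and branch order follow the `reaction(t)` body in the `Backup` mode.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical classification of the backup's view of missed heartbeats. */
typedef enum {
  SKETCH_NO_TIMEOUT,                /* keep waiting for heartbeats */
  SKETCH_TIMEOUT_ONE_NETWORK,       /* ping the NRP to confirm access */
  SKETCH_TIMEOUT_BOTH_SIMULTANEOUS, /* assume the primary failed; take over */
  SKETCH_TIMEOUT_BOTH_STAGGERED     /* possible partition; ping the NRP first */
} sketch_timeout_t;

/* Classify the two missed-heartbeat counters against the allowed maximum. */
sketch_timeout_t sketch_classify_timeout(int missed_1, int missed_2, int max_missed) {
  assert(missed_1 >= 0 && missed_2 >= 0 && max_missed >= 0);
  bool over_1 = missed_1 > max_missed;
  bool over_2 = missed_2 > max_missed;
  if (over_1 && over_2) {
    /* Equal counters suggest both networks went silent at the same time. */
    return (missed_1 == missed_2) ? SKETCH_TIMEOUT_BOTH_SIMULTANEOUS
                                  : SKETCH_TIMEOUT_BOTH_STAGGERED;
  }
  if (over_1 || over_2) {
    return SKETCH_TIMEOUT_ONE_NETWORK;
  }
  return SKETCH_NO_TIMEOUT;
}
```

With `max_missed_heartbeats` at its default of 2, counters of (3, 3) classify as simultaneous, (3, 5) as staggered, and (3, 0) as a timeout on one network only.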

@petervdonovan
Contributor Author

Oops, now there are STP violations. My goal is still to get these changes ready in the next few days.

@lhstrh
Member

lhstrh commented Aug 29, 2023

The 0.5.0 release is planned for Sep 1 and I would be happy to include it, but if we don't make it, no worries -- we'll have 0.5.1 ready for that in no time 🚀

When a federate sends a NET for a certain tag, and it has upstream
federates, we incorrectly assumed that the federate needs to receive a
TAG or PTAG in order to allow it to proceed to that NET.

This assumption is incorrect because after sending the NET, the federate
might receive a message from an upstream federate; that message might
lead to a newly lowered NET.

The bug that resulted from the incorrect assumption is that the federate
would not proceed to execute the event enabled by the message it
received after sending the NET. This is because the federate does not
realize that it has anything to do; it still thinks that it cannot do
anything until it receives a TAG or PTAG that enables it to process the
NET. Therefore, the federation deadlocks.

The fix is to compute a new next event tag whenever the event queue
changes.
The problem was that the NET obtained by get_next_event_tag might be
farther into the future than the NET that was just submitted to the RTI.
This is possible when executing the start tag because all federates send
a NET for the start tag regardless of their event queues.
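
To make the condition in this commit message concrete, here is a hedged C sketch. The `sketch_tag_t` type and both functions are hypothetical stand-ins written for this example; they assume a tag is a (time, microstep) pair compared lexicographically, with time in nanoseconds. The sketch only detects the situation described; it does not show how the commit handles it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical tag type: logical time plus microstep. */
typedef struct {
  int64_t time;       /* nanoseconds, an assumption of this sketch */
  uint32_t microstep;
} sketch_tag_t;

/* Lexicographic comparison: returns -1, 0, or 1. */
int sketch_tag_compare(sketch_tag_t a, sketch_tag_t b) {
  if (a.time != b.time) {
    return (a.time < b.time) ? -1 : 1;
  }
  if (a.microstep != b.microstep) {
    return (a.microstep < b.microstep) ? -1 : 1;
  }
  return 0;
}

/* True when the NET derived from the event queue lies strictly after the
 * NET that was already submitted to the RTI. */
bool sketch_queue_net_is_later(sketch_tag_t queue_net, sketch_tag_t submitted_net) {
  return sketch_tag_compare(queue_net, submitted_net) > 0;
}

/* Usage: at the start tag (0, 0) a NET was submitted unconditionally, but
 * the first event actually on the queue is at 1 s. */
void sketch_demo(void) {
  sketch_tag_t submitted = {0, 0};
  sketch_tag_t from_queue = {1000000000LL, 0};
  assert(sketch_queue_net_is_later(from_queue, submitted));
}
```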
@petervdonovan petervdonovan force-pushed the remove-waiting-for-tag branch from 3ba4d73 to 49483cf Compare August 31, 2023 06:26
@petervdonovan
Contributor Author

This PR is almost ready, except that there is one failure on macOS in LoopDistributedCentralizedPhysicalAction which I am struggling to reproduce on Ubuntu (I have run the test successfully perhaps a few dozen times). The failure looks related to the changes here:

Federate 1: FATAL ERROR: get_next_event_tag(): Earliest event on the event queue (-2001) is earlier than the current time (0).

But I can't think of any reason why the event queue would ever be in this state, even temporarily. It seems like we are looking at the event queue at a time when it is temporarily in an invalid state.

@petervdonovan
Contributor Author

On Ubuntu I ran the test about 700 times without reproducing the error. Will take another look tonight.

@lhstrh lhstrh requested a review from edwardalee September 1, 2023 05:29
@edwardalee
Contributor

The remove-waiting-for-tag branch doesn't compile for me, so I can't check the fix. The first compile error is in federate.c:

/Users/eal/git/playground-lingua-franca/examples/C/fed-gen/NRP_FD/src-gen/federate__switch1/core/federated/federate.c:2751:36: error: use of undeclared identifier 'next_tag'
                if (lf_tag_compare(next_tag, tag) != 0) {
                                   ^

@petervdonovan
Contributor Author

Sorry about that. The latest commit was wrong, so I reverted it. This means that I am still not sure whether this commit introduces a rare bug on macOS, or if the bug already existed.

@petervdonovan
Contributor Author

petervdonovan commented Sep 1, 2023

I think we should just merge this as-is. I cannot reproduce the error that occurred here (it is not even reproducible in CI), and the logs do not include any information about what happened in federate 1 before the error. They do not even include debug prints from _lf_physical_time, which I think would have had to be called in order to put anything on the event queue. We know that there are undiscovered, hard-to-reproduce bugs that will make tests flake. If we have to spend a month chasing them in order to merge simple bugfixes like this one, that will make it even harder to improve the situation.

@petervdonovan petervdonovan marked this pull request as ready for review September 2, 2023 03:17
Contributor

@edwardalee edwardalee left a comment

Yes, let's merge this. It is clearly an improvement even if problems remain.

@edwardalee edwardalee merged commit 9760f23 into main Sep 2, 2023
@lhstrh lhstrh changed the title from Remove the "waiting for tag" field to Removal of the "waiting for tag" field on Jan 23, 2024
@lhstrh lhstrh added the bugfix label Jan 23, 2024

3 participants