From d18f4bd082601c62ad53b3f6f4a28d20d7695eb4 Mon Sep 17 00:00:00 2001 From: Matheus Camargo Date: Wed, 2 Nov 2016 23:04:25 -0200 Subject: [PATCH] Solution --- raft/raft.go | 147 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 102 insertions(+), 45 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 48893be..a1cfae6 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -107,33 +107,46 @@ func (raft *Raft) followerSelect() { return case rv := <-raft.requestVoteChan: - /////////////////// - // MODIFY HERE // + // If term is outdated, update it + if (rv.Term > raft.currentTerm) { + log.Printf("[FOLLOWER] Current term '%v' outdated. Adjusting it to '%v'.", raft.currentTerm, rv.Term) + raft.currentTerm = rv.Term + raft.votedFor = 0 + } reply := &RequestVoteReply{ Term: raft.currentTerm, } - log.Printf("[FOLLOWER] Vote denied to '%v' for term '%v'.\n", raft.peers[rv.CandidateID], raft.currentTerm) - - reply.VoteGranted = false + reply.VoteGranted = raft.currentTerm <= rv.Term + if (reply.VoteGranted) { + log.Printf("[FOLLOWER] Vote granted to '%v' for term '%v'.\n", raft.peers[rv.CandidateID], raft.currentTerm) + } else { + log.Printf("[FOLLOWER] Vote denied to '%v' for term '%v'.\n", raft.peers[rv.CandidateID], raft.currentTerm) + } + raft.resetElectionTimeout() rv.replyChan <- reply break - // END OF MODIFY // - /////////////////// case ae := <-raft.appendEntryChan: - /////////////////// - // MODIFY HERE // + // If term is outdated, update it + if (ae.Term > raft.currentTerm) { + log.Printf("[FOLLOWER] Current term '%v' outdated. Adjusting it to '%v'.", raft.currentTerm, ae.Term) + raft.currentTerm = ae.Term + raft.votedFor = 0 + } reply := &AppendEntryReply{ Term: raft.currentTerm, } - log.Printf("[FOLLOWER] Accept AppendEntry from '%v'.\n", raft.peers[ae.LeaderID]) - reply.Success = true + reply.Success = raft.currentTerm <= ae.Term + if (reply.Success) { + log.Printf("[FOLLOWER] Accept AppendEntry from '%v'.\n", raft.peers[ae.LeaderID]) + } else { + log.Printf("[FOLLOWER] Rejected AppendEntry from '%v'.\n", raft.peers[ae.LeaderID]) + } + raft.resetElectionTimeout() ae.replyChan <- reply break - // END OF MODIFY // - /////////////////// } } } @@ -162,38 +175,69 @@ func (raft *Raft) candidateSelect() { log.Println("[CANDIDATE] Election timeout.") raft.currentState.Set(candidate) return - case rvr := <-replyChan: - /////////////////// - // MODIFY HERE // + case rvr := <-replyChan: + // If term is outdated, step down + if (rvr.Term > raft.currentTerm) { + log.Println("[CANDIDATE] Term outdated. Stepping down.") + raft.currentTerm = rvr.Term + raft.votedFor = 0 + raft.currentState.Set(follower) + replyChan <- rvr + return + } if rvr.VoteGranted { log.Printf("[CANDIDATE] Vote granted by '%v'.\n", raft.peers[rvr.peerIndex]) voteCount++ log.Println("[CANDIDATE] VoteCount: ", voteCount) + // Candidate received majority of votes. Become leader. + if (voteCount > len(raft.peers) / 2) { + log.Println("[CANDIDATE] Becoming leader.") + raft.currentState.Set(leader) + return + } break } log.Printf("[CANDIDATE] Vote denied by '%v'.\n", raft.peers[rvr.peerIndex]) - // END OF MODIFY // - /////////////////// - case rv := <-raft.requestVoteChan: - /////////////////// - // MODIFY HERE // + // If term is outdated, step down + if (rv.Term > raft.currentTerm) { + log.Println("[CANDIDATE] Term outdated. Stepping down.") + raft.currentTerm = rv.Term + raft.votedFor = 0 + raft.currentState.Set(follower) + raft.requestVoteChan <- rv + return + } reply := &RequestVoteReply{ Term: raft.currentTerm, } - - log.Printf("[CANDIDATE] Vote denied to '%v' for term '%v'.\n", raft.peers[rv.CandidateID], raft.currentTerm) - reply.VoteGranted = false + reply.VoteGranted = rv.Term == raft.currentTerm + if (reply.VoteGranted) { + log.Printf("[CANDIDATE] Vote granted to '%v' for term '%v'.\n", raft.peers[rv.CandidateID], raft.currentTerm) + } else { + log.Printf("[CANDIDATE] Vote denied to '%v' for term '%v'.\n", raft.peers[rv.CandidateID], raft.currentTerm) + } rv.replyChan <- reply + raft.resetElectionTimeout() break - // END OF MODIFY // - /////////////////// case ae := <-raft.appendEntryChan: - /////////////////// - // MODIFY HERE // + // Ignore smaller append entry + if (ae.Term < raft.currentTerm) { + log.Printf("[CANDIDATE] Rejecting AppendEntry from '%v'.\n", raft.peers[ae.LeaderID]) + break + } + // If term is outdated, step down + if (ae.Term > raft.currentTerm) { + log.Println("[CANDIDATE] Term outdated. Stepping down.") + raft.currentTerm = ae.Term + raft.votedFor = 0 + raft.currentState.Set(follower) + raft.appendEntryChan <- ae + return + } reply := &AppendEntryReply{ Term: raft.currentTerm, } @@ -201,9 +245,8 @@ func (raft *Raft) candidateSelect() { log.Printf("[CANDIDATE] Accept AppendEntry from '%v'.\n", raft.peers[ae.LeaderID]) reply.Success = true ae.replyChan <- reply - break - // END OF MODIFY // - /////////////////// + raft.currentState.Set(follower) + return } } } @@ -232,14 +275,26 @@ func (raft *Raft) leaderSelect() { case <-broadcastTick: raft.broadcastAppendEntries(replyChan) case aet := <-replyChan: - /////////////////// - // MODIFY HERE // - _ = aet - // END OF MODIFY // - /////////////////// + // If term is outdated, step down + if (aet.Term > raft.currentTerm) { + log.Println("[LEADER] Term outdated. Stepping down.") + raft.currentTerm = aet.Term + raft.votedFor = 0 + raft.currentState.Set(follower) + replyChan <- aet + return + } + case rv := <-raft.requestVoteChan: - /////////////////// - // MODIFY HERE // + // If term is outdated, step down + if (rv.Term > raft.currentTerm) { + log.Println("[LEADER] Term outdated. Stepping down.") + raft.currentTerm = rv.Term + raft.votedFor = 0 + raft.currentState.Set(follower) + raft.requestVoteChan <- rv + return + } reply := &RequestVoteReply{ Term: raft.currentTerm, @@ -250,12 +305,16 @@ func (raft *Raft) leaderSelect() { rv.replyChan <- reply break - // END OF MODIFY // - /////////////////// - case ae := <-raft.appendEntryChan: - /////////////////// - // MODIFY HERE // + // If term is outdated, step down + if (ae.Term > raft.currentTerm) { + log.Println("[LEADER] Term outdated. Stepping down.") + raft.currentTerm = ae.Term + raft.votedFor = 0 + raft.currentState.Set(follower) + raft.appendEntryChan <- ae + return + } reply := &AppendEntryReply{ Term: raft.currentTerm, } @@ -264,8 +323,6 @@ func (raft *Raft) leaderSelect() { reply.Success = true ae.replyChan <- reply break - // END OF MODIFY // - /////////////////// } } }