-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcandidate.go
81 lines (71 loc) · 2.16 KB
/
candidate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package raft
import (
"fmt"
"github.com/Mathew-Estafanous/raft/pb"
)
func (r *Raft) runCandidateState() {
r.electionTimer.Reset(r.randElectTime())
r.setStableStore(keyCurrentTerm, r.fromStableStore(keyCurrentTerm)+1)
r.logger.Printf("Candidate started election for term %v.", r.fromStableStore(keyCurrentTerm))
// Run election for candidate by sending request votes to other nodes.
r.sendVoteRequests()
for r.getState() == Candidate {
select {
case <-r.electionTimer.C:
r.logger.Printf("Election has failed for term %d", r.fromStableStore(keyCurrentTerm))
return
case v := <-r.voteCh:
if v.error != nil {
r.logger.Printf("A vote request has failed: %v", v.error)
break
}
vote := v.resp.(*pb.VoteResponse)
r.handleVoteResponse(vote)
case t := <-r.applyCh:
t.respond(fmt.Errorf("this node is not a leader"))
case <-r.shutdownCh:
return
}
}
}
// sendVoteRequests will initialize and send the vote requests to other nodes
// in the Cluster and return results in a vote channel.
func (r *Raft) sendVoteRequests() {
r.voteCh = make(chan rpcResp, len(r.cluster.AllNodes()))
r.votesNeeded = r.cluster.Quorum() - 1
r.setStableStore(keyVotedFor, r.id)
req := &pb.VoteRequest{
Term: r.fromStableStore(keyCurrentTerm),
CandidateId: r.id,
LastLogIndex: r.log.LastIndex(),
LastLogTerm: r.log.LastTerm(),
}
for _, v := range r.cluster.AllNodes() {
if v.ID == r.id {
continue
}
// Make RPC request in a separate goroutine to prevent blocking operations.
go func(n Node) {
res := r.sendRPC(req, n)
r.voteCh <- res
}(v)
}
}
func (r *Raft) handleVoteResponse(vote *pb.VoteResponse) {
// If term of peer is greater than go back to follower
// and update current term to the peer's term.
if vote.Term > r.fromStableStore(keyCurrentTerm) {
r.logger.Println("Demoting since peer's term is greater than current term")
r.setStableStore(keyCurrentTerm, vote.Term)
r.setState(Follower)
return
}
if vote.VoteGranted {
r.votesNeeded--
// Check if the total votes needed has been reached. If so
// then election has passed and candidate is now the leader.
if r.votesNeeded == 0 {
r.setState(Leader)
}
}
}