Skip to content

Commit

Permalink
Add more comments #2360.
Browse files Browse the repository at this point in the history
Signed-off-by: Anshul Pundir <[email protected]>
  • Loading branch information
anshulpundir committed Aug 29, 2017
1 parent 0554c9b commit bddf91e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
20 changes: 14 additions & 6 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ func (n *Node) Run(ctx context.Context) error {
n.done()
}()

// Flag to indicate if this manager node was the raft leader in the previous iteration of the loop.
// It is mainly used to cancel out proposals that this node might have made when it was the leader
// in the previous iteration. It not only needed to avoid deadlocking processCommitted().
wasLeader := false
transferLeadershipLimit := rate.NewLimiter(rate.Every(time.Minute), 1)

Expand Down Expand Up @@ -630,6 +633,8 @@ func (n *Node) Run(ctx context.Context) error {
// cancelAll, or by its own check of signalledLeadership.
n.wait.cancelAll()
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
// If this node was the leader in the previous iteration,
// set wasLeader to make sure all outstanding proposals are cancelled.
wasLeader = true
}
}
Expand Down Expand Up @@ -1481,7 +1486,7 @@ func (n *Node) registerNode(node *api.RaftMember) error {
return nil
}

// ProposeValue calls Propose on the raft and waits
// ProposeValue calls Propose on the underlying raft library(etcd/raft) and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []api.StoreAction, cb func()) error {
ctx, cancel := n.WithContext(ctx)
Expand Down Expand Up @@ -1657,11 +1662,14 @@ func (n *Node) saveToStorage(
return nil
}

// processInternalRaftRequest sends a message to nodes participating
// in the raft to apply a log entry and then waits for it to be applied
// on the server. It will block until the update is performed, there is
// an error or until the raft node finalizes all the proposals on node
// shutdown.
// processInternalRaftRequest proposes a value to be appended to the raft log.
// It calls Propose() on etcd/raft, which calls back into the raft FSM,
// which then sends a message to each of the participating nodes
// in the raft groupp to apply a log entry and then waits for it to be applied
// on this node. It will block until the this node:
// 1. Gets the necessary replies back from the participating nodes and also performs the commit itself, or
// 2. There is an error, or
// 3. Until the raft node finalizes all the proposals on node shutdown.
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
n.stopMu.RLock()
if !n.IsMember() {
Expand Down
3 changes: 1 addition & 2 deletions manager/state/store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ func register(os ObjectStoreConfig) {
schema.Tables[os.Table.Name] = os.Table
}

// timedMutex wraps a sync.Mutex, and keeps track of how long it has been
// locked.
// timedMutex wraps a sync.Mutex, and keeps track of when it was locked.
type timedMutex struct {
sync.Mutex
lockedAt atomic.Value
Expand Down

0 comments on commit bddf91e

Please sign in to comment.