Skip to content

Commit

Permalink
More comments and logging fixes.
Browse files Browse the repository at this point in the history
Signed-off-by: Anshul Pundir <[email protected]>
  • Loading branch information
anshulpundir committed Oct 2, 2017
1 parent 144ddc5 commit fd751f5
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
39 changes: 26 additions & 13 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ func (n *Node) Run(ctx context.Context) error {
n.done()
}()

// Flag that indicates if this manager node is *currently* the raft leader.
wasLeader := false
transferLeadershipLimit := rate.NewLimiter(rate.Every(time.Minute), 1)

Expand All @@ -563,10 +564,17 @@ func (n *Node) Run(ctx context.Context) error {
return errors.Wrap(err, "failed to save entries to storage")
}

// If there is a deadlock, initiate leadership transfer to break out of it.
// 1. This node was the leader in the last iteration of the FSM, and
// 2. The in-memory state indicates this node was the leader in the
// last FSM iteration.
// 3. Memory store is in a deadlock, possibly also pointing to the fact that
// the update lock was acquired by this node when it was still the leader.
if wasLeader &&
(rd.SoftState == nil || rd.SoftState.RaftState == raft.StateLeader) &&
n.memoryStore.Wedged() &&
transferLeadershipLimit.Allow() {
// log.G(ctx).Errorf("Memory store wedged, attempting to transfer leadership from node %x", )
if !n.opts.DisableStackDump {
signal.DumpStacks("")
}
Expand Down Expand Up @@ -612,7 +620,7 @@ func (n *Node) Run(ctx context.Context) error {
if rd.SoftState != nil {
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
log.G(ctx).Infof("soft state changed for node %x to not longer a leader, resetting and cancelling all waits", n.opts.ID)
log.G(ctx).Error("soft state changed, node no longer a leader, resetting and cancelling all waits")

if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
Expand All @@ -632,6 +640,8 @@ func (n *Node) Run(ctx context.Context) error {
// cancelAll, or by its own check of signalledLeadership.
n.wait.cancelAll()
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
// If this node was not the leader, but
// the raft soft state changed recently to be a leader.
wasLeader = true
}
}
Expand Down Expand Up @@ -1480,7 +1490,7 @@ func (n *Node) registerNode(node *api.RaftMember) error {
return nil
}

// ProposeValue calls Propose on the raft and waits
// ProposeValue calls Propose on the underlying raft library(etcd/raft) and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []api.StoreAction, cb func()) error {
ctx, cancel := n.WithContext(ctx)
Expand Down Expand Up @@ -1656,11 +1666,14 @@ func (n *Node) saveToStorage(
return nil
}

// processInternalRaftRequest sends a message to nodes participating
// in the raft to apply a log entry and then waits for it to be applied
// on the server. It will block until the update is performed, there is
// an error or until the raft node finalizes all the proposals on node
// shutdown.
// processInternalRaftRequest proposes a value to be appended to the raft log.
// It calls Propose() on etcd/raft, which calls back into the raft FSM,
// which then sends a message to each of the participating nodes
// in the raft group to apply a log entry and then waits for it to be applied
// on this node. It will block until the this node:
// 1. Gets the necessary replies back from the participating nodes and also performs the commit itself, or
// 2. There is an error, or
// 3. Until the raft node finalizes all the proposals on node shutdown.
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
n.stopMu.RLock()
if !n.IsMember() {
Expand All @@ -1681,7 +1694,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa

// Do this check after calling register to avoid a race.
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
log.G(ctx).Errorf("node %x is no longer leader, aborting propose", n.opts.ID)
log.G(ctx).Error("node is no longer leader, aborting propose")
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}
Expand All @@ -1707,9 +1720,9 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
case x, ok := <-ch:
if !ok {
// Wait notification channel was closed. This should only happen if the wait was cancelled.
log.G(ctx).Errorf("wait cancelled, likely because node %x lost leader position", n.opts.ID)
log.G(ctx).Error("wait cancelled")
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Errorf("wait cancelled but node %x is still a leader.", n.opts.ID)
log.G(ctx).Error("wait cancelled but node is still a leader")
}
return nil, ErrLostLeadership
}
Expand All @@ -1719,9 +1732,9 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
// If we can read from the channel, wait item was triggered. Otherwise it was cancelled.
x, ok := <-ch
if !ok {
log.G(ctx).WithError(waitCtx.Err()).Errorf("wait context cancelled, likeyly because node %x lost leader position", n.opts.ID)
log.G(ctx).WithError(waitCtx.Err()).Error("wait context cancelled")
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Errorf("wait context cancelled but node %x is still a leader", n.opts.ID)
log.G(ctx).Error("wait context cancelled but node is still a leader")
}
return nil, ErrLostLeadership
}
Expand Down Expand Up @@ -1791,7 +1804,7 @@ func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
}

if !n.wait.trigger(r.ID, r) {
log.G(ctx).Errorf("wait not found for raft id %x", r.ID)
log.G(ctx).Errorf("wait not found for raft request id %x", r.ID)

// There was no wait on this ID, meaning we don't have a
// transaction in progress that would be committed to the
Expand Down
3 changes: 1 addition & 2 deletions manager/state/store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ func register(os ObjectStoreConfig) {
schema.Tables[os.Table.Name] = os.Table
}

// timedMutex wraps a sync.Mutex, and keeps track of how long it has been
// locked.
// timedMutex wraps a sync.Mutex, and keeps track of when it was locked.
type timedMutex struct {
sync.Mutex
lockedAt atomic.Value
Expand Down

0 comments on commit fd751f5

Please sign in to comment.