More comments and logging fixes. #2362

Merged 1 commit on Oct 3, 2017.
34 changes: 21 additions & 13 deletions manager/state/raft/raft.go
@@ -542,6 +542,7 @@ func (n *Node) Run(ctx context.Context) error {
n.done()
}()

// Flag that indicates if this manager node is *currently* the raft leader.
wasLeader := false
transferLeadershipLimit := rate.NewLimiter(rate.Every(time.Minute), 1)

@@ -563,10 +564,13 @@ func (n *Node) Run(ctx context.Context) error {
return errors.Wrap(err, "failed to save entries to storage")
}

// If the memory store lock has been held for too long,
// transferring leadership is an easy way to break out of it.
if wasLeader &&
(rd.SoftState == nil || rd.SoftState.RaftState == raft.StateLeader) &&
n.memoryStore.Wedged() &&
transferLeadershipLimit.Allow() {
log.G(ctx).Error("Attempting to transfer leadership")
if !n.opts.DisableStackDump {
signal.DumpStacks("")
}
@@ -612,7 +616,7 @@ func (n *Node) Run(ctx context.Context) error {
if rd.SoftState != nil {
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
log.G(ctx).Infof("soft state changed for node %x to not longer a leader, resetting and cancelling all waits", n.opts.ID)
log.G(ctx).Error("soft state changed, node no longer a leader, resetting and cancelling all waits")

if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
@@ -632,6 +636,7 @@ func (n *Node) Run(ctx context.Context) error {
// cancelAll, or by its own check of signalledLeadership.
n.wait.cancelAll()
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
// Node just became a leader.
wasLeader = true
}
}
@@ -1480,7 +1485,7 @@ func (n *Node) registerNode(node *api.RaftMember) error {
return nil
}

// ProposeValue calls Propose on the raft and waits
// ProposeValue calls Propose on the underlying raft library (etcd/raft) and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []api.StoreAction, cb func()) error {
ctx, cancel := n.WithContext(ctx)
@@ -1656,11 +1661,14 @@ func (n *Node) saveToStorage(
return nil
}

// processInternalRaftRequest sends a message to nodes participating
// in the raft to apply a log entry and then waits for it to be applied
// on the server. It will block until the update is performed, there is
// an error or until the raft node finalizes all the proposals on node
// shutdown.
// processInternalRaftRequest proposes a value to be appended to the raft log.
// It calls Propose() on etcd/raft, which calls back into the raft FSM,
// which then sends a message to each of the participating nodes
// in the raft group to apply a log entry and then waits for it to be applied
// on this node. It will block until:
// 1. This node gets the necessary replies back from the participating nodes and also performs the commit itself, or
// 2. There is an error, or
// 3. The raft node finalizes all the proposals on node shutdown.
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
n.stopMu.RLock()
if !n.IsMember() {
@@ -1681,7 +1689,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa

// Do this check after calling register to avoid a race.
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
log.G(ctx).Errorf("node %x is no longer leader, aborting propose", n.opts.ID)
log.G(ctx).Error("node is no longer leader, aborting propose")
Contributor: @anshulpundir why remove node IDs from these log messages?

Contributor (author): The node ID is already attached as a field to the context logger: https://github.com/docker/swarmkit/blob/master/node/node.go#L286 @nishanttotla (see the sketch below this thread).

Contributor: Ah, okay, got it.

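For context on the exchange above, here is a minimal sketch of the pattern the author describes: the node ID is attached once to the context-scoped logger, so later log.G(ctx) calls emit it as a field without repeating it in every message. It assumes swarmkit's log helpers (log.WithLogger and log.G, which wrap a *logrus.Entry); the field name "node.id" and the ID value are illustrative.

```go
package main

import (
	"context"

	"github.com/docker/swarmkit/log"
)

func main() {
	ctx := context.Background()

	// Attach the node ID to the context-scoped logger once; every later
	// log.G(ctx) call made with this context carries it as a "node.id" field.
	ctx = log.WithLogger(ctx, log.G(ctx).WithField("node.id", "a1b2c3d4e5f6"))

	// The message itself no longer needs to embed the node ID.
	log.G(ctx).Error("node is no longer leader, aborting propose")
}
```
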
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}
@@ -1707,9 +1715,9 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
case x, ok := <-ch:
if !ok {
// Wait notification channel was closed. This should only happen if the wait was cancelled.
log.G(ctx).Errorf("wait cancelled, likely because node %x lost leader position", n.opts.ID)
log.G(ctx).Error("wait cancelled")
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Errorf("wait cancelled but node %x is still a leader.", n.opts.ID)
log.G(ctx).Error("wait cancelled but node is still a leader")
}
return nil, ErrLostLeadership
}
@@ -1719,9 +1727,9 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
// If we can read from the channel, wait item was triggered. Otherwise it was cancelled.
x, ok := <-ch
if !ok {
log.G(ctx).WithError(waitCtx.Err()).Errorf("wait context cancelled, likeyly because node %x lost leader position", n.opts.ID)
log.G(ctx).WithError(waitCtx.Err()).Error("wait context cancelled")
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Errorf("wait context cancelled but node %x is still a leader", n.opts.ID)
log.G(ctx).Error("wait context cancelled but node is still a leader")
}
return nil, ErrLostLeadership
}
@@ -1791,7 +1799,7 @@ func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
}

if !n.wait.trigger(r.ID, r) {
log.G(ctx).Errorf("wait not found for raft id %x", r.ID)
log.G(ctx).Errorf("wait not found for raft request id %x", r.ID)

// There was no wait on this ID, meaning we don't have a
// transaction in progress that would be committed to the
3 changes: 1 addition & 2 deletions manager/state/store/memory.go
@@ -83,8 +83,7 @@ func register(os ObjectStoreConfig) {
schema.Tables[os.Table.Name] = os.Table
}

// timedMutex wraps a sync.Mutex, and keeps track of how long it has been
// locked.
// timedMutex wraps a sync.Mutex, and keeps track of when it was locked.
type timedMutex struct {
sync.Mutex
lockedAt atomic.Value
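
The timedMutex comment above, together with the Wedged() check in the raft.go hunk earlier in this diff, describes a mutex that records when it was acquired so that a long-held lock can be detected. Below is a minimal, self-contained sketch of that pattern; the LockedAt method name and the 30-second threshold are assumptions for illustration, not necessarily swarmkit's exact implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// timedMutex wraps a sync.Mutex and records when it was last locked.
type timedMutex struct {
	sync.Mutex
	lockedAt atomic.Value // holds a time.Time
}

func (m *timedMutex) Lock() {
	m.Mutex.Lock()
	m.lockedAt.Store(time.Now())
}

func (m *timedMutex) Unlock() {
	m.lockedAt.Store(time.Time{}) // zero time marks the mutex as unlocked
	m.Mutex.Unlock()
}

// LockedAt reports when the mutex was last locked (zero time if unlocked).
func (m *timedMutex) LockedAt() time.Time {
	v := m.lockedAt.Load()
	if v == nil {
		return time.Time{}
	}
	return v.(time.Time)
}

func main() {
	var m timedMutex
	m.Lock()
	defer m.Unlock()

	// A "wedged" check like the one used by the raft loop: the lock has
	// been held continuously for longer than some threshold.
	at := m.LockedAt()
	wedged := !at.IsZero() && time.Since(at) > 30*time.Second
	fmt.Println("wedged:", wedged)
}
```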