
reconciler: support disconnected clients (#12058)
* Add merge helper for string maps (a hypothetical sketch follows this list)
* structs: add statuses, MaxClientDisconnect, and helper funcs
* taintedNodes: Include disconnected nodes
* upsertAllocsImpl: don't use existing ClientStatus when upserting unknown
* allocSet: update filterByTainted and add delayByMaxClientDisconnect
* allocReconciler: support disconnecting and reconnecting allocs
* GenericScheduler: upsert unknown and queue reconnecting
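
The first bullet is the most self-contained of these changes. A minimal sketch of what such a merge helper typically looks like, assuming it lives alongside Nomad's other helpers under a name like MergeMapStringString (the name, package, and nil-handling here are assumptions, not shown in this diff):

// MergeMapStringString returns a new map containing the entries of a
// overlaid by the entries of b, so b's values win on key conflicts.
// Neither input map is mutated. (Hypothetical sketch; the committed
// helper may differ.)
func MergeMapStringString(a, b map[string]string) map[string]string {
	merged := make(map[string]string, len(a)+len(b))
	for k, v := range a {
		merged[k] = v
	}
	for k, v := range b {
		merged[k] = v
	}
	return merged
}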

Co-authored-by: Tim Gross <[email protected]>
2 people authored and DerekStrickland committed Apr 5, 2022
1 parent 6791147 commit e7707bf
Showing 4 changed files with 5 additions and 84 deletions.
2 changes: 2 additions & 0 deletions nomad/structs/structs.go
@@ -9923,8 +9923,10 @@ func (a *Allocation) DisconnectTimeout(now time.Time) time.Time {

tg := a.Job.LookupTaskGroup(a.TaskGroup)

// Prefer the duration from the task group.
timeout := tg.MaxClientDisconnect

// If not configured, return now
if timeout == nil {
return now
}
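The hunk above shows only the head of DisconnectTimeout: the timeout is read from the task group, and a nil timeout means no grace period is configured, so the method returns now unchanged. Read together, the elided tail almost certainly adds the configured duration to now. A sketch of the whole method under that assumption (the nil-receiver guard and the final line are inferred, not shown in the diff):

// DisconnectTimeout returns the latest time at which a disconnected
// allocation can reconnect before it is treated as lost.
func (a *Allocation) DisconnectTimeout(now time.Time) time.Time {
	if a == nil || a.Job == nil {
		return now
	}

	tg := a.Job.LookupTaskGroup(a.TaskGroup)

	// Prefer the duration from the task group.
	timeout := tg.MaxClientDisconnect

	// If not configured, return now
	if timeout == nil {
		return now
	}

	// Inferred tail: the alloc may stay unknown until now + MaxClientDisconnect.
	return now.Add(*timeout)
}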
1 change: 1 addition & 0 deletions scheduler/reconcile.go
@@ -447,6 +447,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// Validate and add reconnecting allocs to the plan so that they will be logged.
a.computeReconnecting(reconnecting)
desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))

// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
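computeReconnecting itself is not part of this hunk. A hypothetical sketch consistent with the call site above, in which validated reconnecting allocs are copied into the result's reconnectUpdates map (whose length the next line then counts toward desiredChanges.Ignore):

// computeReconnecting queues updates for allocs whose node has come back.
// (Illustrative sketch; the committed validation may be stricter.)
func (a *allocReconciler) computeReconnecting(reconnecting allocSet) {
	for _, alloc := range reconnecting {
		// Terminal allocs cannot resume; skip them so only live allocs
		// are queued to reconnect.
		if alloc.TerminalStatus() {
			continue
		}
		a.result.reconnectUpdates[alloc.ID] = alloc
	}
}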
85 changes: 1 addition & 84 deletions scheduler/reconcile_test.go
@@ -259,8 +259,8 @@ type resultExpectation struct {
attributeUpdates int
disconnectUpdates int
reconnectUpdates int
desiredTGUpdates map[string]*structs.DesiredUpdates
stop int
desiredTGUpdates map[string]*structs.DesiredUpdates
}

func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) {
@@ -5615,86 +5615,3 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
})
}
}

// Tests that the future timeout evals that get created when a node disconnects
// stop once the duration passes.
func TestReconciler_Disconnected_Node_FollowUpEvals_Stop_After_Timeout(t *testing.T) {
// TODO: Add table tests and play with the reconciler time/node status to make sure that
// if the expiration time has not passed, it's a no-op.

// Build a set of resumable allocations. The helper sets the timeout to 5 min;
// a hypothetical sketch of this helper follows the test.
job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)

// Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little
// more discernible that only the affected alloc(s) get marked unknown.
nodes := buildDisconnectedNodes(allocs, 2)

// Invoke the reconciler to queue status changes and get the followup evals.
// Use the allocUpdateFnIgnore since alloc.TerminalStatus() will evaluate to
// false and cause the real genericAllocUpdateFn to return ignore=true, destructive=false.
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, "", 50, true)
reconciler.now = time.Now().UTC()
results := reconciler.Compute()

// Verify that 1 follow up eval was created.
evals := results.desiredFollowupEvals[job.TaskGroups[0].Name]
require.Len(t, evals, 1)
eval := evals[0]

// Set the NodeStatus to Down on the 2 disconnected nodes to simulate that
// the resume duration has passed.
for _, node := range nodes {
node.Status = structs.NodeStatusDown
}

// Replace the allocs that were originally created with the updated copies that
// have the unknown ClientStatus.
for i, alloc := range allocs {
for id, updated := range results.disconnectUpdates {
if alloc.ID == id {
allocs[i] = updated
}
}
}

// Run the followup eval through the reconciler and verify the resumable allocs
// have timed out, will be stopped, and new placements are scheduled.
reconciler = NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, eval.ID, eval.Priority, true)

// Allocs were configured to expire in 5 min, so configure the reconciler
// to believe that time has passed.
// NOTE: this probably isn't really necessary because this value is really
// only used for computing future evals, but it seemed like good practice
// in case there are other unconsidered side effects.
reconciler.now = time.Now().UTC().Add(6 * time.Minute)
results = reconciler.Compute()

// Validate that the queued stops have the right client status.
for _, stopResult := range results.stop {
require.Equal(t, structs.AllocClientStatusLost, stopResult.clientStatus)
}

// 2 to place, 2 to stop, 1 to ignore
assertResults(t, results, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 2,
destructive: 0,
stop: 2,
inplace: 0,
disconnectUpdates: 0,
reconnectUpdates: 0,

desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 2,
Stop: 2,
DestructiveUpdate: 0,
Ignore: 1,
InPlaceUpdate: 0,
},
},
})
}
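
buildResumableAllocations and buildDisconnectedNodes are test helpers that this diff does not show. A hypothetical sketch of the first, inferred only from its call site above (the fourth argument is assumed to be the number of distinct nodes; the real helper's signature and defaults may differ):

// buildResumableAllocations returns a job whose first task group sets
// MaxClientDisconnect to 5 minutes, plus count allocations spread across
// nodeCount nodes. (Hypothetical reconstruction for illustration.)
func buildResumableAllocations(count int, clientStatus, desiredStatus string, nodeCount int) (*structs.Job, []*structs.Allocation) {
	maxClientDisconnect := 5 * time.Minute

	job := mock.Job()
	job.TaskGroups[0].Count = count
	job.TaskGroups[0].MaxClientDisconnect = &maxClientDisconnect

	allocs := make([]*structs.Allocation, 0, count)
	for i := 0; i < count; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.TaskGroup = job.TaskGroups[0].Name
		alloc.ClientStatus = clientStatus
		alloc.DesiredStatus = desiredStatus
		// Spread the allocs across the requested number of nodes.
		alloc.NodeID = fmt.Sprintf("node-%d", i%nodeCount)
		allocs = append(allocs, alloc)
	}

	return job, allocs
}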
1 change: 1 addition & 0 deletions scheduler/reconcile_util.go
@@ -304,6 +304,7 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support

// All other allocs are untainted
untainted[alloc.ID] = alloc

}

return
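The hunk above shows only the untainted tail of filterByTainted. An illustrative standalone rendering of the classification this commit introduces, consistent with the disconnecting and reconnecting sets the reconciler consumes (the branch conditions and the classifyByTainted name are assumptions, not the committed logic):

func classifyByTainted(a allocSet, taintedNodes map[string]*structs.Node) (untainted, disconnecting, reconnecting allocSet) {
	untainted = make(allocSet)
	disconnecting = make(allocSet)
	reconnecting = make(allocSet)

	for _, alloc := range a {
		node, tainted := taintedNodes[alloc.NodeID]

		switch {
		case tainted && node != nil && node.Status == structs.NodeStatusDisconnected:
			// The node has disconnected: mark the alloc as disconnecting so
			// the reconciler can transition it to client status unknown.
			disconnecting[alloc.ID] = alloc
		case !tainted && alloc.ClientStatus == structs.AllocClientStatusUnknown:
			// The node is healthy again but the alloc is still unknown:
			// queue it to reconnect.
			reconnecting[alloc.ID] = alloc
		default:
			// All other allocs are untainted
			untainted[alloc.ID] = alloc
		}
	}

	return untainted, disconnecting, reconnecting
}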
