Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect alloc job version for lost/failed allocs #8691

Merged
merged 5 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ IMPROVEMENTS:
* api: Added node purge SDK functionality. [[GH-8142](https://github.com/hashicorp/nomad/issues/8142)]
* driver/docker: Allow configurable image pull context timeout setting. [[GH-5718](https://github.com/hashicorp/nomad/issues/5718)]

BUG FIXES:

* core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)]

## 0.12.3 (August 13, 2020)

BUG FIXES:
Expand Down
8 changes: 5 additions & 3 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9830,12 +9830,14 @@ func (p *Plan) PopUpdate(alloc *Allocation) {
}
}

func (p *Plan) AppendAlloc(alloc *Allocation) {
// AppendAlloc appends the alloc to the plan allocations.
// Uses the passed job if explicitly passed, otherwise
// it is assumed the alloc will use the plan Job version.
func (p *Plan) AppendAlloc(alloc *Allocation, job *Job) {
node := alloc.NodeID
existing := p.NodeAllocation[node]

// Normalize the job
alloc.Job = nil
alloc.Job = job

p.NodeAllocation[node] = append(existing, alloc)
}
Expand Down
66 changes: 63 additions & 3 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduler

import (
"fmt"
"sort"
"time"

log "github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -387,12 +388,12 @@ func (s *GenericScheduler) computeJobAllocs() error {
update.DeploymentID = s.deployment.GetID()
update.DeploymentStatus = nil
}
s.ctx.Plan().AppendAlloc(update)
s.ctx.Plan().AppendAlloc(update, nil)
}

// Handle the annotation updates
for _, update := range results.attributeUpdates {
s.ctx.Plan().AppendAlloc(update)
s.ctx.Plan().AppendAlloc(update, nil)
}

// Nothing remaining to do if placement is not required
Expand Down Expand Up @@ -429,6 +430,32 @@ func (s *GenericScheduler) computeJobAllocs() error {
return s.computePlacements(destructive, place)
}

// downgradedJobForPlacement returns the job appropriate for non-canary placement replacement
func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, *structs.Job, error) {
ns, jobID := s.job.Namespace, s.job.ID
tgName := p.TaskGroup().Name

// find deployments and use the latest promoted or canaried version
deployments, err := s.state.DeploymentsByJobID(nil, ns, jobID, false)
if err != nil {
return "", nil, fmt.Errorf("failed to lookup job deployments: %v", err)
}
sort.Slice(deployments, func(i, j int) bool { return deployments[i].JobVersion > deployments[i].JobVersion })
for _, d := range deployments {
// It's unexpected to have a recent deployment that doesn't contain the TaskGroup; as all allocations
// should be destroyed. In such cases, attempt to find the deployment for that TaskGroup and hopefully
// we will kill it soon. This is a defensive measure, have not seen it in practice
//
// Zero dstate.DesiredCanaries indicates that the TaskGroup allocates were updated in-place without using canaries.
if dstate := d.TaskGroups[tgName]; dstate != nil && (dstate.Promoted || dstate.DesiredCanaries == 0) {
job, err := s.state.JobByIDAndVersion(nil, ns, jobID, d.JobVersion)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we first compare d.JobVersion against s.job.Version and if they're equal: return nil since they're equivalent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's reasonable but also seems like a micro-optimization - I may consider it when addressing reviews.

return d.ID, job, err
}
}

return "", nil, nil
}

// computePlacements computes placements for allocations. It is given the set of
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
Expand Down Expand Up @@ -457,12 +484,40 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Get the task group
tg := missing.TaskGroup()

var downgradedJob *structs.Job

if missing.DowngradeNonCanary() {
jobDeploymentID, job, err := s.downgradedJobForPlacement(missing)
if err != nil {
return err
}

// Defensive check - if there is no appropriate deployment for this job, use the latest
if job != nil && job.Version >= missing.MinJobVersion() && job.LookupTaskGroup(tg.Name) != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made few defensive checks here, where if we see unexpected state (e.g. jobs without expected TaskGroup, no non-promoted version), we'd fallback to using the latest version. This seems better than a panic, but not sure if we should simplify this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this unexpected? for jobs without update stanza, there won't be deployments, so that downgradedJobForPlacement will return null. (in that case, latest job is exactly what we want.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's unexpected. missing.DowngradeNonCanary() should be always false.

tg = job.LookupTaskGroup(tg.Name)
downgradedJob = job
deploymentID = jobDeploymentID
} else {
jobVersion := -1
if job != nil {
jobVersion = int(job.Version)
}
s.logger.Debug("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion, "found_version", jobVersion)
}
}

// Check if this task group has already failed
if metric, ok := s.failedTGAllocs[tg.Name]; ok {
metric.CoalescedFailures += 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to restore the stack's original Job here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to happen below, after a placement is made - particularly after s.selectNextOption is called. Will update the comment.

continue
}

// Use downgraded job in scheduling stack to honor
// old job resources and constraints
if downgradedJob != nil {
s.stack.SetJob(downgradedJob)
}

// Find the preferred node
preferredNode, err := s.findPreferredNode(missing)
if err != nil {
Expand All @@ -489,6 +544,11 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()

// Restore stack job now that placement is done, to use plan job version
if downgradedJob != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if downgradedJob != nil {
if *s.stack.jobVersion != s.job.Version {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this is better?

s.stack.SetJob(s.job)
}

// Set fields based on if we found an allocation option
if option != nil {
resources := &structs.AllocatedResources{
Expand Down Expand Up @@ -547,7 +607,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
s.handlePreemptions(option, alloc, missing)

// Track the placement
s.plan.AppendAlloc(alloc)
s.plan.AppendAlloc(alloc, downgradedJob)

} else {
// Lazy initialize the failed map
Expand Down
159 changes: 159 additions & 0 deletions scheduler/generic_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5342,3 +5342,162 @@ func TestServiceSched_Preemption(t *testing.T) {
}
require.Equal(expectedPreemptedAllocs, actualPreemptedAllocs)
}

// TestServiceSched_Migrate_CanaryStatus asserts that migrations/rescheduling
// of allocations use the proper versions of allocs rather than latest:
// Canaries should be replaced by canaries, and non-canaries should be replaced
// with the latest promoted version.
func TestServiceSched_Migrate_CanaryStatus(t *testing.T) {
h := NewHarness(t)

node1 := mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1))

totalCount := 3
desiredCanaries := 1

job := mock.Job()
job.Stable = true
job.TaskGroups[0].Count = totalCount
job.TaskGroups[0].Update = &structs.UpdateStrategy{
MaxParallel: 1,
Canary: desiredCanaries,
}
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job))

deployment := &structs.Deployment{
ID: uuid.Generate(),
JobID: job.ID,
Namespace: job.Namespace,
JobVersion: job.Version,
JobModifyIndex: job.JobModifyIndex,
JobCreateIndex: job.CreateIndex,
TaskGroups: map[string]*structs.DeploymentState{
"web": {DesiredTotal: totalCount},
},
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
}
require.NoError(t, h.State.UpsertDeployment(h.NextIndex(), deployment))

var allocs []*structs.Allocation
for i := 0; i < 3; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node1.ID
alloc.DeploymentID = deployment.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

// new update with new task group
job2 := job.Copy()
job2.Stable = false
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job2))

// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.NoError(t, err)

// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]

// Ensure a deployment was created
require.NotNil(t, plan.Deployment)
updateDeployment := plan.Deployment.ID

// Check status first - should be 4 allocs, only one is canary
{
ws := memdb.NewWatchSet()
allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true)
require.NoError(t, err)
require.Len(t, allocs, 4)

sort.Slice(allocs, func(i, j int) bool { return allocs[i].CreateIndex < allocs[j].CreateIndex })

for _, a := range allocs[:3] {
require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus)
require.Equal(t, uint64(0), a.Job.Version)
require.False(t, a.DeploymentStatus.IsCanary())
require.Equal(t, node1.ID, a.NodeID)
require.Equal(t, deployment.ID, a.DeploymentID)
}
require.Equal(t, structs.AllocDesiredStatusRun, allocs[3].DesiredStatus)
require.Equal(t, uint64(1), allocs[3].Job.Version)
require.True(t, allocs[3].DeploymentStatus.Canary)
require.Equal(t, node1.ID, allocs[3].NodeID)
require.Equal(t, updateDeployment, allocs[3].DeploymentID)
}

// now, drain node1 and ensure all are migrated to node2
node1 = node1.Copy()
node1.Status = structs.NodeStatusDown
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1))

node2 := mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node2))

neval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
NodeID: node1.ID,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{neval}))

// Process the evaluation
err = h.Process(NewServiceScheduler, eval)
require.NoError(t, err)

// Now test that all node1 allocs are migrated while preserving Version and Canary info
{
ws := memdb.NewWatchSet()
allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true)
require.NoError(t, err)
require.Len(t, allocs, 8)

nodeAllocs := map[string][]*structs.Allocation{}
for _, a := range allocs {
nodeAllocs[a.NodeID] = append(nodeAllocs[a.NodeID], a)
}

require.Len(t, nodeAllocs[node1.ID], 4)
for _, a := range nodeAllocs[node1.ID] {
require.Equal(t, structs.AllocDesiredStatusStop, a.DesiredStatus)
require.Equal(t, node1.ID, a.NodeID)
}

node2Allocs := nodeAllocs[node2.ID]
require.Len(t, node2Allocs, 4)
sort.Slice(node2Allocs, func(i, j int) bool { return node2Allocs[i].Job.Version < node2Allocs[j].Job.Version })

for _, a := range node2Allocs[:3] {
require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus)
require.Equal(t, uint64(0), a.Job.Version)
require.Equal(t, node2.ID, a.NodeID)
require.Equal(t, deployment.ID, a.DeploymentID)
}
require.Equal(t, structs.AllocDesiredStatusRun, node2Allocs[3].DesiredStatus)
require.Equal(t, uint64(1), node2Allocs[3].Job.Version)
require.Equal(t, node2.ID, node2Allocs[3].NodeID)
require.Equal(t, updateDeployment, node2Allocs[3].DeploymentID)
}
}
22 changes: 15 additions & 7 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {

// The fact that we have destructive updates and have less canaries than is
// desired means we need to create canaries
numDestructive := len(destructive)
strategy := tg.Update
canariesPromoted := dstate != nil && dstate.Promoted
requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
replaceAllAllocs := len(untainted) == 0 && len(migrate)+len(lost) != 0
requireCanary := (len(destructive) != 0 || replaceAllAllocs) &&
strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is semi-related band-aid that we'll probably need to investigate further. The code here determines if canaries are needed by checking if we have any destructive update. However, if all allocations are dead (because the nodes are lost), len(destructive) will be 0. I changed the condition to account for such scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be nice to break this conditional up a bit, and capture some of what's going on here.

if requireCanary {
dstate.DesiredCanaries = strategy.Canary
}
Expand Down Expand Up @@ -455,7 +456,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
Expand Down Expand Up @@ -533,9 +534,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
})
a.result.place = append(a.result.place, allocPlaceResult{
name: alloc.Name,
canary: false,
canary: alloc.DeploymentStatus.IsCanary(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code here assumed that all alloc migrations are non-canary. An odd assumption.

taskGroup: tg,
previousAlloc: alloc,

downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
minJobVersion: alloc.Job.Version,
})
}

Expand Down Expand Up @@ -708,7 +712,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest
// computePlacement returns the set of allocations to place given the group
// definition, the set of untainted, migrating and reschedule allocations for the group.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult {
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet, canaryState bool) []allocPlaceResult {

// Add rescheduled placement results
var place []allocPlaceResult
Expand All @@ -719,6 +723,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
previousAlloc: alloc,
reschedule: true,
canary: alloc.DeploymentStatus.IsCanary(),

downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
minJobVersion: alloc.Job.Version,
})
}

Expand All @@ -732,8 +739,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
if existing < group.Count {
for _, name := range nameIndex.Next(uint(group.Count - existing)) {
place = append(place, allocPlaceResult{
name: name,
taskGroup: group,
name: name,
taskGroup: group,
downgradeNonCanary: canaryState,
})
}
}
Expand Down
Loading