-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Respect alloc job version for lost/failed allocs #8691
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,6 +2,7 @@ package scheduler | |||||
|
||||||
import ( | ||||||
"fmt" | ||||||
"sort" | ||||||
"time" | ||||||
|
||||||
log "github.com/hashicorp/go-hclog" | ||||||
|
@@ -387,12 +388,12 @@ func (s *GenericScheduler) computeJobAllocs() error { | |||||
update.DeploymentID = s.deployment.GetID() | ||||||
update.DeploymentStatus = nil | ||||||
} | ||||||
s.ctx.Plan().AppendAlloc(update) | ||||||
s.ctx.Plan().AppendAlloc(update, nil) | ||||||
} | ||||||
|
||||||
// Handle the annotation updates | ||||||
for _, update := range results.attributeUpdates { | ||||||
s.ctx.Plan().AppendAlloc(update) | ||||||
s.ctx.Plan().AppendAlloc(update, nil) | ||||||
} | ||||||
|
||||||
// Nothing remaining to do if placement is not required | ||||||
|
@@ -429,6 +430,32 @@ func (s *GenericScheduler) computeJobAllocs() error { | |||||
return s.computePlacements(destructive, place) | ||||||
} | ||||||
|
||||||
// downgradedJobForPlacement returns the job appropriate for non-canary placement replacement | ||||||
func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, *structs.Job, error) { | ||||||
ns, jobID := s.job.Namespace, s.job.ID | ||||||
tgName := p.TaskGroup().Name | ||||||
|
||||||
// find deployments and use the latest promoted or canaried version | ||||||
deployments, err := s.state.DeploymentsByJobID(nil, ns, jobID, false) | ||||||
if err != nil { | ||||||
return "", nil, fmt.Errorf("failed to lookup job deployments: %v", err) | ||||||
} | ||||||
sort.Slice(deployments, func(i, j int) bool { return deployments[i].JobVersion > deployments[i].JobVersion }) | ||||||
for _, d := range deployments { | ||||||
// It's unexpected to have a recent deployment that doesn't contain the TaskGroup; as all allocations | ||||||
// should be destroyed. In such cases, attempt to find the deployment for that TaskGroup and hopefully | ||||||
// we will kill it soon. This is a defensive measure, have not seen it in practice | ||||||
// | ||||||
// Zero dstate.DesiredCanaries indicates that the TaskGroup allocates were updated in-place without using canaries. | ||||||
if dstate := d.TaskGroups[tgName]; dstate != nil && (dstate.Promoted || dstate.DesiredCanaries == 0) { | ||||||
job, err := s.state.JobByIDAndVersion(nil, ns, jobID, d.JobVersion) | ||||||
return d.ID, job, err | ||||||
} | ||||||
} | ||||||
|
||||||
return "", nil, nil | ||||||
} | ||||||
|
||||||
// computePlacements computes placements for allocations. It is given the set of | ||||||
// destructive updates to place and the set of new placements to place. | ||||||
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { | ||||||
|
@@ -457,12 +484,40 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul | |||||
// Get the task group | ||||||
tg := missing.TaskGroup() | ||||||
|
||||||
var downgradedJob *structs.Job | ||||||
|
||||||
if missing.DowngradeNonCanary() { | ||||||
jobDeploymentID, job, err := s.downgradedJobForPlacement(missing) | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
// Defensive check - if there is no appropriate deployment for this job, use the latest | ||||||
if job != nil && job.Version >= missing.MinJobVersion() && job.LookupTaskGroup(tg.Name) != nil { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made few defensive checks here, where if we see unexpected state (e.g. jobs without expected TaskGroup, no non-promoted version), we'd fallback to using the latest version. This seems better than a panic, but not sure if we should simplify this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this unexpected? for jobs without update stanza, there won't be deployments, so that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's unexpected. |
||||||
tg = job.LookupTaskGroup(tg.Name) | ||||||
downgradedJob = job | ||||||
deploymentID = jobDeploymentID | ||||||
} else { | ||||||
jobVersion := -1 | ||||||
if job != nil { | ||||||
jobVersion = int(job.Version) | ||||||
} | ||||||
s.logger.Debug("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion, "found_version", jobVersion) | ||||||
} | ||||||
} | ||||||
|
||||||
// Check if this task group has already failed | ||||||
if metric, ok := s.failedTGAllocs[tg.Name]; ok { | ||||||
metric.CoalescedFailures += 1 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to restore the stack's original Job here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It needs to happen below, after a placement is made - particularly after |
||||||
continue | ||||||
} | ||||||
|
||||||
// Use downgraded job in scheduling stack to honor | ||||||
// old job resources and constraints | ||||||
if downgradedJob != nil { | ||||||
s.stack.SetJob(downgradedJob) | ||||||
} | ||||||
|
||||||
// Find the preferred node | ||||||
preferredNode, err := s.findPreferredNode(missing) | ||||||
if err != nil { | ||||||
|
@@ -489,6 +544,11 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul | |||||
// Compute top K scoring node metadata | ||||||
s.ctx.Metrics().PopulateScoreMetaData() | ||||||
|
||||||
// Restore stack job now that placement is done, to use plan job version | ||||||
if downgradedJob != nil { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe this is better? |
||||||
s.stack.SetJob(s.job) | ||||||
} | ||||||
|
||||||
// Set fields based on if we found an allocation option | ||||||
if option != nil { | ||||||
resources := &structs.AllocatedResources{ | ||||||
|
@@ -547,7 +607,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul | |||||
s.handlePreemptions(option, alloc, missing) | ||||||
|
||||||
// Track the placement | ||||||
s.plan.AppendAlloc(alloc) | ||||||
s.plan.AppendAlloc(alloc, downgradedJob) | ||||||
|
||||||
} else { | ||||||
// Lazy initialize the failed map | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -424,10 +424,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { | |
|
||
// The fact that we have destructive updates and have less canaries than is | ||
// desired means we need to create canaries | ||
numDestructive := len(destructive) | ||
strategy := tg.Update | ||
canariesPromoted := dstate != nil && dstate.Promoted | ||
requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted | ||
replaceAllAllocs := len(untainted) == 0 && len(migrate)+len(lost) != 0 | ||
requireCanary := (len(destructive) != 0 || replaceAllAllocs) && | ||
strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is semi-related band-aid that we'll probably need to investigate further. The code here determines if canaries are needed by checking if we have any destructive update. However, if all allocations are dead (because the nodes are lost), There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it might be nice to break this conditional up a bit, and capture some of what's going on here. |
||
if requireCanary { | ||
dstate.DesiredCanaries = strategy.Canary | ||
} | ||
|
@@ -455,7 +456,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { | |
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group | ||
var place []allocPlaceResult | ||
if len(lostLater) == 0 { | ||
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow) | ||
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState) | ||
if !existingDeployment { | ||
dstate.DesiredTotal += len(place) | ||
} | ||
|
@@ -533,9 +534,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { | |
}) | ||
a.result.place = append(a.result.place, allocPlaceResult{ | ||
name: alloc.Name, | ||
canary: false, | ||
canary: alloc.DeploymentStatus.IsCanary(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code here assumed that all alloc migrations are non-canary. An odd assumption. |
||
taskGroup: tg, | ||
previousAlloc: alloc, | ||
|
||
downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(), | ||
minJobVersion: alloc.Job.Version, | ||
}) | ||
} | ||
|
||
|
@@ -708,7 +712,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest | |
// computePlacement returns the set of allocations to place given the group | ||
// definition, the set of untainted, migrating and reschedule allocations for the group. | ||
func (a *allocReconciler) computePlacements(group *structs.TaskGroup, | ||
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult { | ||
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet, canaryState bool) []allocPlaceResult { | ||
|
||
// Add rescheduled placement results | ||
var place []allocPlaceResult | ||
|
@@ -719,6 +723,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, | |
previousAlloc: alloc, | ||
reschedule: true, | ||
canary: alloc.DeploymentStatus.IsCanary(), | ||
|
||
downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(), | ||
minJobVersion: alloc.Job.Version, | ||
}) | ||
} | ||
|
||
|
@@ -732,8 +739,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, | |
if existing < group.Count { | ||
for _, name := range nameIndex.Next(uint(group.Count - existing)) { | ||
place = append(place, allocPlaceResult{ | ||
name: name, | ||
taskGroup: group, | ||
name: name, | ||
taskGroup: group, | ||
downgradeNonCanary: canaryState, | ||
}) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we first compare d.JobVersion against s.job.Version and if they're equal: return nil since they're equivalent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's reasonable but also seems like a micro-optimization - I may consider it when addressing reviews.