-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Preemption for system jobs #4794
Conversation
scheduler/preemption.go
Outdated
|
||
} | ||
|
||
// computeCurrentPreemptions counts the number of other allocations being preempted that match the job and task group of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo this could be calculated ahead of time per jobid in the set of existing allocations on the node
scheduler/preemption.go
Outdated
// after accounting for reserved resources and all allocations | ||
|
||
// Subtract the reserved resources of the node | ||
if node.ComparableReservedResources() != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if c := node.ComparableReservedResources(); c != nil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still use the suggestion
@@ -257,3 +258,102 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) { | |||
}) | |||
}) | |||
} | |||
|
|||
func TestOperator_SchedulerGetConfiguration(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ACL tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still needs ACL tests
nomad/fsm.go
Outdated
if err := structs.Decode(buf, &req); err != nil { | ||
panic(fmt.Errorf("failed to decode request: %v", err)) | ||
} | ||
defer metrics.MeasureSince([]string{"nomad", "fsm", "scheduler-config"}, time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you check what other metrics do when they are two words. I feel like it may be underscore
buf, err = structs.Encode(structs.SchedulerConfigRequestType, req) | ||
require.Nil(err) | ||
|
||
resp = fsm.Apply(makeLog(buf)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't you check the resp as you are returning an actual value
nomad/leader.go
Outdated
return config | ||
} | ||
|
||
config = &structs.SchedulerConfiguration{PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: true}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be a variable at the top of the file with a comment on it
} | ||
|
||
*reply = *config | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set indexes on the responses
nomad/state/state_store.go
Outdated
@@ -3956,3 +3991,81 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { | |||
} | |||
} | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the code above the restore functions
nomad/fsm.go
Outdated
defer metrics.MeasureSince([]string{"nomad", "fsm", "scheduler-config"}, time.Now()) | ||
|
||
if req.CAS { | ||
act, err := n.state.SchedulerCASConfig(index, req.Config.ModifyIndex, &req.Config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
act is a weird variable name? applied
?
nomad/fsm.go
Outdated
@@ -1833,6 +1835,23 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink, | |||
return nil | |||
} | |||
|
|||
func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to be above here:
Line 998 in 4db7e87
nomad/fsm.go
Outdated
@@ -1833,6 +1835,23 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink, | |||
return nil | |||
} | |||
|
|||
func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is the snapshot and restore code?
nomad/structs/structs_test.go
Outdated
@@ -1869,6 +1869,58 @@ func TestResource_Add_Network(t *testing.T) { | |||
} | |||
} | |||
|
|||
func TestResource_Subtract(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convert to comparable resource test
scheduler/generic_sched.go
Outdated
@@ -130,7 +130,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { | |||
structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs, | |||
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, | |||
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, | |||
structs.EvalTriggerFailedFollowUp: | |||
structs.EvalTriggerFailedFollowUp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Join into one line
nomad/plan_apply.go
Outdated
// Also gather jobids to create follow up evals | ||
preemptedJobIDs := make(map[structs.NamespacedID]struct{}) | ||
for _, alloc := range req.NodePreemptions { | ||
if alloc.CreateTime == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create time doesn't make sense since we are preempting an alloc that exists
scheduler/rank.go
Outdated
offer, err = netIdx.AssignNetwork(ask) | ||
if offer == nil { | ||
iter.ctx.Metrics().ExhaustedNode(option.Node, | ||
fmt.Sprintf("unexecpted error, unable to create offer after preempting:%v", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm seems odd to expose this in the metrics. I would drop that an instead emit an error level log (iter.ctx.Logger().Error(...)
)
@@ -195,6 +199,16 @@ OUTER: | |||
DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB), | |||
}, | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this method has to be refactored. It will get even more complex once devices are considered. I had a hard to following the code and I know it quite well
scheduler/rank.go
Outdated
// any allocs can be preempted | ||
|
||
// Remove the last element containing the current placement from proposed allocs | ||
current := proposed[:len(proposed)-1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a fan of this. Capture it outside as a variable
This commit implements an allocation selection algorithm for finding allocations to preempt. It currently special cases network resource asks from others (cpu/memory/disk/iops).
21a9b8b
to
f2b0277
Compare
|
||
return resp, nil | ||
|
||
case "PUT": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We accept POST and PUT
// Switch on the method | ||
switch req.Method { | ||
case "GET": | ||
var args structs.GenericRequest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally split the methods up so that it only handles one type of HTTP verb: https://github.com/hashicorp/nomad/blob/master/command/agent/job_endpoint.go#L15
SchedulerConfig SchedulerConfiguration | ||
|
||
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration. | ||
CreateIndex uint64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be WriteMeta and QueryMeta: https://github.com/hashicorp/nomad/blob/master/api/jobs.go#L1039-L1046
return reply, nil | ||
|
||
default: | ||
return nil, CodedError(404, ErrInvalidMethod) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
404 -> 405
if !args.CAS { | ||
return true, nil | ||
} | ||
return reply, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CreateIndex: config.CreateIndex, | ||
ModifyIndex: config.ModifyIndex, | ||
} | ||
*reply = *resp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just use the reply, there is no need for the resp variable?
nomad/state/state_store.go
Outdated
|
||
// SchedulerCASConfig is used to try updating the scheduler configuration with a | ||
// given Raft index. If the CAS index specified is not equal to the last observed index | ||
// for the config, then the call is a noop, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last comma should be a period
nomad/state/state_store.go
Outdated
return nil | ||
} | ||
|
||
// SchedulerCASConfig is used to try updating the scheduler configuration with a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is used to update the scheduler configuration...
// index arg, then we shouldn't update anything and can safely | ||
// return early here. | ||
e, ok := existing.(*structs.SchedulerConfiguration) | ||
if !ok || e.ModifyIndex != cidx { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may be wrong here butt can't e == nil
and ok == true
, in which case you can panic dereferencing the modify index.
SchedulerConfig SchedulerConfiguration | ||
|
||
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration. | ||
CreateIndex uint64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QueryMeta
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Main comment is that we need to reduce the number of times we call ComparableResources on allocations. It should only ever be called once per alloc
nomad/plan_apply.go
Outdated
@@ -181,6 +187,36 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap | |||
alloc.ModifyTime = now | |||
} | |||
|
|||
// Set create and modify time for preempted allocs if any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove create and
nomad/plan_apply.go
Outdated
@@ -318,6 +356,25 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan | |||
if nodeAlloc := plan.NodeAllocation[nodeID]; len(nodeAlloc) > 0 { | |||
result.NodeAllocation[nodeID] = nodeAlloc | |||
} | |||
|
|||
if nodePreemptions := plan.NodePreemptions[nodeID]; nodePreemptions != nil { | |||
var filteredNodePreemptions []*structs.Allocation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks a bit cleaner if you put the slice that is being used right above the for loop. So 1. Comment 2. slice declaration 3. for loop
nomad/structs/structs.go
Outdated
// by reading AllocatedResources which are populated starting in 0.9 and | ||
// falling back to pre 0.9 fields (Resources/TaskResources) if set | ||
func (a *Allocation) CompatibleNetworkResources() []*NetworkResource { | ||
var ret []*NetworkResource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete this now?
scheduler/preemption.go
Outdated
// First try to satisfy needed reserved ports | ||
if len(reservedPortsNeeded) > 0 { | ||
for _, alloc := range currentAllocs { | ||
for _, tr := range alloc.TaskResources { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't you be using comparable here. This doesn't use the new AllocatedResources stanza and will break once TaskResources is deprecated
scheduler/preemption.go
Outdated
} | ||
|
||
func (n *NetworkPreemptionResource) Distance() float64 { | ||
networkCoord := math.MaxFloat64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call networkResourceDistance() to be DRY
scheduler/preemption.go
Outdated
networkCoord = float64(resourceNeeded.MBits-resourceUsed.MBits) / float64(resourceNeeded.MBits) | ||
} | ||
|
||
originDist := math.Sqrt( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same issue
scheduler/preemption.go
Outdated
// Do another pass to eliminate allocations that are a superset of other allocations | ||
// in the preemption set | ||
for _, alloc := range bestAllocs { | ||
if preemptedResources == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment, you are just accumulating node free resources + preempted resources so just use one object
…m alloc.Resources
also removed now unused method
Also added test case
scheduler/preemption.go
Outdated
if tg != nil && tg.Migrate != nil { | ||
maxParallel = tg.Migrate.MaxParallel | ||
} | ||
p.allocDetails[alloc.ID] = &allocInfo{maxParallel, alloc.ComparableResources()} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think vet will complain at this. You should use the field name when constructing the struct
scheduler/preemption.go
Outdated
if p.jobPriority-alloc.Job.Priority < 10 { | ||
|
||
// If this allocation uses a needed reserved port | ||
// preemption is impossible so we return early |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment a bit stale
scheduler/preemption.go
Outdated
return filteredBestAllocs | ||
} | ||
|
||
func usedReservedPorts(net *structs.NetworkResource, portMap map[int]interface{}) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deadcode?
scheduler/preemption.go
Outdated
availableResources.Add(closestResources) | ||
|
||
// This step needs the original resources asked for as the second arg, can't use the running total | ||
allRequirementsMet, _ = availableResources.Superset(resourceAsk.Comparable()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compute resourceAsk.Comparable()
once at the top of this method?
scheduler/preemption.go
Outdated
reservedPortsNeeded := networkResourceAsk.ReservedPorts | ||
|
||
// Build map of reserved ports needed for fast access | ||
reservedPorts := make(map[int]interface{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use struct{} instead of interface{}. It is a smaller object and more canonical for set implementation.
reservedPorts[port.Value] = struct{}{} | ||
} | ||
|
||
filteredReservedPorts := make(map[string]map[int]struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment on what this map is for
// Reset allocsToPreempt since we don't want to preempt across devices for the same task | ||
allocsToPreempt = nil | ||
|
||
usedPortToAlloc := make(map[int]*structs.Allocation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment
LGTM after last comments are addressed |
I'm going to lock this pull request because it has been closed for 120 days ⏳. This helps our maintainers find and focus on the active contributions. |
This PR brings in prior work from PR #4710, rebased to master and modifying all the internal comparison logic to use the new
ComparableResources
struct.