Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preemption for system jobs #4794

Merged
merged 38 commits into from
Nov 2, 2018
Merged

Preemption for system jobs #4794

merged 38 commits into from
Nov 2, 2018

Conversation

preetapan
Copy link
Contributor

This PR brings in prior work from PR #4710, rebased to master and modifying all the internal comparison logic to use the new ComparableResources struct.

@preetapan preetapan requested a review from dadgar October 17, 2018 16:08

}

// computeCurrentPreemptions counts the number of other allocations being preempted that match the job and task group of
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo this could be calculated ahead of time per jobid in the set of existing allocations on the node

// after accounting for reserved resources and all allocations

// Subtract the reserved resources of the node
if node.ComparableReservedResources() != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if c := node.ComparableReservedResources(); c != nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still use the suggestion

@@ -257,3 +258,102 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) {
})
})
}

func TestOperator_SchedulerGetConfiguration(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACL tests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still needs ACL tests

nomad/fsm.go Show resolved Hide resolved
nomad/fsm.go Outdated
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "scheduler-config"}, time.Now())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check what other metrics do when they are two words. I feel like it may be underscore

buf, err = structs.Encode(structs.SchedulerConfigRequestType, req)
require.Nil(err)

resp = fsm.Apply(makeLog(buf))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you check the resp as you are returning an actual value

nomad/leader.go Outdated
return config
}

config = &structs.SchedulerConfiguration{PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: true}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a variable at the top of the file with a comment on it

}

*reply = *config

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set indexes on the responses

nomad/state/schema.go Show resolved Hide resolved
@@ -3956,3 +3991,81 @@ func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) {
}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the code above the restore functions

nomad/fsm.go Outdated
defer metrics.MeasureSince([]string{"nomad", "fsm", "scheduler-config"}, time.Now())

if req.CAS {
act, err := n.state.SchedulerCASConfig(index, req.Config.ModifyIndex, &req.Config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

act is a weird variable name? applied?

nomad/structs/operator.go Show resolved Hide resolved
nomad/fsm.go Outdated
@@ -1833,6 +1835,23 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
return nil
}

func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to be above here:

nomad/fsm.go Outdated
@@ -1833,6 +1835,23 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
return nil
}

func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the snapshot and restore code?

nomad/fsm_test.go Show resolved Hide resolved
@@ -1869,6 +1869,58 @@ func TestResource_Add_Network(t *testing.T) {
}
}

func TestResource_Subtract(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convert to comparable resource test

@@ -130,7 +130,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs,
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
structs.EvalTriggerFailedFollowUp:
structs.EvalTriggerFailedFollowUp,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Join into one line

// Also gather jobids to create follow up evals
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
for _, alloc := range req.NodePreemptions {
if alloc.CreateTime == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create time doesn't make sense since we are preempting an alloc that exists

nomad/plan_apply.go Show resolved Hide resolved
offer, err = netIdx.AssignNetwork(ask)
if offer == nil {
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("unexecpted error, unable to create offer after preempting:%v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm seems odd to expose this in the metrics. I would drop that an instead emit an error level log (iter.ctx.Logger().Error(...))

@@ -195,6 +199,16 @@ OUTER:
DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB),
},
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method has to be refactored. It will get even more complex once devices are considered. I had a hard to following the code and I know it quite well

scheduler/rank.go Show resolved Hide resolved
// any allocs can be preempted

// Remove the last element containing the current placement from proposed allocs
current := proposed[:len(proposed)-1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of this. Capture it outside as a variable

scheduler/system_sched.go Show resolved Hide resolved

return resp, nil

case "PUT":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We accept POST and PUT

// Switch on the method
switch req.Method {
case "GET":
var args structs.GenericRequest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally split the methods up so that it only handles one type of HTTP verb: https://github.com/hashicorp/nomad/blob/master/command/agent/job_endpoint.go#L15

SchedulerConfig SchedulerConfiguration

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return reply, nil

default:
return nil, CodedError(404, ErrInvalidMethod)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

404 -> 405

if !args.CAS {
return true, nil
}
return reply, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateIndex: config.CreateIndex,
ModifyIndex: config.ModifyIndex,
}
*reply = *resp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use the reply, there is no need for the resp variable?


// SchedulerCASConfig is used to try updating the scheduler configuration with a
// given Raft index. If the CAS index specified is not equal to the last observed index
// for the config, then the call is a noop,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last comma should be a period

return nil
}

// SchedulerCASConfig is used to try updating the scheduler configuration with a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is used to update the scheduler configuration...

// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := existing.(*structs.SchedulerConfiguration)
if !ok || e.ModifyIndex != cidx {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be wrong here butt can't e == nil and ok == true, in which case you can panic dereferencing the modify index.

SchedulerConfig SchedulerConfiguration

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryMeta

Copy link
Contributor

@dadgar dadgar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main comment is that we need to reduce the number of times we call ComparableResources on allocations. It should only ever be called once per alloc

@@ -181,6 +187,36 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
alloc.ModifyTime = now
}

// Set create and modify time for preempted allocs if any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove create and

@@ -318,6 +356,25 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
if nodeAlloc := plan.NodeAllocation[nodeID]; len(nodeAlloc) > 0 {
result.NodeAllocation[nodeID] = nodeAlloc
}

if nodePreemptions := plan.NodePreemptions[nodeID]; nodePreemptions != nil {
var filteredNodePreemptions []*structs.Allocation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks a bit cleaner if you put the slice that is being used right above the for loop. So 1. Comment 2. slice declaration 3. for loop

nomad/plan_apply.go Show resolved Hide resolved
// by reading AllocatedResources which are populated starting in 0.9 and
// falling back to pre 0.9 fields (Resources/TaskResources) if set
func (a *Allocation) CompatibleNetworkResources() []*NetworkResource {
var ret []*NetworkResource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this now?

nomad/structs/structs.go Show resolved Hide resolved
// First try to satisfy needed reserved ports
if len(reservedPortsNeeded) > 0 {
for _, alloc := range currentAllocs {
for _, tr := range alloc.TaskResources {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you be using comparable here. This doesn't use the new AllocatedResources stanza and will break once TaskResources is deprecated

}

func (n *NetworkPreemptionResource) Distance() float64 {
networkCoord := math.MaxFloat64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call networkResourceDistance() to be DRY

networkCoord = float64(resourceNeeded.MBits-resourceUsed.MBits) / float64(resourceNeeded.MBits)
}

originDist := math.Sqrt(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue

// Do another pass to eliminate allocations that are a superset of other allocations
// in the preemption set
for _, alloc := range bestAllocs {
if preemptedResources == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment, you are just accumulating node free resources + preempted resources so just use one object

scheduler/preemption.go Show resolved Hide resolved
if tg != nil && tg.Migrate != nil {
maxParallel = tg.Migrate.MaxParallel
}
p.allocDetails[alloc.ID] = &allocInfo{maxParallel, alloc.ComparableResources()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think vet will complain at this. You should use the field name when constructing the struct

if p.jobPriority-alloc.Job.Priority < 10 {

// If this allocation uses a needed reserved port
// preemption is impossible so we return early
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment a bit stale

return filteredBestAllocs
}

func usedReservedPorts(net *structs.NetworkResource, portMap map[int]interface{}) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deadcode?

availableResources.Add(closestResources)

// This step needs the original resources asked for as the second arg, can't use the running total
allRequirementsMet, _ = availableResources.Superset(resourceAsk.Comparable())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compute resourceAsk.Comparable() once at the top of this method?

reservedPortsNeeded := networkResourceAsk.ReservedPorts

// Build map of reserved ports needed for fast access
reservedPorts := make(map[int]interface{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use struct{} instead of interface{}. It is a smaller object and more canonical for set implementation.

reservedPorts[port.Value] = struct{}{}
}

filteredReservedPorts := make(map[string]map[int]struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on what this map is for

// Reset allocsToPreempt since we don't want to preempt across devices for the same task
allocsToPreempt = nil

usedPortToAlloc := make(map[int]*structs.Allocation)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment

@dadgar
Copy link
Contributor

dadgar commented Nov 2, 2018

LGTM after last comments are addressed

@preetapan preetapan merged commit 6aa0c7f into master Nov 2, 2018
@preetapan preetapan deleted the f-preemption-systemjobs branch November 2, 2018 23:13
@github-actions
Copy link

I'm going to lock this pull request because it has been closed for 120 days ⏳. This helps our maintainers find and focus on the active contributions.
If you have found a problem that seems related to this change, please open a new issue and complete the issue template so we can capture all the details necessary to investigate further.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Feb 25, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants