Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize plan before sending to increase the plan apply throughput #5407

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Allocation struct {
EvalID string
Name string
NodeID string
NodeName string
JobID string
arshjohar marked this conversation as resolved.
Show resolved Hide resolved
Job *Job
TaskGroup string
Expand Down Expand Up @@ -149,6 +150,7 @@ type AllocationListStub struct {
Name string
Namespace string
NodeID string
NodeName string
JobID string
JobType string
JobVersion uint64
Expand Down
1 change: 1 addition & 0 deletions command/alloc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength
fmt.Sprintf("Eval ID|%s", limit(alloc.EvalID, uuidLength)),
fmt.Sprintf("Name|%s", alloc.Name),
fmt.Sprintf("Node ID|%s", limit(alloc.NodeID, uuidLength)),
fmt.Sprintf("Node Name|%s", alloc.NodeName),
fmt.Sprintf("Job ID|%s", alloc.JobID),
fmt.Sprintf("Job Version|%d", getVersion(alloc.Job)),
fmt.Sprintf("Client Status|%s", alloc.ClientStatus),
Expand Down
5 changes: 5 additions & 0 deletions command/alloc_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ func TestAllocStatusCommand_Run(t *testing.T) {
}
// get an alloc id
allocId1 := ""
nodeName := ""
if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil {
if len(allocs) > 0 {
allocId1 = allocs[0].ID
nodeName = allocs[0].NodeName
}
}
if allocId1 == "" {
Expand All @@ -141,6 +143,9 @@ func TestAllocStatusCommand_Run(t *testing.T) {
t.Fatalf("expected to have 'Modified' but saw: %s", out)
}

nodeNameRegexpStr := fmt.Sprintf(`\nNode Name\s+= %s\n`, regexp.QuoteMeta(nodeName))
require.Regexp(t, regexp.MustCompile(nodeNameRegexpStr), out)

ui.OutputWriter.Reset()

if code := cmd.Run([]string{"-address=" + url, "-verbose", allocId1}); code != 0 {
Expand Down
5 changes: 3 additions & 2 deletions command/job_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,13 @@ func formatAllocListStubs(stubs []*api.AllocationListStub, verbose bool, uuidLen

allocs := make([]string, len(stubs)+1)
if verbose {
allocs[0] = "ID|Eval ID|Node ID|Task Group|Version|Desired|Status|Created|Modified"
allocs[0] = "ID|Eval ID|Node ID|Node Name|Task Group|Version|Desired|Status|Created|Modified"
for i, alloc := range stubs {
allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%d|%s|%s|%s|%s",
allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%d|%s|%s|%s|%s",
limit(alloc.ID, uuidLength),
limit(alloc.EvalID, uuidLength),
limit(alloc.NodeID, uuidLength),
alloc.NodeName,
alloc.TaskGroup,
alloc.JobVersion,
alloc.DesiredStatus,
Expand Down
18 changes: 18 additions & 0 deletions command/job_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package command

import (
"fmt"
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -123,6 +124,14 @@ func TestJobStatusCommand_Run(t *testing.T) {
if code := cmd.Run([]string{"-address=" + url, "-verbose", "job2_sfx"}); code != 0 {
t.Fatalf("expected exit 0, got: %d", code)
}

nodeName := ""
if allocs, _, err := client.Jobs().Allocations("job2_sfx", false, nil); err == nil {
if len(allocs) > 0 {
nodeName = allocs[0].NodeName
}
}

out = ui.OutputWriter.String()
if strings.Contains(out, "job1_sfx") || !strings.Contains(out, "job2_sfx") {
t.Fatalf("expected only job2_sfx, got: %s", out)
Expand All @@ -139,6 +148,15 @@ func TestJobStatusCommand_Run(t *testing.T) {
if !strings.Contains(out, "Modified") {
t.Fatal("should have modified header")
}

// string calculations based on 1-byte chars, not using runes
allocationsTableName := "Allocations\n"
allocationsTableStr := strings.Split(out, allocationsTableName)[1]
nodeNameHeaderStr := "Node Name"
nodeNameHeaderIndex := strings.Index(allocationsTableStr, nodeNameHeaderStr)
nodeNameRegexpStr := fmt.Sprintf(`.*%s.*\n.{%d}%s`, nodeNameHeaderStr, nodeNameHeaderIndex, regexp.QuoteMeta(nodeName))
require.Regexp(t, regexp.MustCompile(nodeNameRegexpStr), out)

ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()

Expand Down
4 changes: 2 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}

if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
return nil
}
Expand All @@ -1270,7 +1270,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}

// All servers should be at or above 0.8.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
}

Expand Down Expand Up @@ -305,7 +305,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}

// All servers should be at or above 0.9.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
}
// Apply the update
Expand Down
149 changes: 109 additions & 40 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/hashicorp/raft"
)

// planner is used to mange the submitted allocation plans that are waiting
// planner is used to manage the submitted allocation plans that are waiting
// to be accessed by the leader
type planner struct {
*Server
Expand Down Expand Up @@ -149,52 +149,81 @@ func (p *planner) planApply() {

// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)

// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: plan.Job,
Alloc: make([]*structs.Allocation, 0, minUpdates),
Job: plan.Job,
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
EvalID: plan.EvalID,
NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)),
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}

for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
now := time.Now().UTC().UnixNano()
for _, alloc := range req.Alloc {
if alloc.CreateTime == 0 {
alloc.CreateTime = now

if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
req.AllocsStopped = make([]*structs.Allocation, 0, len(result.NodeUpdate))
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))

for _, updateList := range result.NodeUpdate {
for _, stoppedAlloc := range updateList {
req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, now))
}
}
alloc.ModifyTime = now
}

// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
_, ok := preemptedJobIDs[id]
if !ok {
preemptedJobIDs[id] = struct{}{}
for _, allocList := range result.NodeAllocation {
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.AllocsUpdated, now)

for _, preemptions := range result.NodePreemptions {
for _, preemptedAlloc := range preemptions {
req.NodePreemptions = append(req.NodePreemptions, normalizePreemptedAlloc(preemptedAlloc, now))

// Gather jobids to create follow up evals
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
}
}
} else {
// COMPAT 0.11: This branch is deprecated and will only be used to support
// application of older log entries. Expected to be removed in a future version.

// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)

// Initialize the allocs request using the older log entry format
req.Alloc = make([]*structs.Allocation, 0, minUpdates)

for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}

for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.Alloc, now)

// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
appendNamespacedJobID(preemptedJobIDs, alloc)
}
}

Expand Down Expand Up @@ -232,6 +261,39 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
return future, nil
}

arshjohar marked this conversation as resolved.
Show resolved Hide resolved
func normalizePreemptedAlloc(preemptedAlloc *structs.Allocation, now int64) *structs.Allocation {
arshjohar marked this conversation as resolved.
Show resolved Hide resolved
return &structs.Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation,
ModifyTime: now,
}
}

func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs.Allocation {
return &structs.Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: stoppedAlloc.DesiredDescription,
ClientStatus: stoppedAlloc.ClientStatus,
ModifyTime: now,
}
}

func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) {
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
if _, ok := jobIDs[id]; !ok {
jobIDs[id] = struct{}{}
}
}

func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
for _, alloc := range allocations {
if alloc.CreateTime == 0 {
alloc.CreateTime = timestamp
}
alloc.ModifyTime = timestamp
}
}

// asyncPlanWait is used to apply and respond to a plan async
func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
Expand Down Expand Up @@ -264,6 +326,17 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan, logger log.Logger) (*structs.PlanResult, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())

// Denormalize without the job
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil)
if err != nil {
return nil, err
}
// Denormalize without the job
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil)
if err != nil {
return nil, err
}

// Check if the plan exceeds quota
overQuota, err := evaluatePlanQuota(snap, plan)
if err != nil {
Expand Down Expand Up @@ -521,15 +594,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri

// Remove any preempted allocs
if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 {
for _, allocs := range preempted {
remove = append(remove, allocs)
}
remove = append(remove, preempted...)
}

if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 {
for _, alloc := range updated {
remove = append(remove, alloc)
}
remove = append(remove, updated...)
}
proposed := structs.RemoveAllocs(existingAlloc, remove)
proposed = append(proposed, plan.NodeAllocation[nodeID]...)
Expand Down
Loading