Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: better diagnostics for unknown preemption reason #4055

Merged
merged 2 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 59 additions & 23 deletions internal/scheduler/internaltypes/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internaltypes

import (
	"fmt"
	"math"
	"sort"
	"strings"

	"github.com/pkg/errors"
Expand Down Expand Up @@ -44,6 +45,8 @@ type Node struct {
// Total space allocatable on this node
totalResources ResourceList

unallocatableResources map[int32]ResourceList

// This field is set when inserting the Node into a NodeDb.
Keys [][]byte

Expand Down Expand Up @@ -92,6 +95,11 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
}
allocatableByPriority[EvictedPriority] = allocatableByPriority[minimumPriority]

unallocatableResources := map[int32]ResourceList{}
for p, u := range node.UnallocatableResources {
unallocatableResources[p] = resourceListFactory.FromJobResourceListIgnoreUnknown(u.Resources)
}

return CreateNode(
node.Id,
nodeType,
Expand All @@ -102,6 +110,7 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
taints,
labels,
resourceListFactory.FromNodeProto(totalResources.Resources),
unallocatableResources,
allocatableByPriority,
map[string]ResourceList{},
map[string]ResourceList{},
Expand All @@ -119,27 +128,29 @@ func CreateNode(
taints []v1.Taint,
labels map[string]string,
totalResources ResourceList,
unallocatableResources map[int32]ResourceList,
allocatableByPriority map[int32]ResourceList,
allocatedByQueue map[string]ResourceList,
allocatedByJobId map[string]ResourceList,
evictedJobRunIds map[string]bool,
keys [][]byte,
) *Node {
return &Node{
id: id,
nodeType: nodeType,
index: index,
executor: executor,
name: name,
pool: pool,
taints: koTaint.DeepCopyTaints(taints),
labels: deepCopyLabels(labels),
totalResources: totalResources,
AllocatableByPriority: maps.Clone(allocatableByPriority),
AllocatedByQueue: maps.Clone(allocatedByQueue),
AllocatedByJobId: maps.Clone(allocatedByJobId),
EvictedJobRunIds: evictedJobRunIds,
Keys: keys,
id: id,
nodeType: nodeType,
index: index,
executor: executor,
name: name,
pool: pool,
taints: koTaint.DeepCopyTaints(taints),
labels: deepCopyLabels(labels),
totalResources: totalResources,
unallocatableResources: maps.Clone(unallocatableResources),
AllocatableByPriority: maps.Clone(allocatableByPriority),
AllocatedByQueue: maps.Clone(allocatedByQueue),
AllocatedByJobId: maps.Clone(allocatedByJobId),
EvictedJobRunIds: evictedJobRunIds,
Keys: keys,
}
}

Expand Down Expand Up @@ -204,18 +215,23 @@ func (node *Node) GetTotalResources() ResourceList {
return node.totalResources
}

// GetUnallocatableResources returns a shallow copy of the node's
// unallocatable-resources map, so callers can add or remove entries in the
// returned map without affecting the node.
// NOTE(review): the int32 key is presumably a priority — confirm against the caller.
func (node *Node) GetUnallocatableResources() map[int32]ResourceList {
	snapshot := maps.Clone(node.unallocatableResources)
	return snapshot
}

func (node *Node) DeepCopyNilKeys() *Node {
return &Node{
// private fields are immutable so a shallow copy is fine
id: node.id,
index: node.index,
executor: node.executor,
name: node.name,
pool: node.pool,
nodeType: node.nodeType,
taints: node.taints,
labels: node.labels,
totalResources: node.totalResources,
id: node.id,
index: node.index,
executor: node.executor,
name: node.name,
pool: node.pool,
nodeType: node.nodeType,
taints: node.taints,
labels: node.labels,
totalResources: node.totalResources,
unallocatableResources: node.unallocatableResources,

// keys set to nil
Keys: nil,
Expand All @@ -228,6 +244,26 @@ func (node *Node) DeepCopyNilKeys() *Node {
}
}

// SummaryString returns a multi-line, human-readable description of the node
// (id, index, executor, name, pool, total resources, labels, taints and the
// unallocatable resources per key), one "Field: value" entry per line.
//
// A nil receiver yields the empty string, so the method is safe to call on
// the result of a getter that may return nil.
//
// The unallocatable-resources entries are emitted in ascending key order.
// Go randomizes map iteration order, so ranging over the map directly would
// make the summary differ between calls for the same node.
func (node *Node) SummaryString() string {
	if node == nil {
		return ""
	}

	var sb strings.Builder
	fmt.Fprintf(&sb, "Id: %s\n", node.id)
	fmt.Fprintf(&sb, "Index: %d\n", node.index)
	fmt.Fprintf(&sb, "Executor: %s\n", node.executor)
	fmt.Fprintf(&sb, "Name: %s\n", node.name)
	fmt.Fprintf(&sb, "Pool: %s\n", node.pool)
	fmt.Fprintf(&sb, "TotalResources: %s\n", node.totalResources.String())
	fmt.Fprintf(&sb, "Labels: %v\n", node.labels)
	fmt.Fprintf(&sb, "Taints: %v\n", node.taints)

	// Collect and sort the keys so the output is deterministic.
	keys := make([]int32, 0, len(node.unallocatableResources))
	for p := range node.unallocatableResources {
		keys = append(keys, p)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

	for _, p := range keys {
		// Copy into a local first: map elements are not addressable, and a
		// local works whether String has a value or a pointer receiver.
		u := node.unallocatableResources[p]
		fmt.Fprintf(&sb, "Unallocatable at %d: %s\n", p, u.String())
	}

	return sb.String()
}

func deepCopyLabels(labels map[string]string) map[string]string {
result := make(map[string]string, len(labels))
for k, v := range labels {
Expand Down
10 changes: 10 additions & 0 deletions internal/scheduler/internaltypes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ func TestNode(t *testing.T) {
"memory": resource.MustParse("32Gi"),
},
)
unallocatableResources := map[int32]ResourceList{
1: resourceListFactory.FromJobResourceListIgnoreUnknown(
map[string]resource.Quantity{
"cpu": resource.MustParse("8"),
"memory": resource.MustParse("16Gi"),
},
),
}
allocatableByPriority := map[int32]ResourceList{
1: resourceListFactory.FromNodeProto(
map[string]resource.Quantity{
Expand Down Expand Up @@ -103,6 +111,7 @@ func TestNode(t *testing.T) {
taints,
labels,
totalResources,
unallocatableResources,
allocatableByPriority,
allocatedByQueue,
allocatedByJobId,
Expand All @@ -119,6 +128,7 @@ func TestNode(t *testing.T) {
assert.Equal(t, taints, node.GetTaints())
assert.Equal(t, labels, node.GetLabels())
assert.Equal(t, totalResources, node.GetTotalResources())
assert.Equal(t, unallocatableResources, node.GetUnallocatableResources())
assert.Equal(t, allocatableByPriority, node.AllocatableByPriority)
assert.Equal(t, allocatedByQueue, node.AllocatedByQueue)
assert.Equal(t, allocatedByJobId, node.AllocatedByJobId)
Expand Down
24 changes: 24 additions & 0 deletions internal/scheduler/internaltypes/testfixtures/testfixtures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package testfixtures

import (
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
)

// TestSimpleNode builds a minimal *internaltypes.Node for use in tests: only
// the id is populated, and every other CreateNode argument is nil or a zero
// value.
// NOTE(review): the per-argument labels below follow the order in which
// CreateNode assigns its fields; the first six names are inferred from that
// order — confirm against CreateNode's signature.
func TestSimpleNode(id string) *internaltypes.Node {
return internaltypes.CreateNode(
id,
nil, // nodeType
0, // index
"", // executor
"", // name
"", // pool
nil, // taints
nil, // labels
internaltypes.ResourceList{}, // totalResources
nil, // unallocatableResources
nil, // allocatableByPriority
nil, // allocatedByQueue
nil, // allocatedByJobId
nil, // evictedJobRunIds
nil) // keys
}
5 changes: 3 additions & 2 deletions internal/scheduler/nodedb/nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
ittestfixtures "github.com/armadaproject/armada/internal/scheduler/internaltypes/testfixtures"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) {
jctxs := context.JobSchedulingContextsFromJobs(jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId(nodeId)
jctx.SetAssignedNode(ittestfixtures.TestSimpleNode(nodeId))
node, err := db.SelectNodeForJobWithTxn(txn, jctx)
txn.Abort()
require.NoError(t, err)
Expand All @@ -106,7 +107,7 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
jctxs := context.JobSchedulingContextsFromJobs(jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId("non-existent node")
jctx.SetAssignedNode(ittestfixtures.TestSimpleNode("non-existent node"))
node, err := db.SelectNodeForJobWithTxn(txn, jctx)
txn.Abort()
if !assert.NoError(t, err) {
Expand Down
1 change: 1 addition & 0 deletions internal/scheduler/nodedb/nodeidindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func makeTestNode(id string) *internaltypes.Node {
[]v1.Taint{},
map[string]string{},
internaltypes.ResourceList{},
nil,
map[int32]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/nodedb/nodematching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ func makeTestNodeTaintsLabels(taints []v1.Taint, labels map[string]string) *inte
taints,
labels,
internaltypes.ResourceList{},
nil,
map[int32]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
Expand All @@ -685,6 +686,7 @@ func makeTestNodeResources(t *testing.T, allocatableByPriority map[int32]interna
[]v1.Taint{},
map[string]string{},
totalResources,
nil,
allocatableByPriority,
map[string]internaltypes.ResourceList{},
map[string]internaltypes.ResourceList{},
Expand Down
19 changes: 13 additions & 6 deletions internal/scheduler/scheduling/context/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type JobSchedulingContext struct {
GangInfo
// The node this job's pod is assigned to.
// Only set for evicted jobs; it is set together with an additional node selector that pins the job to this node.
AssignedNodeId string
AssignedNode *internaltypes.Node
// Id of job that preempted this pod
PreemptingJobId string
// Description of the cause of preemption
Expand Down Expand Up @@ -109,14 +109,21 @@ func (jctx *JobSchedulingContext) Fail(unschedulableReason string) {
}
}

// GetAssignedNode returns the node stored in AssignedNode, or nil if no node
// has been assigned to this job scheduling context.
func (jctx *JobSchedulingContext) GetAssignedNode() *internaltypes.Node {
return jctx.AssignedNode
}

// GetAssignedNodeId returns the id of the node assigned to this job, or the
// empty string when no node has been assigned.
//
// The id is derived from AssignedNode rather than stored separately, so it
// always agrees with GetAssignedNode.
func (jctx *JobSchedulingContext) GetAssignedNodeId() string {
	// Guard against an unset node so callers get "" instead of a nil dereference.
	if jctx.AssignedNode == nil {
		return ""
	}
	return jctx.AssignedNode.GetId()
}

func (jctx *JobSchedulingContext) SetAssignedNodeId(assignedNodeId string) {
if assignedNodeId != "" {
jctx.AssignedNodeId = assignedNodeId
jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, assignedNodeId)
func (jctx *JobSchedulingContext) SetAssignedNode(assignedNode *internaltypes.Node) {
if assignedNode != nil {
jctx.AssignedNode = assignedNode
jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, assignedNode.GetId())
}
}

Expand Down
17 changes: 11 additions & 6 deletions internal/scheduler/scheduling/context/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,26 @@ import (
"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/scheduler/configuration"
ittestfixtures "github.com/armadaproject/armada/internal/scheduler/internaltypes/testfixtures"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

func TestJobSchedulingContext_SetAssignedNodeId(t *testing.T) {
func TestJobSchedulingContext_SetAssignedNode(t *testing.T) {
jctx := &JobSchedulingContext{}

assert.Equal(t, "", jctx.GetAssignedNodeId())
assert.Nil(t, jctx.GetAssignedNode())
assert.Empty(t, jctx.GetAssignedNodeId())
assert.Empty(t, jctx.AdditionalNodeSelectors)

// Will not add a node selector if input is empty
jctx.SetAssignedNodeId("")
assert.Equal(t, "", jctx.GetAssignedNodeId())
// Will not add a node selector if input is nil
jctx.SetAssignedNode(nil)
assert.Nil(t, jctx.GetAssignedNode())
assert.Empty(t, jctx.GetAssignedNodeId())
assert.Empty(t, jctx.AdditionalNodeSelectors)

jctx.SetAssignedNodeId("node1")
n := ittestfixtures.TestSimpleNode("node1")
jctx.SetAssignedNode(n)
assert.Equal(t, n, jctx.GetAssignedNode())
assert.Equal(t, "node1", jctx.GetAssignedNodeId())
assert.Len(t, jctx.AdditionalNodeSelectors, 1)
assert.Equal(t, map[string]string{configuration.NodeIdLabel: "node1"}, jctx.AdditionalNodeSelectors)
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduling/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
// TODO(albin): We can remove the checkOnlyDynamicRequirements flag in the nodeDb now that we've added the tolerations.
jctx := schedulercontext.JobSchedulingContextFromJob(job)
jctx.IsEvicted = true
jctx.SetAssignedNodeId(node.GetId())
jctx.SetAssignedNode(node)
evictedJctxsByJobId[job.Id()] = jctx
jctx.AdditionalTolerations = append(jctx.AdditionalTolerations, node.GetTolerationsForTaints()...)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2507,6 +2507,7 @@ func testNodeWithTaints(node *internaltypes.Node, taints []v1.Taint) *internalty
taints,
node.GetLabels(),
node.GetTotalResources(),
nil,
node.AllocatableByPriority,
node.AllocatedByQueue,
node.AllocatedByJobId,
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/scheduling/preemption_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
unknownPreemptionCause = "Preempted by scheduler due to the job failing to reschedule - possibly node resource changed causing this job to be unschedulable"
unknownPreemptionCause = "Preempted by scheduler due to the job failing to reschedule - possibly node resource changed causing this job to be unschedulable\nNode Summary:\n%s"
unknownGangPreemptionCause = "Preempted by scheduler due to the job failing to reschedule - possibly another job in the gang was preempted or the node resource changed causing this job to be unschedulable"
fairSharePreemptionTemplate = "Preempted by scheduler using fair share preemption - preempting job %s"
urgencyPreemptionTemplate = "Preempted by scheduler using urgency preemption - preempting job %s"
Expand Down Expand Up @@ -45,7 +45,7 @@ func PopulatePreemptionDescriptions(preemptedJobs []*context.JobSchedulingContex
if isGang {
preemptedJctx.PreemptionDescription = fmt.Sprintf(unknownGangPreemptionCause)
} else {
preemptedJctx.PreemptionDescription = fmt.Sprintf(unknownPreemptionCause)
preemptedJctx.PreemptionDescription = fmt.Sprintf(unknownPreemptionCause, preemptedJctx.GetAssignedNode().SummaryString())
}
} else if len(potentialPreemptingJobs) == 1 {
preemptedJctx.PreemptionDescription = fmt.Sprintf(urgencyPreemptionTemplate, potentialPreemptingJobs[0].JobId)
Expand Down
Loading