From 49fdefc8a7b9f440176ee8695c577740e79f0c00 Mon Sep 17 00:00:00 2001 From: Robert Smith <robertdavidsmith@yahoo.com> Date: Thu, 2 Jan 2025 17:16:21 +0000 Subject: [PATCH 1/5] Scheduler: refactor: move scheduling tests to internaltypes Signed-off-by: Robert Smith <robertdavidsmith@yahoo.com> --- internal/scheduler/nodedb/nodedb_test.go | 6 +- .../scheduler/nodedb/nodeiteration_test.go | 169 ++++++-------- .../scheduling/fairness/fairness_test.go | 218 +++++------------- .../scheduling/gang_scheduler_test.go | 153 ++++++------ .../preempting_queue_scheduler_test.go | 194 +++++++--------- .../scheduling/queue_scheduler_test.go | 118 +++++----- .../scheduler/testfixtures/testfixtures.go | 27 +++ 7 files changed, 372 insertions(+), 513 deletions(-) diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 5e87f425876..84eb691f41b 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -833,7 +833,7 @@ func BenchmarkScheduleMany100CpuNodes1CpuUnused(b *testing.B) { b, testfixtures.ItWithUsedResourcesNodes( 0, - cpu("31"), + testfixtures.Cpu("31"), testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 100), @@ -845,7 +845,7 @@ func BenchmarkScheduleMany1000CpuNodes1CpuUnused(b *testing.B) { b, testfixtures.ItWithUsedResourcesNodes( 0, - cpu("31"), + testfixtures.Cpu("31"), testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1000), @@ -857,7 +857,7 @@ func BenchmarkScheduleMany10000CpuNodes1CpuUnused(b *testing.B) { b, testfixtures.ItWithUsedResourcesNodes( 0, - cpu("31"), + testfixtures.Cpu("31"), testfixtures.ItN32CpuNodes(10000, testfixtures.TestPriorities), ), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10000), diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index aa699293139..27f9d901329 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -112,24 +112,24 @@ func TestNodeTypeIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpu("15"), + testfixtures.Cpu("15"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("16"), + testfixtures.Cpu("16"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), nodeTypeId: nodeTypeA.GetId(), priority: 0, - resourceRequests: cpu("16"), + resourceRequests: testfixtures.Cpu("16"), expected: []int{1, 0}, }, "filter nodes with insufficient resources at priority and return in increasing order": { @@ -138,54 +138,54 @@ func TestNodeTypeIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpu("15"), + testfixtures.Cpu("15"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("16"), + testfixtures.Cpu("16"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 1, - cpu("15"), + testfixtures.Cpu("15"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 1, - cpu("16"), + testfixtures.Cpu("16"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 1, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 2, - cpu("15"), + testfixtures.Cpu("15"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 2, - cpu("16"), + testfixtures.Cpu("16"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 2, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), nodeTypeId: nodeTypeA.GetId(), priority: 1, - resourceRequests: cpu("16"), + resourceRequests: testfixtures.Cpu("16"), expected: []int{4, 7, 3, 6, 0, 1, 2}, }, "nested ordering": { @@ -194,54 +194,54 @@ func TestNodeTypeIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "1Gi"), + testfixtures.CpuMem("15", "1Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "2Gi"), + testfixtures.CpuMem("15", "2Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "129Gi"), + testfixtures.CpuMem("15", "129Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "130Gi"), + testfixtures.CpuMem("15", "130Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "131Gi"), + testfixtures.CpuMem("15", "131Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("16", "130Gi"), + testfixtures.CpuMem("16", "130Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("16", "128Gi"), + testfixtures.CpuMem("16", "128Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("16", "129Gi"), + testfixtures.CpuMem("16", "129Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), nodeTypeId: nodeTypeA.GetId(), priority: 0, - resourceRequests: cpuMem("16", "128Gi"), + resourceRequests: testfixtures.CpuMem("16", "128Gi"), expected: []int{6, 1, 0}, }, "double-nested ordering": { @@ -250,59 +250,59 @@ func TestNodeTypeIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("31", "1Gi"), + testfixtures.CpuMem("31", "1Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMemGpu("31", "1Gi", "1"), + testfixtures.CpuMemGpu("31", "1Gi", "1"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMemGpu("31", "1Gi", "2"), + testfixtures.CpuMemGpu("31", "1Gi", "2"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMemGpu("31", "1Gi", "5"), + testfixtures.CpuMemGpu("31", "1Gi", "5"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("31", "2Gi"), + testfixtures.CpuMem("31", "2Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMemGpu("31", "2Gi", "1"), + testfixtures.CpuMemGpu("31", "2Gi", "1"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("32", "514Gi"), + testfixtures.CpuMem("32", "514Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("32", "512Gi"), + testfixtures.CpuMem("32", "512Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("32", "513Gi"), + testfixtures.CpuMem("32", "513Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("33"), + testfixtures.Cpu("33"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), ), nodeTypeId: nodeTypeA.GetId(), priority: 0, - resourceRequests: cpuMemGpu("32", "512Gi", "4"), + resourceRequests: testfixtures.CpuMemGpu("32", "512Gi", "4"), expected: []int{7, 5, 4, 2, 1, 0}, }, } @@ -414,7 +414,7 @@ func TestNodeTypesIterator(t *testing.T) { testfixtures.ItWithNodeTypeNodes( nodeTypeA, testfixtures.ItWithUsedResourcesNodes(0, - cpu("15"), + testfixtures.Cpu("15"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), @@ -422,7 +422,7 @@ func TestNodeTypesIterator(t *testing.T) { nodeTypeB, testfixtures.ItWithUsedResourcesNodes( 0, - cpu("16"), + testfixtures.Cpu("16"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), @@ -430,7 +430,7 @@ func TestNodeTypesIterator(t *testing.T) { nodeTypeC, testfixtures.ItWithUsedResourcesNodes( 0, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), @@ -438,14 +438,14 @@ func TestNodeTypesIterator(t *testing.T) { nodeTypeD, testfixtures.ItWithUsedResourcesNodes( 0, - cpu("14"), + testfixtures.Cpu("14"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), nodeTypeIds: []uint64{nodeTypeA.GetId(), nodeTypeB.GetId(), nodeTypeC.GetId()}, priority: 0, - resourceRequests: cpu("16"), + resourceRequests: testfixtures.Cpu("16"), expected: []int{1, 0}, }, "filter nodes with insufficient resources at priority and return in increasing order": { @@ -454,54 +454,54 @@ func TestNodeTypesIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpu("15"), + testfixtures.Cpu("15"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("16"), + testfixtures.Cpu("16"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 1, - cpu("15"), + testfixtures.Cpu("15"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 1, - cpu("16"), + testfixtures.Cpu("16"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 1, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 2, - cpu("15"), + testfixtures.Cpu("15"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 2, - cpu("16"), + testfixtures.Cpu("16"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 2, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), nodeTypeIds: []uint64{nodeTypeA.GetId()}, priority: 1, - resourceRequests: cpu("16"), + resourceRequests: testfixtures.Cpu("16"), expected: []int{4, 7, 3, 6, 0, 1, 2}, }, "nested ordering": { @@ -510,54 +510,54 @@ func TestNodeTypesIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "1Gi"), + testfixtures.CpuMem("15", "1Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "2Gi"), + testfixtures.CpuMem("15", "2Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "129Gi"), + testfixtures.CpuMem("15", "129Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "130Gi"), + testfixtures.CpuMem("15", "130Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("15", "131Gi"), + testfixtures.CpuMem("15", "131Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("16", "130Gi"), + testfixtures.CpuMem("16", "130Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("16", "128Gi"), + testfixtures.CpuMem("16", "128Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("16", "129Gi"), + testfixtures.CpuMem("16", "129Gi"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("17"), + testfixtures.Cpu("17"), testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), ), ), nodeTypeIds: []uint64{nodeTypeA.GetId()}, priority: 0, - resourceRequests: cpuMem("16", "128Gi"), + resourceRequests: testfixtures.CpuMem("16", "128Gi"), expected: []int{6, 1, 0}, }, "double-nested ordering": { @@ -567,22 +567,22 @@ func TestNodeTypesIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("31", "1Gi"), + testfixtures.CpuMem("31", "1Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMemGpu("31", "1Gi", "1"), + testfixtures.CpuMemGpu("31", "1Gi", "1"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMemGpu("31", "1Gi", "2"), + testfixtures.CpuMemGpu("31", "1Gi", "2"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMemGpu("31", "1Gi", "5"), + testfixtures.CpuMemGpu("31", "1Gi", "5"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), @@ -592,22 +592,22 @@ func TestNodeTypesIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("31", "2Gi"), + testfixtures.CpuMem("31", "2Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMemGpu("31", "2Gi", "1"), + testfixtures.CpuMemGpu("31", "2Gi", "1"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("32", "514Gi"), + testfixtures.CpuMem("32", "514Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("32", "512Gi"), + testfixtures.CpuMem("32", "512Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), @@ -617,12 +617,12 @@ func TestNodeTypesIterator(t *testing.T) { armadaslices.Concatenate( testfixtures.ItWithUsedResourcesNodes( 0, - cpuMem("32", "513Gi"), + testfixtures.CpuMem("32", "513Gi"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), testfixtures.ItWithUsedResourcesNodes( 0, - cpu("33"), + testfixtures.Cpu("33"), testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), @@ -630,7 +630,7 @@ func TestNodeTypesIterator(t *testing.T) { ), nodeTypeIds: []uint64{nodeTypeA.GetId(), nodeTypeB.GetId(), nodeTypeC.GetId()}, priority: 0, - resourceRequests: cpuMemGpu("32", "512Gi", "4"), + resourceRequests: testfixtures.CpuMemGpu("32", "512Gi", "4"), expected: []int{7, 5, 4, 2, 1, 0}, }, } @@ -759,30 +759,3 @@ func labelsToNodeType(labels map[string]string) *internaltypes.NodeType { ) return nodeType } - -func cpu(cpu string) internaltypes.ResourceList { - return testfixtures.TestResourceListFactory.FromNodeProto( - map[string]resource.Quantity{ - "cpu": resource.MustParse(cpu), - }, - ) -} - -func cpuMem(cpu string, memory string) internaltypes.ResourceList { - return testfixtures.TestResourceListFactory.FromNodeProto( - map[string]resource.Quantity{ - "cpu": resource.MustParse(cpu), - "memory": resource.MustParse(memory), - }, - ) -} - -func cpuMemGpu(cpu string, memory string, gpu string) internaltypes.ResourceList { - return testfixtures.TestResourceListFactory.FromNodeProto( - map[string]resource.Quantity{ - "cpu": resource.MustParse(cpu), - "memory": resource.MustParse(memory), - "nvidia.com/gpu": resource.MustParse(gpu), - }, - ) -} diff --git a/internal/scheduler/scheduling/fairness/fairness_test.go b/internal/scheduler/scheduling/fairness/fairness_test.go index 16cc4b4554e..52a9a86584a 100644 --- a/internal/scheduler/scheduling/fairness/fairness_test.go +++ b/internal/scheduler/scheduling/fairness/fairness_test.go @@ -9,7 +9,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) type MinimalQueue struct { @@ -28,201 +27,100 @@ func (q MinimalQueue) GetWeight() float64 { func TestNewDominantResourceFairness(t *testing.T) { rlFactory := makeTestResourceListFactory() _, err := NewDominantResourceFairness( - rlFactory.FromNodeProto(map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - }, - ), + fooBarBaz(rlFactory, "1", "0", "0"), configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{}}, ) require.Error(t, err) } func TestDominantResourceFairness(t *testing.T) { + rlFactory := makeTestResourceListFactory() + tests := map[string]struct { - totalResources schedulerobjects.ResourceList + totalResources internaltypes.ResourceList config configuration.SchedulingConfig - allocation schedulerobjects.ResourceList + allocation internaltypes.ResourceList weight float64 expectedCost float64 }{ "single resource 1": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0.5"), - }, - }, - weight: 1.0, - expectedCost: 0.5, + totalResources: fooBarBaz(rlFactory, "1", "2", "3"), + config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, + allocation: fooBarBaz(rlFactory, "0.5", "0", "0"), + weight: 1.0, + expectedCost: 0.5, }, "single resource 2": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "bar": resource.MustParse("0.5"), - }, - }, - weight: 1.0, - expectedCost: 0.25, + totalResources: fooBarBaz(rlFactory, "1", "2", "3"), + config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, + allocation: fooBarBaz(rlFactory, "0", "0.5", "0"), + weight: 1.0, + expectedCost: 0.25, }, "multiple resources": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0.5"), - "bar": resource.MustParse("1.1"), - }, - }, - weight: 1.0, - expectedCost: 1.1 / 2, + totalResources: fooBarBaz(rlFactory, "1", "2", "3"), + config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, + allocation: fooBarBaz(rlFactory, "0.5", "1.1", "0"), + weight: 1.0, + expectedCost: 1.1 / 2, }, "considered resources": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0.5"), - "baz": resource.MustParse("3"), - }, - }, - weight: 1.0, - expectedCost: 0.5, + totalResources: fooBarBaz(rlFactory, "1", "2", "3"), + config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, + allocation: fooBarBaz(rlFactory, "0.5", "0", "3"), + weight: 1.0, + expectedCost: 0.5, }, "zero available resource": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("0"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0.5"), - "bar": resource.MustParse("2.0"), - }, - }, - weight: 1.0, - expectedCost: 0.5, + totalResources: fooBarBaz(rlFactory, "1", "0", "3"), + config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, + allocation: fooBarBaz(rlFactory, "0.5", "2.0", "0"), + weight: 1.0, + expectedCost: 0.5, }, "weight": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0.5"), - }, - }, - weight: 2.0, - expectedCost: 0.25, + totalResources: fooBarBaz(rlFactory, "1", "2", "3"), + config: configuration.SchedulingConfig{DominantResourceFairnessResourcesToConsider: []string{"foo", "bar"}}, + allocation: fooBarBaz(rlFactory, "0.5", "0", "0"), + weight: 2.0, + expectedCost: 0.25, }, "experimental config": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{ExperimentalDominantResourceFairnessResourcesToConsider: []configuration.DominantResourceFairnessResource{{"foo", 1}, {"bar", 1}}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0.5"), - "bar": resource.MustParse("1.1"), - }, - }, - weight: 1.0, - expectedCost: 1.1 / 2, + totalResources: fooBarBaz(rlFactory, "1", "2", "3"), + config: configuration.SchedulingConfig{ExperimentalDominantResourceFairnessResourcesToConsider: []configuration.DominantResourceFairnessResource{{"foo", 1}, {"bar", 1}}}, + allocation: fooBarBaz(rlFactory, "0.5", "1.1", "0"), + weight: 1.0, + expectedCost: 1.1 / 2, }, "experimental config defaults multipliers to one": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{ExperimentalDominantResourceFairnessResourcesToConsider: []configuration.DominantResourceFairnessResource{{"foo", 0}, {"bar", 0}}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0.5"), - "bar": resource.MustParse("1.1"), - }, - }, - weight: 1.0, - expectedCost: 1.1 / 2, + totalResources: fooBarBaz(rlFactory, "1", "2", "3"), + config: configuration.SchedulingConfig{ExperimentalDominantResourceFairnessResourcesToConsider: []configuration.DominantResourceFairnessResource{{"foo", 0}, {"bar", 0}}}, + allocation: fooBarBaz(rlFactory, "0.5", "1.1", "0"), + weight: 1.0, + expectedCost: 1.1 / 2, }, "experimental config non-unit multiplier": { - totalResources: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("1"), - "bar": resource.MustParse("2"), - "baz": resource.MustParse("3"), - }, - }, - config: configuration.SchedulingConfig{ExperimentalDominantResourceFairnessResourcesToConsider: []configuration.DominantResourceFairnessResource{{"foo", 4}, {"bar", 1}}}, - allocation: schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "foo": resource.MustParse("0.5"), - "bar": resource.MustParse("1.1"), - }, - }, - weight: 1.0, - expectedCost: 2, + totalResources: fooBarBaz(rlFactory, "1", "2", "3"), + config: configuration.SchedulingConfig{ExperimentalDominantResourceFairnessResourcesToConsider: []configuration.DominantResourceFairnessResource{{"foo", 4}, {"bar", 1}}}, + allocation: fooBarBaz(rlFactory, "0.5", "1.1", "0"), + weight: 1.0, + expectedCost: 2, }, } - rlFactory := makeTestResourceListFactory() - for name, tc := range tests { t.Run(name, func(t *testing.T) { - totalResources := rlFactory.FromNodeProto(tc.totalResources.Resources) - allocation := rlFactory.FromJobResourceListIgnoreUnknown(tc.allocation.Resources) - f, err := NewDominantResourceFairness(totalResources, tc.config) + f, err := NewDominantResourceFairness(tc.totalResources, tc.config) require.NoError(t, err) assert.Equal( t, tc.expectedCost, - f.WeightedCostFromAllocation(allocation, tc.weight), + f.WeightedCostFromAllocation(tc.allocation, tc.weight), ) assert.Equal( t, - f.WeightedCostFromAllocation(allocation, tc.weight), - f.WeightedCostFromQueue(MinimalQueue{allocation: allocation, weight: tc.weight}), + f.WeightedCostFromAllocation(tc.allocation, tc.weight), + f.WeightedCostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}), ) }) } @@ -243,3 +141,11 @@ func makeTestResourceListFactory() *internaltypes.ResourceListFactory { } return rlFactory } + +func fooBarBaz(factory *internaltypes.ResourceListFactory, foo, bar, baz string) internaltypes.ResourceList { + return factory.FromNodeProto(map[string]resource.Quantity{ + "foo": resource.MustParse(foo), + "bar": resource.MustParse(bar), + "baz": resource.MustParse(baz), + }) +} diff --git a/internal/scheduler/scheduling/gang_scheduler_test.go b/internal/scheduler/scheduling/gang_scheduler_test.go index d1626839282..8c075e2c956 100644 --- a/internal/scheduler/scheduling/gang_scheduler_test.go +++ b/internal/scheduler/scheduling/gang_scheduler_test.go @@ -33,10 +33,10 @@ func TestGangScheduler(t *testing.T) { tests := map[string]struct { SchedulingConfig configuration.SchedulingConfig // Nodes to be considered by the scheduler. - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node // Total resources across all clusters. // Set to the total resources across all nodes if not provided. - TotalResources schedulerobjects.ResourceList + TotalResources internaltypes.ResourceList // Gangs to try scheduling. Gangs [][]*jobdb.Job // Indices of gangs expected to be scheduled. @@ -54,7 +54,7 @@ func TestGangScheduler(t *testing.T) { }{ "simple success": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32)), }, @@ -65,7 +65,7 @@ func TestGangScheduler(t *testing.T) { }, "simple failure": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 33)), }, @@ -76,7 +76,7 @@ func TestGangScheduler(t *testing.T) { }, "one success and one failure": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32)), testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), @@ -88,7 +88,7 @@ func TestGangScheduler(t *testing.T) { }, "multiple nodes": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 64)), }, @@ -103,7 +103,7 @@ func TestGangScheduler(t *testing.T) { cfg.ExperimentalFloatingResources = testfixtures.TestFloatingResourceConfig return cfg }(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ // we have 10 of test-floating-resource so only the first of these two jobs should fit addFloatingResourceRequest("6", testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1))), @@ -119,7 +119,7 @@ func TestGangScheduler(t *testing.T) { map[string]float64{"cpu": 0.5}, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 8)), testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 16)), @@ -138,7 +138,7 @@ func TestGangScheduler(t *testing.T) { testfixtures.TestSchedulingConfig(), ), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), @@ -159,7 +159,7 @@ func TestGangScheduler(t *testing.T) { testfixtures.TestSchedulingConfig(), ), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), @@ -182,7 +182,7 @@ func TestGangScheduler(t *testing.T) { }, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2)), @@ -206,7 +206,7 @@ func TestGangScheduler(t *testing.T) { }, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1)), testfixtures.WithGangAnnotationsJobs(testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1)), @@ -228,7 +228,7 @@ func TestGangScheduler(t *testing.T) { }, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1)), testfixtures.WithGangAnnotationsJobs(testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1)), @@ -250,25 +250,15 @@ func TestGangScheduler(t *testing.T) { testfixtures.TestSchedulingConfig(), ), Nodes: armadaslices.Concatenate( - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31.5"), - "memory": resource.MustParse("512Gi"), - "nvidia.com/gpu": resource.MustParse("8"), - }, - }, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + testfixtures.CpuMemGpu("31.5", "512Gi", "8"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithUsedResourcesNodes( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - }, - }, - testfixtures.N8GpuNodes(1, testfixtures.TestPriorities), + testfixtures.Cpu("32"), + testfixtures.ItN8GpuNodes(1, testfixtures.TestPriorities), ), ), Gangs: [][]*jobdb.Job{ @@ -281,9 +271,9 @@ func TestGangScheduler(t *testing.T) { }, "NodeUniformityLabel set but not indexed": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.WithLabelsNodes( + Nodes: testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"foo": "foov"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs( @@ -302,7 +292,7 @@ func TestGangScheduler(t *testing.T) { []string{"foo", "bar"}, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs( testfixtures.WithNodeUniformityLabelAnnotationJobs( @@ -321,13 +311,13 @@ func TestGangScheduler(t *testing.T) { testfixtures.TestSchedulingConfig(), ), Nodes: armadaslices.Concatenate( - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"foo": "foov1"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"foo": "foov2"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), ), Gangs: [][]*jobdb.Job{ @@ -346,25 +336,25 @@ func TestGangScheduler(t *testing.T) { testfixtures.TestSchedulingConfig(), ), Nodes: armadaslices.Concatenate( - testfixtures.WithLabelsNodes( - map[string]string{"foo": "foov1"}, - testfixtures.WithUsedResourcesNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, - testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + testfixtures.Cpu("1"), + testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), ), + map[string]string{"foo": "foov1"}, ), - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), map[string]string{"foo": "foov2"}, - testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), ), - testfixtures.WithLabelsNodes( - map[string]string{"foo": "foov3"}, - testfixtures.WithUsedResourcesNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}}, - testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + testfixtures.Cpu("1"), + testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), ), + map[string]string{"foo": "foov3"}, ), ), Gangs: [][]*jobdb.Job{ @@ -385,13 +375,13 @@ func TestGangScheduler(t *testing.T) { testfixtures.TestSchedulingConfig(), ), Nodes: append( - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), map[string]string{"my-cool-node-uniformity": "a"}, - testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), ), - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), map[string]string{"my-cool-node-uniformity": "b"}, - testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), )..., ), Gangs: [][]*jobdb.Job{ @@ -454,16 +444,12 @@ func TestGangScheduler(t *testing.T) { } return config }(), - Nodes: func() []*schedulerobjects.Node { - nodes := testfixtures.N8GpuNodes(1, []int32{29000, 30000}) - for _, node := range nodes { - node.Taints = []v1.Taint{ - {Key: "taint-a", Value: "true", Effect: v1.TaintEffectNoSchedule}, - {Key: "taint-b", Value: "true", Effect: v1.TaintEffectNoSchedule}, - } - } - return nodes - }(), + Nodes: testfixtures.TestNodeFactory.AddTaints( + testfixtures.ItN8GpuNodes(1, []int32{29000, 30000}), + []v1.Taint{ + {Key: "taint-a", Value: "true", Effect: v1.TaintEffectNoSchedule}, + {Key: "taint-b", Value: "true", Effect: v1.TaintEffectNoSchedule}, + }), Gangs: func() (gangs [][]*jobdb.Job) { var jobId ulid.ULID jobId = util.ULID() @@ -507,15 +493,12 @@ func TestGangScheduler(t *testing.T) { } return config }(), - Nodes: func() []*schedulerobjects.Node { - nodes := testfixtures.N32CpuNodes(1, []int32{29000, 30000}) - for _, node := range nodes { - node.Taints = []v1.Taint{ - {Key: "taint-a", Value: "true", Effect: v1.TaintEffectNoSchedule}, - } - } - return nodes - }(), + Nodes: testfixtures.TestNodeFactory.AddTaints( + testfixtures.ItN32CpuNodes(1, []int32{29000, 30000}), + []v1.Taint{ + {Key: "taint-a", Value: "true", Effect: v1.TaintEffectNoSchedule}, + }, + ), Gangs: func() (gangs [][]*jobdb.Job) { jobId := util.ULID() gangs = append(gangs, []*jobdb.Job{testfixtures.TestJob("A", jobId, "armada-preemptible-away", testfixtures.Test32Cpu256GiPodReqs("A", jobId, 30000))}) @@ -535,7 +518,7 @@ func TestGangScheduler(t *testing.T) { cfg.MaximumPerQueueSchedulingRate = 2 return cfg }(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2)), testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), @@ -548,7 +531,7 @@ func TestGangScheduler(t *testing.T) { }, "Hitting globally applicable constraint makes job scheduling key unfeasible": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Gangs: [][]*jobdb.Job{ testfixtures.WithGangAnnotationsJobs(testfixtures.N1GpuJobs("A", testfixtures.PriorityClass0, 1)), testfixtures.WithGangAnnotationsJobs(testfixtures.N1GpuJobs("B", testfixtures.PriorityClass0, 1)), @@ -584,9 +567,9 @@ func TestGangScheduler(t *testing.T) { expectedUnfeasibleJobSchedulingKeys = append(expectedUnfeasibleJobSchedulingKeys, key) } - nodesById := make(map[string]*schedulerobjects.Node, len(tc.Nodes)) + nodesById := make(map[string]*internaltypes.Node, len(tc.Nodes)) for _, node := range tc.Nodes { - nodesById[node.Id] = node + nodesById[node.GetId()] = node } nodeDb, err := nodedb.NewNodeDb( tc.SchedulingConfig.PriorityClasses, @@ -599,17 +582,13 @@ func TestGangScheduler(t *testing.T) { require.NoError(t, err) txn := nodeDb.Txn(true) for _, node := range tc.Nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(t, err) } txn.Commit() - if tc.TotalResources.Resources == nil { + if tc.TotalResources.AllZero() { // Default to NodeDb total. - tc.TotalResources = schedulerobjects.ResourceList{ - Resources: nodeDb.TotalKubernetesResources().ToMap(), - } + tc.TotalResources = nodeDb.TotalKubernetesResources() } priorityFactorByQueue := make(map[string]float64) for _, jobs := range tc.Gangs { @@ -618,10 +597,8 @@ func TestGangScheduler(t *testing.T) { } } - totalResources := testfixtures.TestResourceListFactory.FromNodeProto(tc.TotalResources.Resources) - fairnessCostProvider, err := fairness.NewDominantResourceFairness( - totalResources, + tc.TotalResources, tc.SchedulingConfig, ) require.NoError(t, err) @@ -632,7 +609,7 @@ func TestGangScheduler(t *testing.T) { rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), tc.SchedulingConfig.MaximumSchedulingBurst, ), - totalResources, + tc.TotalResources, ) for queue, priorityFactor := range priorityFactorByQueue { err := sctx.AddQueueSchedulingContext( @@ -650,7 +627,7 @@ func TestGangScheduler(t *testing.T) { } constraints := schedulerconstraints.NewSchedulingConstraints( "pool", - totalResources, + tc.TotalResources, tc.SchedulingConfig, armadaslices.Map( maps.Keys(priorityFactorByQueue), @@ -687,7 +664,7 @@ func TestGangScheduler(t *testing.T) { } node := nodesById[pctx.NodeId] require.NotNil(t, node) - value, ok := node.Labels[nodeUniformity] + value, ok := node.GetLabelValue(nodeUniformity) require.True(t, ok, "gang job scheduled onto node with missing nodeUniformityLabel") nodeUniformityLabelValues[value] = true } diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index 76e6dc492fb..003713801dd 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -24,7 +24,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints" "github.com/armadaproject/armada/internal/scheduler/scheduling/context" "github.com/armadaproject/armada/internal/scheduler/scheduling/fairness" @@ -52,13 +51,12 @@ func TestEvictOversubscribed(t *testing.T) { stringInterner := stringinterner.New(1024) - node := testfixtures.Test32CpuNode(priorities) + node := testfixtures.ItTest32CpuNode(priorities) nodeDb, err := NewNodeDb(config, stringInterner) require.NoError(t, err) - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) require.NoError(t, err) nodeDbTxn := nodeDb.Txn(true) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobs, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobs, node) require.NoError(t, err) jobDb := jobdb.NewJobDb(config.PriorityClasses, config.DefaultPriorityClassName, stringInterner, testfixtures.TestResourceListFactory) @@ -100,7 +98,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { tests := map[string]struct { SchedulingConfig configuration.SchedulingConfig // Nodes to be considered by the scheduler. - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node // Each item corresponds to a call to Reschedule(). Rounds []SchedulingRound // Map from queue to the priority factor associated with that queue. @@ -110,7 +108,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }{ "balancing three queues": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -166,7 +164,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "balancing two queues weighted": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -204,7 +202,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "balancing two queues weighted with inactive queues": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -244,7 +242,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "don't prempt jobs where we don't know the queue": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -264,7 +262,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "avoid preemption when not improving fairness": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -287,7 +285,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "avoid preemption when not improving fairness reverse queue naming": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -310,7 +308,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "preemption when improving fairness": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -341,7 +339,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "reschedule onto same node": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -368,7 +366,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "reschedule onto same node reverse order": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -395,7 +393,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "urgency-based preemption stability": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -432,7 +430,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "avoid urgency-based preemption when possible": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -458,7 +456,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "preempt in order of priority": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -497,7 +495,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "avoid urgency-based preemption when possible cross-queue": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -549,7 +547,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "gang preemption": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { // Fill half of node 1 and half of node 2. @@ -596,7 +594,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { "gang preemption avoid cascading preemption": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { // Schedule a gang spanning nodes 1 and 2. @@ -642,7 +640,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "rescheduled jobs don't count towards global scheduling rate limit": { SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(2, 5, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -667,7 +665,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "MaximumSchedulingRate": { SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(2, 4, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -703,7 +701,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "rescheduled jobs don't count towards maxQueueLookback": { SchedulingConfig: testfixtures.WithMaxLookbackPerQueueConfig(5, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -733,7 +731,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -758,7 +756,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "priority class preemption two classes": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -788,7 +786,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "priority class preemption cross-queue": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -819,7 +817,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "priority class preemption not scheduled": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -840,7 +838,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { // - 17 jobs should be preempted to make space; all the priority zero jobs and a single priority 1 job "priority class preemption through multiple levels": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -884,7 +882,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { // time="2024-09-27T09:09:05+01:00" level=info msg="Scheduling new jobs; affected queues [B]; resources map[B:{cpu: 16, memory: 64Gi}]; jobs per queue map[B:16]" round=1 //"broken priority class preemption with non-preemptible priority classes": { // SchedulingConfig: testfixtures.TestSchedulingConfig(), - // Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + // Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), // Rounds: []SchedulingRound{ // { // JobsByQueue: map[string][]*jobdb.Job{ @@ -906,7 +904,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { //}, "priority class preemption four classes": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -948,7 +946,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -984,7 +982,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1052,7 +1050,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { map[string]float64{"cpu": 15.0 / 64.0}, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1099,7 +1097,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "Queued jobs are not preempted cross queue": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1119,7 +1117,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "Queued jobs are not preempted cross queue with some scheduled": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1140,7 +1138,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "Queued jobs are not preempted cross queue with non-preemptible jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1160,7 +1158,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "Queued jobs are not preempted cross queue multiple rounds": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1188,7 +1186,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "Oversubscribed eviction does not evict non-preemptible": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1226,7 +1224,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "Cordoning prevents scheduling new jobs but not re-scheduling running jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1259,7 +1257,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { 1.0, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1295,7 +1293,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { 0.5, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1331,7 +1329,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { 0.5, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1375,7 +1373,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { 1.0, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1411,7 +1409,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { 1.0, testfixtures.TestSchedulingConfig(), ), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1439,7 +1437,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { }, "DominantResourceFairness": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1461,19 +1459,19 @@ func TestPreemptingQueueScheduler(t *testing.T) { "nodeAffinity node notIn": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Nodes: armadaslices.Concatenate( - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"key": "val1"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"key": "val2"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"key": "val3"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), Rounds: []SchedulingRound{ { @@ -1507,7 +1505,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Schedule job from queue C. // This does not prevent re-scheduling. SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1544,7 +1542,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Try to schedule a job from queue B. This fails as the node is cordoned. // This should not prevent re-scheduling jobs in queue A. SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1575,13 +1573,10 @@ func TestPreemptingQueueScheduler(t *testing.T) { } return config }(), - Nodes: func() []*schedulerobjects.Node { - nodes := testfixtures.N8GpuNodes(2, []int32{29000, 30000}) - for _, node := range nodes { - node.Taints = []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}} - } - return nodes - }(), + Nodes: testfixtures.TestNodeFactory.AddTaints( + testfixtures.ItN8GpuNodes(2, []int32{29000, 30000}), + []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}}, + ), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1635,13 +1630,10 @@ func TestPreemptingQueueScheduler(t *testing.T) { config.DefaultPriorityClassName = "armada-preemptible" return config }(), - Nodes: func() []*schedulerobjects.Node { - nodes := testfixtures.N8GpuNodes(2, []int32{29000, 30000}) - for _, node := range nodes { - node.Taints = []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}} - } - return nodes - }(), + Nodes: testfixtures.TestNodeFactory.AddTaints( + testfixtures.ItN8GpuNodes(2, []int32{29000, 30000}), + []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}}, + ), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1701,13 +1693,13 @@ func TestPreemptingQueueScheduler(t *testing.T) { } return config }(), - Nodes: func() []*schedulerobjects.Node { + Nodes: func() []*internaltypes.Node { priorities := []int32{29000, 30000} - gpuNodes := testfixtures.N8GpuNodes(1, priorities) - for _, node := range gpuNodes { - node.Taints = []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}} - } - return append(testfixtures.N32CpuNodes(1, priorities), gpuNodes...) + gpuNodes := testfixtures.TestNodeFactory.AddTaints( + testfixtures.ItN8GpuNodes(1, priorities), + []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}}, + ) + return append(testfixtures.ItN32CpuNodes(1, priorities), gpuNodes...) }(), Rounds: []SchedulingRound{ { @@ -1762,13 +1754,13 @@ func TestPreemptingQueueScheduler(t *testing.T) { config.DefaultPriorityClassName = "armada-preemptible" return config }(), - Nodes: func() []*schedulerobjects.Node { + Nodes: func() []*internaltypes.Node { priorities := []int32{29000, 30000} - gpuNodes := testfixtures.N8GpuNodes(1, priorities) - for _, node := range gpuNodes { - node.Taints = []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}} - } - return append(testfixtures.N32CpuNodes(1, priorities), gpuNodes...) + gpuNodes := testfixtures.TestNodeFactory.AddTaints( + testfixtures.ItN8GpuNodes(1, priorities), + []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}}, + ) + return append(testfixtures.ItN32CpuNodes(1, priorities), gpuNodes...) }(), Rounds: []SchedulingRound{ { @@ -1836,12 +1828,10 @@ func TestPreemptingQueueScheduler(t *testing.T) { } return config }(), - Nodes: func() []*schedulerobjects.Node { - priorities := []int32{29000, 28000, 30000} - nodes := testfixtures.N32CpuNodes(1, priorities) - nodes[0].Taints = []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}} - return nodes - }(), + Nodes: testfixtures.TestNodeFactory.AddTaints( + testfixtures.ItN32CpuNodes(1, []int32{29000, 28000, 30000}), + []v1.Taint{{Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule}}, + ), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -1962,9 +1952,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { require.NoError(t, err) nodeDbTxn := nodeDb.Txn(true) for _, node := range tc.Nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobsByNode[node.Name], dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobsByNode[node.GetName()], node.DeepCopyNilKeys()) require.NoError(t, err) } nodeDbTxn.Commit() @@ -2011,7 +1999,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { } for idx, isCordoned := range cordonedNodes { if isCordoned { - node, err := nodeDb.GetNode(tc.Nodes[idx].Id) + node, err := nodeDb.GetNode(tc.Nodes[idx].GetId()) require.NoError(t, err) ctx.Infof("Cordoned node %s", node.GetId()) taints := append(slices.Clone(node.GetTaints()), internaltypes.UnschedulableTaint()) @@ -2260,7 +2248,7 @@ func jobIdsByQueueFromJobContexts(jctxs []*context.JobSchedulingContext) map[str func BenchmarkPreemptingQueueScheduler(b *testing.B) { tests := map[string]struct { SchedulingConfig configuration.SchedulingConfig - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node JobFunc func(queue string, priorityClassName string, n int) []*jobdb.Job NumQueues int NumJobsPerQueue int @@ -2269,7 +2257,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { }{ "1 node 1 queue 320 jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), JobFunc: testfixtures.N1Cpu4GiJobs, NumQueues: 1, NumJobsPerQueue: 320, @@ -2278,7 +2266,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { }, "1 node 10 queues 320 jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), JobFunc: testfixtures.N1Cpu4GiJobs, NumQueues: 10, NumJobsPerQueue: 320, @@ -2287,7 +2275,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { }, "10 nodes 1 queue 3200 jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(10, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(10, testfixtures.TestPriorities), JobFunc: testfixtures.N1Cpu4GiJobs, NumQueues: 1, NumJobsPerQueue: 3200, @@ -2296,7 +2284,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { }, "10 nodes 10 queues 3200 jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(10, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(10, testfixtures.TestPriorities), JobFunc: testfixtures.N1Cpu4GiJobs, NumQueues: 10, NumJobsPerQueue: 3200, @@ -2305,7 +2293,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { }, "100 nodes 1 queue 32000 jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), JobFunc: testfixtures.N1Cpu4GiJobs, NumQueues: 1, NumJobsPerQueue: 32000, @@ -2314,7 +2302,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { }, "100 nodes 10 queues 32000 jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(100, testfixtures.TestPriorities), JobFunc: testfixtures.N1Cpu4GiJobs, NumQueues: 10, NumJobsPerQueue: 32000, @@ -2323,7 +2311,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { }, "1000 nodes 1 queue 320000 jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), JobFunc: testfixtures.N1Cpu4GiJobs, NumQueues: 1, NumJobsPerQueue: 320000, @@ -2332,7 +2320,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { }, "1000 nodes 10 queues 320000 jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1000, testfixtures.TestPriorities), JobFunc: testfixtures.N1Cpu4GiJobs, NumQueues: 1, NumJobsPerQueue: 32000, @@ -2357,9 +2345,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { require.NoError(b, err) txn := nodeDb.Txn(true) for _, node := range tc.Nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(b, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node.DeepCopyNilKeys()) require.NoError(b, err) } txn.Commit() @@ -2454,9 +2440,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { require.NoError(b, err) txn = nodeDb.Txn(true) for _, node := range tc.Nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(b, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobsByNodeId[node.Id], dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, jobsByNodeId[node.GetId()], node.DeepCopyNilKeys()) require.NoError(b, err) } txn.Commit() diff --git a/internal/scheduler/scheduling/queue_scheduler_test.go b/internal/scheduler/scheduling/queue_scheduler_test.go index fdb6f47cc2c..74a6b29fdfe 100644 --- a/internal/scheduler/scheduling/queue_scheduler_test.go +++ b/internal/scheduler/scheduling/queue_scheduler_test.go @@ -40,7 +40,7 @@ func TestQueueScheduler(t *testing.T) { // Initial resource usage for all queues. InitialAllocatedByQueueAndPriorityClass map[string]schedulerobjects.QuantityByTAndResourceType[string] // Nodes to be considered by the scheduler. - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node // Jobs to try scheduling. Jobs []*jobdb.Job // Indices of jobs expected to be scheduled. @@ -51,48 +51,48 @@ func TestQueueScheduler(t *testing.T) { "simple success": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Queues: testfixtures.SingleQueuePriorityOne("A"), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), ExpectedScheduledIndices: testfixtures.IntRange(0, 31), }, "simple failure": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Queues: testfixtures.SingleQueuePriorityOne("A"), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 33), ExpectedScheduledIndices: testfixtures.IntRange(0, 31), }, "multiple nodes": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Queues: testfixtures.SingleQueuePriorityOne("A"), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 64), ExpectedScheduledIndices: testfixtures.IntRange(0, 63), }, "preempt lower-priority jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass1, 1)), Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: testfixtures.IntRange(0, 1), }, "no preemption of higher-priority jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate(testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass1, 1), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: testfixtures.IntRange(0, 0), }, "unschedulable jobs do not block schedulable jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)), Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: []int{0, 11}, }, "MaximumSchedulingBurst": { SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), @@ -104,7 +104,7 @@ func TestQueueScheduler(t *testing.T) { }, "MaximumPerQueueSchedulingBurst": { SchedulingConfig: testfixtures.WithPerQueueSchedulingLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), @@ -117,7 +117,7 @@ func TestQueueScheduler(t *testing.T) { }, "MaximumSchedulingBurst is not exceeded by gangs": { SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1), @@ -131,7 +131,7 @@ func TestQueueScheduler(t *testing.T) { }, "MaximumPerQueueSchedulingBurst is not exceeded by gangs": { SchedulingConfig: testfixtures.WithPerQueueSchedulingLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1), @@ -149,7 +149,7 @@ func TestQueueScheduler(t *testing.T) { testfixtures.TestSchedulingConfig(), ), Queues: testfixtures.SingleQueuePriorityOne("A"), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), ExpectedScheduledIndices: testfixtures.IntRange(0, 16), ExpectedNeverAttemptedIndices: testfixtures.IntRange(17, 31), @@ -165,7 +165,7 @@ func TestQueueScheduler(t *testing.T) { testfixtures.TestSchedulingConfig(), ), Queues: testfixtures.SingleQueuePriorityOne("A"), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), @@ -201,20 +201,20 @@ func TestQueueScheduler(t *testing.T) { }, }, }, - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), ExpectedScheduledIndices: testfixtures.IntRange(0, 15), }, "fairness two queues": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 32)), Queues: []*api.Queue{{Name: "A", PriorityFactor: 1.0}, {Name: "B", PriorityFactor: 1.0}}, ExpectedScheduledIndices: armadaslices.Concatenate(testfixtures.IntRange(0, 15), testfixtures.IntRange(32, 47)), }, "fairness three queues": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 32), @@ -229,7 +229,7 @@ func TestQueueScheduler(t *testing.T) { }, "weighted fairness two queues": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 96), testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 96), @@ -242,7 +242,7 @@ func TestQueueScheduler(t *testing.T) { }, "weighted fairness three queues": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 96), testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 96), @@ -257,7 +257,7 @@ func TestQueueScheduler(t *testing.T) { }, "fairness two queues with initial allocation": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32), testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 32), @@ -276,14 +276,10 @@ func TestQueueScheduler(t *testing.T) { }, "node with no available capacity": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.WithUsedResourcesNodes( + Nodes: testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - }, - }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.Cpu("32"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), Queues: testfixtures.SingleQueuePriorityOne("A"), @@ -291,14 +287,10 @@ func TestQueueScheduler(t *testing.T) { }, "node with some available capacity": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.WithUsedResourcesNodes( + Nodes: testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("31"), - }, - }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.Cpu("31"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), Jobs: testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), Queues: testfixtures.SingleQueuePriorityOne("A"), @@ -306,14 +298,10 @@ func TestQueueScheduler(t *testing.T) { }, "preempt used resources of lower-priority jobs": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.WithUsedResourcesNodes( + Nodes: testfixtures.ItWithUsedResourcesNodes( 0, - schedulerobjects.ResourceList{ - Resources: map[string]resource.Quantity{ - "cpu": resource.MustParse("32"), - }, - }, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.Cpu("32"), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), Jobs: testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass1, 1), Queues: testfixtures.SingleQueuePriorityOne("A"), @@ -321,14 +309,14 @@ func TestQueueScheduler(t *testing.T) { }, "respect taints": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.NTainted32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItNTainted32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1)), Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: []int{1}, }, "taints and tolerations": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.NTainted32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItNTainted32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1)), Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: []int{1}, @@ -336,8 +324,11 @@ func TestQueueScheduler(t *testing.T) { "Node selector": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Nodes: armadaslices.Concatenate( - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - testfixtures.WithLabelsNodes(map[string]string{"foo": "foo"}, testfixtures.N32CpuNodes(1, testfixtures.TestPriorities)), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + map[string]string{"foo": "foo"}, + ), ), Jobs: testfixtures.WithNodeSelectorJobs(map[string]string{"foo": "foo"}, testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 2)), Queues: testfixtures.SingleQueuePriorityOne("A"), @@ -345,7 +336,7 @@ func TestQueueScheduler(t *testing.T) { }, "taints and tolerations (indexed)": { SchedulingConfig: testfixtures.WithIndexedTaintsConfig([]string{"largeJobsOnly"}, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.NTainted32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItNTainted32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1)), Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: []int{1}, @@ -353,8 +344,11 @@ func TestQueueScheduler(t *testing.T) { "Node selector (indexed)": { SchedulingConfig: testfixtures.WithIndexedNodeLabelsConfig([]string{"foo"}, testfixtures.TestSchedulingConfig()), Nodes: append( - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - testfixtures.WithLabelsNodes(map[string]string{"foo": "foo"}, testfixtures.N32CpuNodes(1, testfixtures.TestPriorities))..., + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), + map[string]string{"foo": "foo"}, + )..., ), Jobs: testfixtures.WithNodeSelectorJobs(map[string]string{"foo": "foo"}, testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 2)), Queues: testfixtures.SingleQueuePriorityOne("A"), @@ -362,7 +356,7 @@ func TestQueueScheduler(t *testing.T) { }, "MaxQueueLookback": { SchedulingConfig: testfixtures.WithMaxQueueLookbackConfig(3, testfixtures.TestSchedulingConfig()), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 3), @@ -374,14 +368,14 @@ func TestQueueScheduler(t *testing.T) { }, "gang success": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Jobs: testfixtures.WithGangAnnotationsJobs(testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 2)), Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: []int{0, 1}, }, "non-consecutive gang success": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(3, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.WithAnnotationsJobs(map[string]string{ armadaconfiguration.GangIdAnnotation: "my-gang", @@ -400,14 +394,14 @@ func TestQueueScheduler(t *testing.T) { }, "gang failure": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Jobs: testfixtures.WithGangAnnotationsJobs(testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 3)), Queues: testfixtures.SingleQueuePriorityOne("A"), ExpectedScheduledIndices: nil, }, "non-consecutive gang failure": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(2, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.WithAnnotationsJobs(map[string]string{ armadaconfiguration.GangIdAnnotation: "my-gang", @@ -426,7 +420,7 @@ func TestQueueScheduler(t *testing.T) { }, "job priority": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.WithPriorityJobs(10, testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1)), testfixtures.WithPriorityJobs(1, testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1)), @@ -438,19 +432,19 @@ func TestQueueScheduler(t *testing.T) { "nodeAffinity node notIn": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Nodes: armadaslices.Concatenate( - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"key": "val1"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"key": "val2"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.WithLabelsNodes( + testfixtures.TestNodeFactory.AddLabels( + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), map[string]string{"key": "val3"}, - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), ), Jobs: armadaslices.Concatenate( testfixtures.WithNodeAffinityJobs( @@ -478,9 +472,7 @@ func TestQueueScheduler(t *testing.T) { require.NoError(t, err) txn := nodeDb.Txn(true) for _, node := range tc.Nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) require.NoError(t, err) } txn.Commit() diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 5c66f044119..de8f5b530cd 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -1131,3 +1131,30 @@ func GetTestSupportedResourceTypes() []schedulerconfiguration.ResourceType { {Name: "nvidia.com/gpu", Resolution: resource.MustParse("1m")}, } } + +func Cpu(cpu string) internaltypes.ResourceList { + return TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + }, + ) +} + +func CpuMem(cpu string, memory string) internaltypes.ResourceList { + return TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + "memory": resource.MustParse(memory), + }, + ) +} + +func CpuMemGpu(cpu string, memory string, gpu string) internaltypes.ResourceList { + return TestResourceListFactory.FromNodeProto( + map[string]resource.Quantity{ + "cpu": resource.MustParse(cpu), + "memory": resource.MustParse(memory), + "nvidia.com/gpu": resource.MustParse(gpu), + }, + ) +} From fb91333e5e828b3f73c34167d3acb997742ccb2b Mon Sep 17 00:00:00 2001 From: Robert Smith <robertdavidsmith@yahoo.com> Date: Thu, 2 Jan 2025 17:45:01 +0000 Subject: [PATCH 2/5] tidy Signed-off-by: Robert Smith <robertdavidsmith@yahoo.com> --- .../scheduler/scheduling/preempting_queue_scheduler_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index 003713801dd..68b19296c6f 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -1940,11 +1940,11 @@ func TestPreemptingQueueScheduler(t *testing.T) { ctx.FieldLogger = ctx.WithField("round", i) ctx.Infof("starting scheduling round %d", i) - jobsByNode := map[string][]*jobdb.Job{} + jobsByNodeId := map[string][]*jobdb.Job{} for _, job := range jobDbTxn.GetAll() { if job.LatestRun() != nil && !job.LatestRun().InTerminalState() { node := job.LatestRun().NodeId() - jobsByNode[node] = append(jobsByNode[node], job) + jobsByNodeId[node] = append(jobsByNodeId[node], job) } } @@ -1952,7 +1952,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { require.NoError(t, err) nodeDbTxn := nodeDb.Txn(true) for _, node := range tc.Nodes { - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobsByNode[node.GetName()], node.DeepCopyNilKeys()) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobsByNodeId[node.GetId()], node.DeepCopyNilKeys()) require.NoError(t, err) } nodeDbTxn.Commit() From ff24c3d3a2a9f048fb30405631c7543b87201c96 Mon Sep 17 00:00:00 2001 From: Robert Smith <robertdavidsmith@yahoo.com> Date: Fri, 3 Jan 2025 10:30:05 +0000 Subject: [PATCH 3/5] port more tests Signed-off-by: Robert Smith <robertdavidsmith@yahoo.com> --- .../internaltypes/resource_list_map_util.go | 1 + .../resource_list_map_util_test.go | 3 +- ..._driven_preempting_queue_scheduler_test.go | 31 +++++++++---------- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/internal/scheduler/internaltypes/resource_list_map_util.go b/internal/scheduler/internaltypes/resource_list_map_util.go index b851e82c6cc..a26a1a80f6d 100644 --- a/internal/scheduler/internaltypes/resource_list_map_util.go +++ b/internal/scheduler/internaltypes/resource_list_map_util.go @@ -63,6 +63,7 @@ func NewAllocatableByPriorityAndResourceType(priorities []int32, rl ResourceList for _, priority := range priorities { result[priority] = rl } + result[EvictedPriority] = rl return result } diff --git a/internal/scheduler/internaltypes/resource_list_map_util_test.go b/internal/scheduler/internaltypes/resource_list_map_util_test.go index 85457c42a1a..5e2b61f729f 100644 --- a/internal/scheduler/internaltypes/resource_list_map_util_test.go +++ b/internal/scheduler/internaltypes/resource_list_map_util_test.go @@ -83,9 +83,10 @@ func TestNewAllocatableByPriorityAndResourceType(t *testing.T) { rl := testResourceList(factory, "2", "2Ki") result := NewAllocatableByPriorityAndResourceType([]int32{1, 2}, rl) - assert.Equal(t, 2, len(result)) + assert.Equal(t, 3, len(result)) assert.Equal(t, int64(2000), result[1].GetByNameZeroIfMissing("cpu")) assert.Equal(t, int64(2000), result[2].GetByNameZeroIfMissing("cpu")) + assert.Equal(t, int64(2000), result[EvictedPriority].GetByNameZeroIfMissing("cpu")) } func TestMarkAllocated(t *testing.T) { diff --git a/internal/scheduler/scheduling/market_driven_preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/market_driven_preempting_queue_scheduler_test.go index 2a63f5ec6ea..4c5e8e1257a 100644 --- a/internal/scheduler/scheduling/market_driven_preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/market_driven_preempting_queue_scheduler_test.go @@ -19,7 +19,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/scheduling/constraints" "github.com/armadaproject/armada/internal/scheduler/scheduling/context" "github.com/armadaproject/armada/internal/scheduler/scheduling/fairness" @@ -45,7 +44,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { tests := map[string]struct { SchedulingConfig configuration.SchedulingConfig // Nodes to be considered by the scheduler. - Nodes []*schedulerobjects.Node + Nodes []*internaltypes.Node // Each item corresponds to a call to Reschedule(). Rounds []SchedulingRound // Map from queue to the priority factor associated with that queue. @@ -55,7 +54,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { }{ "three users, highest price jobs from single queue get on": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -79,7 +78,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { }, "three users, highest price jobs between queues get on": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -115,7 +114,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { }, "Two users, no preemption if price lower": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -135,7 +134,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { }, "Two users, preemption if price higher": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -163,7 +162,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { }, "Two users, partial preemption if price higher": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -194,7 +193,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { }, "Self Preemption If Price Is Higher": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -225,7 +224,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { }, "Two Users. Self preemption plus cross user preemption": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -255,7 +254,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { }, "gang preemption": { SchedulingConfig: testfixtures.TestSchedulingConfig(), - Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Nodes: testfixtures.ItN32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { JobsByQueue: map[string][]*jobdb.Job{ @@ -355,11 +354,11 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { ctx.FieldLogger = ctx.WithField("round", i) ctx.Infof("starting scheduling round %d", i) - jobsByNode := map[string][]*jobdb.Job{} + jobsByNodeId := map[string][]*jobdb.Job{} for _, job := range jobDbTxn.GetAll() { if job.LatestRun() != nil && !job.LatestRun().InTerminalState() { - node := job.LatestRun().NodeId() - jobsByNode[node] = append(jobsByNode[node], job) + nodeId := job.LatestRun().NodeId() + jobsByNodeId[nodeId] = append(jobsByNodeId[nodeId], job) } } @@ -367,9 +366,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { require.NoError(t, err) nodeDbTxn := nodeDb.Txn(true) for _, node := range tc.Nodes { - dbNode, err := testfixtures.TestNodeFactory.FromSchedulerObjectsNode(node) - require.NoError(t, err) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobsByNode[node.Name], dbNode) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobsByNodeId[node.GetId()], node.DeepCopyNilKeys()) require.NoError(t, err) } nodeDbTxn.Commit() @@ -416,7 +413,7 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { } for idx, isCordoned := range cordonedNodes { if isCordoned { - node, err := nodeDb.GetNode(tc.Nodes[idx].Id) + node, err := nodeDb.GetNode(tc.Nodes[idx].GetId()) require.NoError(t, err) ctx.Infof("Cordoned node %s", node.GetId()) taints := append(slices.Clone(node.GetTaints()), internaltypes.UnschedulableTaint()) From a9d72c02d3c8fdf7bc3212e2634e6766464a8d05 Mon Sep 17 00:00:00 2001 From: Robert Smith <robertdavidsmith@yahoo.com> Date: Fri, 3 Jan 2025 10:44:37 +0000 Subject: [PATCH 4/5] progress Signed-off-by: Robert Smith <robertdavidsmith@yahoo.com> --- .../scheduling/scheduling_algo_test.go | 14 ++----- .../scheduler/testfixtures/testfixtures.go | 42 ------------------- 2 files changed, 3 insertions(+), 53 deletions(-) diff --git a/internal/scheduler/scheduling/scheduling_algo_test.go b/internal/scheduler/scheduling/scheduling_algo_test.go index 18e69ffd7f2..712eae3db7c 100644 --- a/internal/scheduler/scheduling/scheduling_algo_test.go +++ b/internal/scheduler/scheduling/scheduling_algo_test.go @@ -638,10 +638,10 @@ func BenchmarkNodeDbConstruction(b *testing.B) { numNodes := int(math.Pow10(e)) b.Run(fmt.Sprintf("%d nodes", numNodes), func(b *testing.B) { jobs := testfixtures.N1Cpu4GiJobs("queue-alice", testfixtures.PriorityClass0, 32*numNodes) - nodes := testfixtures.N32CpuNodes(numNodes, testfixtures.TestPriorities) + nodes := testfixtures.ItN32CpuNodes(numNodes, testfixtures.TestPriorities) for i, node := range nodes { for j := 32 * i; j < 32*(i+1); j++ { - jobs[j] = jobs[j].WithNewRun("executor-01", node.Id, node.Name, node.Pool, jobs[j].PriorityClass().Priority) + jobs[j] = jobs[j].WithNewRun("executor-01", node.GetId(), node.GetName(), node.GetPool(), jobs[j].PriorityClass().Priority) } } armadaslices.Shuffle(jobs) @@ -661,12 +661,6 @@ func BenchmarkNodeDbConstruction(b *testing.B) { require.NoError(b, err) b.StartTimer() - nodeFactory := internaltypes.NewNodeFactory( - schedulingConfig.IndexedTaints, - schedulingConfig.IndexedNodeLabels, - testfixtures.TestResourceListFactory, - ) - nodeDb, err := nodedb.NewNodeDb( schedulingConfig.PriorityClasses, schedulingConfig.IndexedResources, @@ -679,9 +673,7 @@ func BenchmarkNodeDbConstruction(b *testing.B) { dbNodes := []*internaltypes.Node{} for _, node := range nodes { - dbNode, err := nodeFactory.FromSchedulerObjectsNode(node) - require.NoError(b, err) - dbNodes = append(dbNodes, dbNode) + dbNodes = append(dbNodes, node.DeepCopyNilKeys()) } err = algo.populateNodeDb(nodeDb, jobs, []*jobdb.Job{}, dbNodes) diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index de8f5b530cd..5c2e9ab2949 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -290,13 +290,6 @@ func WithMaxQueueLookbackConfig(maxQueueLookback uint, config schedulerconfigura return config } -func WithUsedResourcesNodes(p int32, rl schedulerobjects.ResourceList, nodes []*schedulerobjects.Node) []*schedulerobjects.Node { - for _, node := range nodes { - schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource).MarkAllocated(p, rl) - } - return nodes -} - func ItWithUsedResourcesNodes(p int32, rl internaltypes.ResourceList, nodes []*internaltypes.Node) []*internaltypes.Node { for _, node := range nodes { internaltypes.MarkAllocated(node.AllocatableByPriority, p, rl) @@ -304,17 +297,6 @@ func ItWithUsedResourcesNodes(p int32, rl internaltypes.ResourceList, nodes []*i return nodes } -func WithLabelsNodes(labels map[string]string, nodes []*schedulerobjects.Node) []*schedulerobjects.Node { - for _, node := range nodes { - if node.Labels == nil { - node.Labels = maps.Clone(labels) - } else { - maps.Copy(node.Labels, labels) - } - } - return nodes -} - func ItWithNodeTypeNodes(nodeType *internaltypes.NodeType, nodes []*internaltypes.Node) []*internaltypes.Node { result := make([]*internaltypes.Node, len(nodes)) for i, node := range nodes { @@ -777,14 +759,6 @@ func TestCluster() []*schedulerobjects.Node { } } -func N32CpuNodes(n int, priorities []int32) []*schedulerobjects.Node { - rv := make([]*schedulerobjects.Node, n) - for i := 0; i < n; i++ { - rv[i] = Test32CpuNode(priorities) - } - return rv -} - func ItN32CpuNodes(n int, priorities []int32) []*internaltypes.Node { rv := make([]*internaltypes.Node, n) for i := 0; i < n; i++ { @@ -793,14 +767,6 @@ func ItN32CpuNodes(n int, priorities []int32) []*internaltypes.Node { return rv } -func NTainted32CpuNodes(n int, priorities []int32) []*schedulerobjects.Node { - rv := make([]*schedulerobjects.Node, n) - for i := 0; i < n; i++ { - rv[i] = TestTainted32CpuNode(priorities) - } - return rv -} - func ItNTainted32CpuNodes(n int, priorities []int32) []*internaltypes.Node { rv := make([]*internaltypes.Node, n) for i := 0; i < n; i++ { @@ -809,14 +775,6 @@ func ItNTainted32CpuNodes(n int, priorities []int32) []*internaltypes.Node { return rv } -func N8GpuNodes(n int, priorities []int32) []*schedulerobjects.Node { - rv := make([]*schedulerobjects.Node, n) - for i := 0; i < n; i++ { - rv[i] = Test8GpuNode(priorities) - } - return rv -} - func ItN8GpuNodes(n int, priorities []int32) []*internaltypes.Node { rv := make([]*internaltypes.Node, n) for i := 0; i < n; i++ { From b00d6035767cb9b92ca2d5aa7c032bd13982c4e5 Mon Sep 17 00:00:00 2001 From: Robert Smith <robertdavidsmith@yahoo.com> Date: Fri, 3 Jan 2025 11:17:20 +0000 Subject: [PATCH 5/5] always deep copy, tidy Signed-off-by: Robert Smith <robertdavidsmith@yahoo.com> --- .../scheduler/scheduling/gang_scheduler_test.go | 2 +- .../scheduling/preempting_queue_scheduler_test.go | 8 +++++--- .../scheduler/scheduling/queue_scheduler_test.go | 2 +- internal/scheduler/testfixtures/testfixtures.go | 13 ++----------- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/internal/scheduler/scheduling/gang_scheduler_test.go b/internal/scheduler/scheduling/gang_scheduler_test.go index 8c075e2c956..f21338d20ca 100644 --- a/internal/scheduler/scheduling/gang_scheduler_test.go +++ b/internal/scheduler/scheduling/gang_scheduler_test.go @@ -582,7 +582,7 @@ func TestGangScheduler(t *testing.T) { require.NoError(t, err) txn := nodeDb.Txn(true) for _, node := range tc.Nodes { - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node.DeepCopyNilKeys()) require.NoError(t, err) } txn.Commit() diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index 68b19296c6f..775a5b66ebf 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -51,12 +51,14 @@ func TestEvictOversubscribed(t *testing.T) { stringInterner := stringinterner.New(1024) - node := testfixtures.ItTest32CpuNode(priorities) nodeDb, err := NewNodeDb(config, stringInterner) require.NoError(t, err) - require.NoError(t, err) nodeDbTxn := nodeDb.Txn(true) - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobs, node) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn( + nodeDbTxn, + jobs, + testfixtures.ItTest32CpuNode(priorities), + ) require.NoError(t, err) jobDb := jobdb.NewJobDb(config.PriorityClasses, config.DefaultPriorityClassName, stringInterner, testfixtures.TestResourceListFactory) diff --git a/internal/scheduler/scheduling/queue_scheduler_test.go b/internal/scheduler/scheduling/queue_scheduler_test.go index 74a6b29fdfe..36b23d6a51b 100644 --- a/internal/scheduler/scheduling/queue_scheduler_test.go +++ b/internal/scheduler/scheduling/queue_scheduler_test.go @@ -472,7 +472,7 @@ func TestQueueScheduler(t *testing.T) { require.NoError(t, err) txn := nodeDb.Txn(true) for _, node := range tc.Nodes { - err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node) + err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node.DeepCopyNilKeys()) require.NoError(t, err) } txn.Commit() diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 5c2e9ab2949..10469f318fc 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -1091,20 +1091,11 @@ func GetTestSupportedResourceTypes() []schedulerconfiguration.ResourceType { } func Cpu(cpu string) internaltypes.ResourceList { - return TestResourceListFactory.FromNodeProto( - map[string]resource.Quantity{ - "cpu": resource.MustParse(cpu), - }, - ) + return CpuMemGpu(cpu, "0", "0") } func CpuMem(cpu string, memory string) internaltypes.ResourceList { - return TestResourceListFactory.FromNodeProto( - map[string]resource.Quantity{ - "cpu": resource.MustParse(cpu), - "memory": resource.MustParse(memory), - }, - ) + return CpuMemGpu(cpu, memory, "0") } func CpuMemGpu(cpu string, memory string, gpu string) internaltypes.ResourceList {