Skip to content

Commit

Permalink
Move code in common.go to more sensible places (#3893)
Browse files Browse the repository at this point in the history
* Move common.go to more sensible places

Signed-off-by: Chris Martin <[email protected]>

* fix typo

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
Co-authored-by: JamesMurkin <[email protected]>
  • Loading branch information
d80tb7 and JamesMurkin authored Sep 2, 2024
1 parent 2573a91 commit 41b3f79
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 167 deletions.
72 changes: 0 additions & 72 deletions internal/scheduler/common.go

This file was deleted.

92 changes: 0 additions & 92 deletions internal/scheduler/common_test.go

This file was deleted.

62 changes: 62 additions & 0 deletions internal/scheduler/context/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/armadacontext"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
Expand Down Expand Up @@ -187,3 +191,61 @@ func JobSchedulingContextFromJob(job *jobdb.Job) *JobSchedulingContext {
GangInfo: gangInfo,
}
}

// PrintJobSummary logs a summary of the job scheduling context
// It will log a high level summary at Info level, and a list of all queues + jobs affected at debug level
func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSchedulingContext) {
if len(jctxs) == 0 {
return
}
jobsByQueue := armadaslices.MapAndGroupByFuncs(
jctxs,
func(jctx *JobSchedulingContext) string {
return jctx.Job.Queue()
},
func(jctx *JobSchedulingContext) *jobdb.Job {
return jctx.Job
},
)
resourcesByQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) schedulerobjects.ResourceList {
rv := schedulerobjects.NewResourceListWithDefaultSize()
for _, job := range jobs {
rv.AddV1ResourceList(job.ResourceRequirements().Requests)
}
return rv
},
)
jobCountPerQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) int {
return len(jobs)
},
)
jobIdsByQueue := armadamaps.MapValues(
jobsByQueue,
func(jobs []*jobdb.Job) []string {
rv := make([]string, len(jobs))
for i, job := range jobs {
rv[i] = job.Id()
}
return rv
},
)
summary := fmt.Sprintf(
"affected queues %v; resources %v; jobs per queue %v",
maps.Keys(jobsByQueue),
armadamaps.MapValues(
resourcesByQueue,
func(rl schedulerobjects.ResourceList) string {
return rl.CompactString()
},
),
jobCountPerQueue,
)
verbose := fmt.Sprintf("affected jobs %v", jobIdsByQueue)

ctx.Infof("%s %s", prefix, summary)
ctx.Debugf("%s %s", prefix, verbose)
}
4 changes: 2 additions & 2 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
}
ctx.WithField("stage", "scheduling-algo").Infof("Finished unbinding preempted and evicted jobs")

PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
schedulercontext.PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
// TODO: Show failed jobs.

if sch.enableAssertions {
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/queue/queue_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestFetch(t *testing.T) {
{Name: "testQueue2"},
},
},
"Immediate Steam Error": {
"Immediate Stream Error": {
queues: []*api.Queue{},
streamError: true,
},
Expand Down
82 changes: 82 additions & 0 deletions internal/scheduler/schedulerobjects/resourcelist_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package schedulerobjects

import (
"math"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -796,6 +797,87 @@ func TestV1ResourceListConversion(t *testing.T) {
assert.True(t, maps.Equal(v1rlCopy, v1rl))
}

func TestResourceListAsWeightedMillis(t *testing.T) {
tests := map[string]struct {
rl ResourceList
weights map[string]float64
expected int64
}{
"default": {
rl: ResourceList{
Resources: map[string]resource.Quantity{
"foo": resource.MustParse("2"),
"bar": resource.MustParse("10Gi"),
"baz": resource.MustParse("1"),
},
},
weights: map[string]float64{
"foo": 1,
"bar": 0.1,
"baz": 10,
},
expected: (1 * 2 * 1000) + (1 * 1000 * 1024 * 1024 * 1024) + (10 * 1 * 1000),
},
"zeroes": {
rl: ResourceList{
Resources: map[string]resource.Quantity{
"foo": resource.MustParse("0"),
"bar": resource.MustParse("1"),
"baz": resource.MustParse("2"),
},
},
weights: map[string]float64{
"foo": 1,
"bar": 0,
},
expected: 0,
},
"1Pi": {
rl: ResourceList{
Resources: map[string]resource.Quantity{
"foo": resource.MustParse("1Pi"),
},
},
weights: map[string]float64{
"foo": 1,
},
expected: int64(math.Pow(1024, 5)) * 1000,
},
"rounding": {
rl: ResourceList{
Resources: map[string]resource.Quantity{
"foo": resource.MustParse("1"),
},
},
weights: map[string]float64{
"foo": 0.3006,
},
expected: 301,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, tc.expected, tc.rl.AsWeightedMillis(tc.weights))
})
}
}

func BenchmarkResourceListAsWeightedMillis(b *testing.B) {
rl := NewResourceList(3)
rl.Set("cpu", resource.MustParse("2"))
rl.Set("memory", resource.MustParse("10Gi"))
rl.Set("nvidia.com/gpu", resource.MustParse("1"))
weights := map[string]float64{
"cpu": 1,
"memory": 0.1,
"nvidia.com/gpu": 10,
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
rl.AsWeightedMillis(weights)
}
}

func BenchmarkResourceListZeroAdd(b *testing.B) {
rla := NewResourceList(3)
rlb := NewResourceList(3)
Expand Down

0 comments on commit 41b3f79

Please sign in to comment.