Remove the config to disable the scheduling key optimisation (#3960)
* Remove the config to disable the scheduling key optimisation

Signed-off-by: Chris Martin <[email protected]>

* fix merge conflicts

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
d80tb7 authored Sep 23, 2024
1 parent c3d5faf commit cf9122e
Showing 8 changed files with 14 additions and 34 deletions.
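
For context, the optimisation whose kill switch this commit removes works by remembering scheduling keys that failed earlier in the same round and skipping later jobs that carry an identical key. Below is a minimal sketch of the idea, assuming a simplified job type and a hypothetical tryPlace helper (the real logic lives in gang_scheduler.go and uses Armada's scheduling context):

package main

import "fmt"

// job carries a scheduling key: jobs with equal keys have identical scheduling
// requirements, so if one fails to schedule, retrying the others in the same
// round is pointless.
type job struct {
	id  string
	key string
}

// tryPlace is a hypothetical stand-in for a real placement attempt against the node database.
func tryPlace(j job) bool {
	return j.key != "k1" // pretend jobs with key "k1" cannot fit anywhere
}

func main() {
	unsuccessfulKeys := map[string]bool{}
	for _, j := range []job{{"a", "k1"}, {"b", "k2"}, {"c", "k1"}} {
		if unsuccessfulKeys[j.key] {
			// A job with the same requirements already failed this round; don't retry.
			fmt.Println("skipping", j.id)
			continue
		}
		if tryPlace(j) {
			fmt.Println("scheduled", j.id)
		} else {
			unsuccessfulKeys[j.key] = true
			fmt.Println("failed to schedule", j.id)
		}
	}
}

Previously, setting alwaysAttemptScheduling: true disabled this skip entirely; after this commit the check is always on, and only the internal evicted-jobs round (see preempting_queue_scheduler.go below) opts out.
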
1 change: 0 additions & 1 deletion config/scheduler/config.yaml
@@ -111,6 +111,5 @@ scheduling:
resolution: "1Gi"
executorTimeout: "10m"
maxUnacknowledgedJobsPerExecutor: 2500
-alwaysAttemptScheduling: false
executorUpdateFrequency: "60s"

2 changes: 0 additions & 2 deletions internal/scheduler/configuration/configuration.go
@@ -227,8 +227,6 @@ type SchedulingConfig struct {
// Maximum number of jobs that can be assigned to an executor but not yet acknowledged, before
// the executor is excluded from consideration by the scheduler.
MaxUnacknowledgedJobsPerExecutor uint
-// If true, jobs with requirements known to be impossible to meet are not skipped during scheduling.
-AlwaysAttemptScheduling bool
// The frequency at which the scheduler updates the cluster state.
ExecutorUpdateFrequency time.Duration
// Defines the order in which pools will be scheduled. Higher priority pools will be scheduled first
14 changes: 6 additions & 8 deletions internal/scheduler/gang_scheduler.go
@@ -31,19 +31,17 @@ func NewGangScheduler(
constraints schedulerconstraints.SchedulingConstraints,
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
+skipUnsuccessfulSchedulingKeyCheck bool,
) (*GangScheduler, error) {
return &GangScheduler{
-constraints:           constraints,
-floatingResourceTypes: floatingResourceTypes,
-schedulingContext:     sctx,
-nodeDb:                nodeDb,
+constraints:                        constraints,
+floatingResourceTypes:              floatingResourceTypes,
+schedulingContext:                  sctx,
+nodeDb:                             nodeDb,
+skipUnsuccessfulSchedulingKeyCheck: skipUnsuccessfulSchedulingKeyCheck,
}, nil
}

-func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
-sch.skipUnsuccessfulSchedulingKeyCheck = true
-}
-
func (sch *GangScheduler) updateGangSchedulingContextOnSuccess(gctx *schedulercontext.GangSchedulingContext, gangAddedToSchedulingContext bool) error {
if !gangAddedToSchedulingContext {
// Nothing to do.
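
The hunk above replaces post-construction mutation with constructor injection. A minimal, self-contained sketch of the pattern, using a stand-in type rather than the real Armada scheduler:

package main

import "fmt"

// GangScheduler is a simplified stand-in for the real type; only the flag matters here.
type GangScheduler struct {
	skipUnsuccessfulSchedulingKeyCheck bool
}

// NewGangScheduler mirrors the new constructor shape: the caller decides up front
// whether the scheduling key check is skipped, and the flag is fixed thereafter.
func NewGangScheduler(skipUnsuccessfulSchedulingKeyCheck bool) (*GangScheduler, error) {
	return &GangScheduler{skipUnsuccessfulSchedulingKeyCheck: skipUnsuccessfulSchedulingKeyCheck}, nil
}

func main() {
	normal, _ := NewGangScheduler(false)      // ordinary pass: the scheduling key check stays on
	evictedOnly, _ := NewGangScheduler(true)  // evicted-jobs-only pass: the check is skipped
	fmt.Println(normal.skipUnsuccessfulSchedulingKeyCheck, evictedOnly.skipUnsuccessfulSchedulingKeyCheck)
}

This removes the one-way setter SkipUnsuccessfulSchedulingKeyCheck and the temporal coupling that came with it: a GangScheduler's behaviour is now fully determined by its constructor arguments.
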
2 changes: 1 addition & 1 deletion internal/scheduler/gang_scheduler_test.go
@@ -643,7 +643,7 @@ func TestGangScheduler(t *testing.T) {
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil, map[string]bool{})
floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources)
require.NoError(t, err)
-sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
+sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false)
require.NoError(t, err)

var actualScheduledIndices []int
16 changes: 4 additions & 12 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -42,8 +42,6 @@ type PreemptingQueueScheduler struct {
jobIdsByGangId map[string]map[string]bool
// Maps job ids of gang jobs to the id of that gang.
gangIdByJobId map[string]string
-// If true, the unsuccessfulSchedulingKeys check of gangScheduler is omitted.
-skipUnsuccessfulSchedulingKeyCheck bool
}

func NewPreemptingQueueScheduler(
@@ -83,10 +81,6 @@
}
}

-func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
-sch.skipUnsuccessfulSchedulingKeyCheck = true
-}
-
// Schedule
// - preempts jobs belonging to queues with total allocation above their fair share and
// - schedules new jobs belonging to queues with total allocation less than their fair share.
@@ -145,6 +139,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
armadacontext.WithLogField(ctx, "stage", "re-schedule after balancing eviction"),
inMemoryJobRepo,
sch.jobRepo,
+false,
)
if err != nil {
return nil, err
@@ -191,14 +186,13 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
// Re-schedule evicted jobs/schedule new jobs.
// Only necessary if a non-zero number of jobs were evicted.
if len(evictorResult.EvictedJctxsByJobId) > 0 {
-// Since no new jobs are considered in this round, the scheduling key check brings no benefit.
-sch.SkipUnsuccessfulSchedulingKeyCheck()
ctx.WithField("stage", "scheduling-algo").Info("Performing second scheduling round")
schedulerResult, err = sch.schedule(
armadacontext.WithLogField(ctx, "stage", "schedule after oversubscribed eviction"),
inMemoryJobRepo,
// Only evicted jobs should be scheduled in this round.
nil,
+true, // Since no new jobs are considered in this round, the scheduling key check brings no benefit.
)
if err != nil {
return nil, err
@@ -501,7 +495,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
return nil
}

-func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) {
+func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository, skipUnsuccessfulSchedulingKeyCheck bool) (*schedulerresult.SchedulerResult, error) {
jobIteratorByQueue := make(map[string]JobContextIterator)
for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue)
@@ -522,13 +516,11 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo
sch.floatingResourceTypes,
sch.nodeDb,
jobIteratorByQueue,
+skipUnsuccessfulSchedulingKeyCheck,
)
if err != nil {
return nil, err
}
-if sch.skipUnsuccessfulSchedulingKeyCheck {
-sched.SkipUnsuccessfulSchedulingKeyCheck()
-}
result, err := sched.Schedule(ctx)
if err != nil {
return nil, err
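
With the field gone from PreemptingQueueScheduler, the decision now travels as an argument of the internal schedule call. A hedged sketch of the resulting control flow, with the real repositories and contexts stubbed out (schedule here is a stand-in, not the real method):

package main

import "fmt"

// schedule stands in for PreemptingQueueScheduler.schedule; only the flag plumbing is shown.
func schedule(stage string, skipUnsuccessfulSchedulingKeyCheck bool) {
	fmt.Printf("stage=%q skipCheck=%v\n", stage, skipUnsuccessfulSchedulingKeyCheck)
}

func main() {
	// The first pass considers new jobs, so the unsuccessful-scheduling-key check stays on.
	schedule("re-schedule after balancing eviction", false)
	// The second pass only re-schedules evicted jobs; no new jobs are considered,
	// so the check brings no benefit and is skipped.
	schedule("schedule after oversubscribed eviction", true)
}

Passing the flag per call also means one round's setting can no longer leak into later rounds, which the old mutate-in-place setter made possible.
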
7 changes: 2 additions & 5 deletions internal/scheduler/queue_scheduler.go
@@ -33,13 +33,14 @@ func NewQueueScheduler(
floatingResourceTypes *floatingresources.FloatingResourceTypes,
nodeDb *nodedb.NodeDb,
jobIteratorByQueue map[string]JobContextIterator,
+skipUnsuccessfulSchedulingKeyCheck bool,
) (*QueueScheduler, error) {
for queue := range jobIteratorByQueue {
if _, ok := sctx.QueueSchedulingContexts[queue]; !ok {
return nil, errors.Errorf("no scheduling context for queue %s", queue)
}
}
-gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
+gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, skipUnsuccessfulSchedulingKeyCheck)
if err != nil {
return nil, err
}
@@ -58,10 +59,6 @@
}, nil
}

-func (sch *QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
-sch.gangScheduler.SkipUnsuccessfulSchedulingKeyCheck()
-}
-
func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresult.SchedulerResult, error) {
var scheduledJobs []*schedulercontext.JobSchedulingContext

2 changes: 1 addition & 1 deletion internal/scheduler/queue_scheduler_test.go
@@ -540,7 +540,7 @@ func TestQueueScheduler(t *testing.T) {
it := jobRepo.GetJobIterator(q.Name)
jobIteratorByQueue[q.Name] = it
}
-sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue)
+sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue, false)
require.NoError(t, err)

result, err := sch.Schedule(armadacontext.Background())
4 changes: 0 additions & 4 deletions internal/scheduler/scheduling_algo.go
@@ -484,10 +484,6 @@ func (l *FairSchedulingAlgo) schedulePool(
fsctx.jobIdsByGangId,
fsctx.gangIdByJobId,
)
-if l.schedulingConfig.AlwaysAttemptScheduling {
-scheduler.SkipUnsuccessfulSchedulingKeyCheck()
-}
-
result, err := scheduler.Schedule(ctx)
if err != nil {
return nil, nil, err
