diff --git a/pkg/cache/cohort_snapshot.go b/pkg/cache/cohort_snapshot.go
index 43cdfadbd2..a381338ae2 100644
--- a/pkg/cache/cohort_snapshot.go
+++ b/pkg/cache/cohort_snapshot.go
@@ -29,6 +29,46 @@ func (c *CohortSnapshot) GetName() string {
 	return c.Name
 }
 
+// Root walks up the parent links and returns the topmost Cohort of
+// the tree that this Cohort belongs to. The Cohort graph must be
+// free of cycles.
+func (c *CohortSnapshot) Root() *CohortSnapshot {
+	if c.HasParent() {
+		return c.Parent().Root()
+	}
+	return c
+}
+
+// SubtreeClusterQueues collects every ClusterQueue found in the
+// subtree rooted at this Cohort. The Cohort graph must be free of
+// cycles.
+func (c *CohortSnapshot) SubtreeClusterQueues() []*ClusterQueueSnapshot {
+	// Pre-size the result with the subtree's ClusterQueue count.
+	result := make([]*ClusterQueueSnapshot, 0, c.subtreeClusterQueueCount())
+	return c.subtreeClusterQueuesHelper(result)
+}
+
+// subtreeClusterQueuesHelper appends this Cohort's ClusterQueues,
+// followed by those of each child Cohort's subtree, to acc and
+// returns the extended slice.
+func (c *CohortSnapshot) subtreeClusterQueuesHelper(acc []*ClusterQueueSnapshot) []*ClusterQueueSnapshot {
+	acc = append(acc, c.ChildCQs()...)
+	for _, child := range c.ChildCohorts() {
+		acc = child.subtreeClusterQueuesHelper(acc)
+	}
+	return acc
+}
+
+// subtreeClusterQueueCount returns the number of ClusterQueues in
+// the subtree rooted at this Cohort.
+func (c *CohortSnapshot) subtreeClusterQueueCount() int {
+	total := len(c.ChildCQs())
+	for _, child := range c.ChildCohorts() {
+		total += child.subtreeClusterQueueCount()
+	}
+	return total
+}
+
 // The methods below implement hierarchicalResourceNode interface.
func (c *CohortSnapshot) getResourceNode() ResourceNode { diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index a3451eac1e..ce35ab633c 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -513,7 +513,7 @@ func (p *Preemptor) findCandidates(wl *kueue.Workload, cq *cache.ClusterQueueSna if cq.HasParent() && cq.Preemption.ReclaimWithinCohort != kueue.PreemptionPolicyNever { onlyLowerPriority := cq.Preemption.ReclaimWithinCohort != kueue.PreemptionPolicyAny - for _, cohortCQ := range cq.Parent().ChildCQs() { + for _, cohortCQ := range cq.Parent().Root().SubtreeClusterQueues() { if cq == cohortCQ || !cqIsBorrowing(cohortCQ, frsNeedPreemption) { // Can't reclaim quota from itself or ClusterQueues that are not borrowing. continue diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go index 0e92097f05..c3fc1eeeb3 100644 --- a/pkg/scheduler/preemption/preemption_test.go +++ b/pkg/scheduler/preemption/preemption_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/utils/ptr" config "sigs.k8s.io/kueue/apis/config/v1beta1" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -71,7 +72,7 @@ func TestPreemption(t *testing.T) { utiltesting.MakeResourceFlavor("alpha").Obj(), utiltesting.MakeResourceFlavor("beta").Obj(), } - clusterQueues := []*kueue.ClusterQueue{ + defaultClusterQueues := []*kueue.ClusterQueue{ utiltesting.MakeClusterQueue("standalone"). ResourceGroup( *utiltesting.MakeFlavorQuotas("default"). 
@@ -279,6 +280,8 @@ func TestPreemption(t *testing.T) { Obj(), } cases := map[string]struct { + clusterQueues []*kueue.ClusterQueue + cohorts []*kueuealpha.Cohort admitted []kueue.Workload incoming *kueue.Workload targetCQ string @@ -287,6 +290,7 @@ func TestPreemption(t *testing.T) { disableLendingLimit bool }{ "preempt lowest priority": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("low", ""). Priority(-1). @@ -317,6 +321,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/low", kueue.InClusterQueueReason)), }, "preempt multiple": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("low", ""). Priority(-1). @@ -348,6 +353,7 @@ func TestPreemption(t *testing.T) { }, "no preemption for low priority": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("low", ""). Priority(-1). @@ -372,6 +378,7 @@ func TestPreemption(t *testing.T) { }), }, "not enough low priority workloads": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("low", ""). Priority(-1). @@ -395,6 +402,7 @@ func TestPreemption(t *testing.T) { }), }, "some free quota, preempt low priority": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("low", ""). Priority(-1). @@ -425,6 +433,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/low", kueue.InClusterQueueReason)), }, "minimal set excludes low priority": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("low", ""). Priority(-1). @@ -455,6 +464,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/mid", kueue.InClusterQueueReason)), }, "only preempt workloads using the chosen flavor": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("low", ""). 
Priority(-1). @@ -490,6 +500,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/mid", kueue.InClusterQueueReason)), }, "reclaim quota from borrower": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-low", ""). Priority(-1). @@ -520,6 +531,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/c2-mid", kueue.InCohortReclamationReason)), }, "reclaim quota if workload requests 0 resources for a resource at nominal quota": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-low", ""). Priority(-1). @@ -556,6 +568,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/c2-mid", kueue.InCohortReclamationReason)), }, "no workloads borrowing": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-high", ""). Priority(1). @@ -581,6 +594,7 @@ func TestPreemption(t *testing.T) { }), }, "not enough workloads borrowing": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-high", ""). Priority(1). @@ -611,6 +625,7 @@ func TestPreemption(t *testing.T) { }), }, "preempting locally and borrowing other resources in cohort, without cohort candidates": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-low", ""). Priority(-1). @@ -647,6 +662,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/c1-low", kueue.InClusterQueueReason)), }, "preempting locally and borrowing same resource in cohort": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-med", ""). Priority(0). 
@@ -678,6 +694,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/c1-low", kueue.InClusterQueueReason)), }, "preempting locally and borrowing same resource in cohort; no borrowing limit in the cohort": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("d1-med", ""). Priority(0). @@ -709,6 +726,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/d1-low", kueue.InClusterQueueReason)), }, "preempting locally and borrowing other resources in cohort, with cohort candidates": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-med", ""). Priority(0). @@ -750,6 +768,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/c1-med", kueue.InClusterQueueReason)), }, "preempting locally and not borrowing same resource in 1-queue cohort": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("l1-med", ""). Priority(0). @@ -776,6 +795,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/l1-med", kueue.InClusterQueueReason)), }, "do not reclaim borrowed quota from same priority for withinCohort=ReclaimFromLowerPriority": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1", ""). Request(corev1.ResourceCPU, "2"). @@ -802,6 +822,7 @@ func TestPreemption(t *testing.T) { }), }, "reclaim borrowed quota from same priority for withinCohort=ReclaimFromAny": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-1", ""). Request(corev1.ResourceCPU, "4"). @@ -830,6 +851,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/c1-1", kueue.InCohortReclamationReason)), }, "preempt from all ClusterQueues in cohort": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c1-low", ""). 
Priority(-1). @@ -863,6 +885,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/c1-low", kueue.InClusterQueueReason), targetKeyReason("/c2-low", kueue.InCohortReclamationReason)), }, "can't preempt workloads in ClusterQueue for withinClusterQueue=Never": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("c2-low", ""). Priority(-1). @@ -883,6 +906,7 @@ func TestPreemption(t *testing.T) { }), }, "each podset preempts a different flavor": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("low-alpha", ""). Priority(-1). @@ -931,6 +955,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/low-alpha", kueue.InClusterQueueReason), targetKeyReason("/low-beta", kueue.InClusterQueueReason)), }, "preempt newer workloads with the same priority": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("wl1", ""). Priority(2). @@ -980,6 +1005,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/wl2", kueue.InClusterQueueReason)), }, "use BorrowWithinCohort; allow preempting a lower-priority workload from another ClusterQueue while borrowing": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("a_best_effort_low", ""). Priority(-1). @@ -1005,6 +1031,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/a_best_effort_low", kueue.InCohortReclaimWhileBorrowingReason)), }, "use BorrowWithinCohort; don't allow preempting a lower-priority workload with priority above MaxPriorityThreshold, if borrowing is required even after the preemption": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("b_standard", ""). Priority(1). 
@@ -1025,6 +1052,7 @@ func TestPreemption(t *testing.T) { }), }, "use BorrowWithinCohort; allow preempting a lower-priority workload with priority above MaxPriorityThreshold, if borrowing is not required after the preemption": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ // this admitted workload consumes all resources so it needs to be preempted to run a new workload *utiltesting.MakeWorkload("b_standard", ""). @@ -1048,6 +1076,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/b_standard", kueue.InCohortReclamationReason)), }, "use BorrowWithinCohort; don't allow for preemption of lower-priority workload from the same ClusterQueue": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("a_standard", ""). Priority(1). @@ -1068,6 +1097,7 @@ func TestPreemption(t *testing.T) { }), }, "use BorrowWithinCohort; only preempt from CQ if no workloads below threshold and already above nominal": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("a_standard_1", ""). Priority(1). @@ -1104,6 +1134,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/b_standard_1", kueue.InClusterQueueReason)), }, "use BorrowWithinCohort; preempt from CQ and from other CQs with workloads below threshold": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("b_standard_high", ""). Priority(2). @@ -1140,6 +1171,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/b_standard_mid", kueue.InClusterQueueReason), targetKeyReason("/a_best_effort_lower", kueue.InCohortReclaimWhileBorrowingReason)), }, "reclaim quota from lender": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("lend1-low", ""). Priority(-1). 
@@ -1170,6 +1202,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/lend2-mid", kueue.InCohortReclamationReason)), }, "preempt from all ClusterQueues in cohort-lend": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("lend1-low", ""). Priority(-1). @@ -1203,6 +1236,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/lend1-low", kueue.InClusterQueueReason), targetKeyReason("/lend2-low", kueue.InCohortReclamationReason)), }, "cannot preempt from other ClusterQueues if exceeds requestable quota including lending limit": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("lend2-low", ""). Priority(-1). @@ -1223,6 +1257,7 @@ func TestPreemption(t *testing.T) { wantPreempted: nil, }, "preemptions from cq when target queue is exhausted for the single requested resource": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("a1", ""). Priority(-2). @@ -1269,6 +1304,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/a1", kueue.InClusterQueueReason), targetKeyReason("/a2", kueue.InClusterQueueReason)), }, "preemptions from cq when target queue is exhausted for two requested resources": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("a1", ""). Priority(-2). @@ -1326,6 +1362,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/a1", kueue.InClusterQueueReason), targetKeyReason("/a2", kueue.InClusterQueueReason)), }, "preemptions from cq when target queue is exhausted for one requested resource, but not the other": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("a1", ""). Priority(-2). 
@@ -1377,6 +1414,7 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New(targetKeyReason("/a1", kueue.InClusterQueueReason), targetKeyReason("/a2", kueue.InClusterQueueReason)), }, "allow preemption from other cluster queues if target cq is not exhausted for the requested resource": { + clusterQueues: defaultClusterQueues, admitted: []kueue.Workload{ *utiltesting.MakeWorkload("a1", ""). Priority(-1). @@ -1421,6 +1459,47 @@ func TestPreemption(t *testing.T) { }), wantPreempted: sets.New(targetKeyReason("/a1", kueue.InClusterQueueReason), targetKeyReason("/b5", kueue.InCohortReclamationReason)), }, + "long range preemption": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue("cq-left"). + Cohort("cohort-left"). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + }). + ResourceGroup(*utiltesting.MakeFlavorQuotas("default"). + Resource(corev1.ResourceCPU, "10"). + Obj(), + ).Obj(), + utiltesting.MakeClusterQueue("cq-right"). + Cohort("cohort-right"). + ResourceGroup(*utiltesting.MakeFlavorQuotas("default"). + Resource(corev1.ResourceCPU, "0"). + Obj(), + ). + Obj(), + }, + cohorts: []*kueuealpha.Cohort{ + utiltesting.MakeCohort("cohort-left").Parent("root").Obj(), + utiltesting.MakeCohort("cohort-right").Parent("root").Obj(), + }, + admitted: []kueue.Workload{ + *utiltesting.MakeWorkload("to-be-preempted", ""). + Request(corev1.ResourceCPU, "5"). + ReserveQuota(utiltesting.MakeAdmission("cq-right").Assignment(corev1.ResourceCPU, "default", "5").Obj()). + Obj(), + }, + incoming: utiltesting.MakeWorkload("incoming", ""). + Request(corev1.ResourceCPU, "8"). 
+ Obj(), + targetCQ: "cq-left", + assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{ + corev1.ResourceCPU: &flavorassigner.FlavorAssignment{ + Name: "default", + Mode: flavorassigner.Preempt, + }, + }), + wantPreempted: sets.New(targetKeyReason("/to-be-preempted", kueue.InCohortReclamationReason)), + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { @@ -1436,11 +1515,16 @@ func TestPreemption(t *testing.T) { for _, flv := range flavors { cqCache.AddOrUpdateResourceFlavor(flv) } - for _, cq := range clusterQueues { + for _, cq := range tc.clusterQueues { if err := cqCache.AddClusterQueue(ctx, cq); err != nil { t.Fatalf("Couldn't add ClusterQueue to cache: %v", err) } } + for _, cohort := range tc.cohorts { + if err := cqCache.AddOrUpdateCohort(cohort); err != nil { + t.Fatalf("Couldn't add Cohort to cache: %v", err) + } + } var lock sync.Mutex gotPreempted := sets.New[string]()