diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go index 264276908a..161ac21486 100644 --- a/apis/kueue/v1beta1/clusterqueue_types.go +++ b/apis/kueue/v1beta1/clusterqueue_types.go @@ -186,12 +186,20 @@ type ResourceFlavorReference string // ClusterQueueStatus defines the observed state of ClusterQueue type ClusterQueueStatus struct { - // flavorsUsage are the used quotas, by flavor, currently in use by the + // flavorsReservation are the reserved quotas, by flavor, currently in use by the // workloads assigned to this ClusterQueue. // +listType=map // +listMapKey=name // +kubebuilder:validation:MaxItems=16 // +optional + FlavorsReservation []FlavorUsage `json:"flavorsReservation"` + + // flavorsUsage are the used quotas, by flavor, currently in use by the + // workloads admitted in this ClusterQueue. + // +listType=map + // +listMapKey=name + // +kubebuilder:validation:MaxItems=16 + // +optional FlavorsUsage []FlavorUsage `json:"flavorsUsage"` // pendingWorkloads is the number of workloads currently waiting to be @@ -199,6 +207,11 @@ type ClusterQueueStatus struct { // +optional PendingWorkloads int32 `json:"pendingWorkloads"` + // reservingWorkloads is the number of workloads currently reserving quota in this + // clusterQueue. + // +optional + ReservingWorkloads int32 `json:"reservingWorkloads"` + // admittedWorkloads is the number of workloads currently admitted to this // clusterQueue and haven't finished yet. // +optional diff --git a/apis/kueue/v1beta1/localqueue_types.go b/apis/kueue/v1beta1/localqueue_types.go index 38cc78fc46..99680e6d8e 100644 --- a/apis/kueue/v1beta1/localqueue_types.go +++ b/apis/kueue/v1beta1/localqueue_types.go @@ -37,7 +37,12 @@ type LocalQueueStatus struct { // +optional PendingWorkloads int32 `json:"pendingWorkloads"` - // AdmittedWorkloads is the number of workloads in this LocalQueue + // reservingWorkloads is the number of workloads in this LocalQueue + // reserving quota in a ClusterQueue and that haven't finished yet. + // +optional + ReservingWorkloads int32 `json:"reservingWorkloads"` + + // admittedWorkloads is the number of workloads in this LocalQueue // admitted to a ClusterQueue and that haven't finished yet. // +optional AdmittedWorkloads int32 `json:"admittedWorkloads"` @@ -51,7 +56,15 @@ type LocalQueueStatus struct { // +patchMergeKey=type Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` - // flavorUsage are the used quotas, by flavor currently in use by the + // flavorsReservation are the reserved quotas, by flavor currently in use by the + // workloads assigned to this LocalQueue. + // +listType=map + // +listMapKey=name + // +kubebuilder:validation:MaxItems=16 + // +optional + FlavorsReservation []LocalQueueFlavorUsage `json:"flavorsReservation"` + + // flavorsUsage are the used quotas, by flavor currently in use by the // workloads assigned to this LocalQueue. // +listType=map // +listMapKey=name diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index a4555e5782..2a307ae98c 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -348,6 +348,13 @@ func (in *ClusterQueueSpec) DeepCopy() *ClusterQueueSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ClusterQueueStatus) DeepCopyInto(out *ClusterQueueStatus) { *out = *in + if in.FlavorsReservation != nil { + in, out := &in.FlavorsReservation, &out.FlavorsReservation + *out = make([]FlavorUsage, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.FlavorsUsage != nil { in, out := &in.FlavorsUsage, &out.FlavorsUsage *out = make([]FlavorUsage, len(*in)) @@ -560,6 +567,13 @@ func (in *LocalQueueStatus) DeepCopyInto(out *LocalQueueStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.FlavorsReservation != nil { + in, out := &in.FlavorsReservation, &out.FlavorsReservation + *out = make([]LocalQueueFlavorUsage, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.FlavorUsage != nil { in, out := &in.FlavorUsage, &out.FlavorUsage *out = make([]LocalQueueFlavorUsage, len(*in)) diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml index 8faa7201c9..30d147f888 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml @@ -414,9 +414,59 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + flavorsReservation: + description: flavorsReservation are the reserved quotas, by flavor, + currently in use by the workloads assigned to this ClusterQueue. + items: + properties: + name: + description: name of the flavor. + type: string + resources: + description: resources lists the quota usage for the resources + in this flavor. + items: + properties: + borrowed: + anyOf: + - type: integer + - type: string + description: Borrowed is quantity of quota that is borrowed + from the cohort. In other words, it's the used quota + that is over the nominalQuota. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + name: + description: name of the resource + type: string + total: + anyOf: + - type: integer + - type: string + description: total is the total quantity of used quota, + including the amount borrowed from the cohort. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - name + type: object + maxItems: 16 + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + required: + - name + - resources + type: object + maxItems: 16 + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map flavorsUsage: description: flavorsUsage are the used quotas, by flavor, currently - in use by the workloads assigned to this ClusterQueue. + in use by the workloads admitted in this ClusterQueue. items: properties: name: @@ -501,6 +551,11 @@ spec: required: - lastChangeTime type: object + reservingWorkloads: + description: reservingWorkloads is the number of workloads currently + reserving quota in this clusterQueue. 
+ format: int32 + type: integer type: object type: object served: true diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml index 18e6f39e11..3fb00b3ce4 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml @@ -72,7 +72,7 @@ spec: description: LocalQueueStatus defines the observed state of LocalQueue properties: admittedWorkloads: - description: AdmittedWorkloads is the number of workloads in this + description: admittedWorkloads is the number of workloads in this LocalQueue admitted to a ClusterQueue and that haven't finished yet. format: int32 @@ -151,7 +151,7 @@ spec: - type x-kubernetes-list-type: map flavorUsage: - description: flavorUsage are the used quotas, by flavor currently + description: flavorsUsage are the used quotas, by flavor currently in use by the workloads assigned to this LocalQueue. items: properties: @@ -190,11 +190,57 @@ spec: x-kubernetes-list-map-keys: - name x-kubernetes-list-type: map + flavorsReservation: + description: flavorsReservation are the reserved quotas, by flavor + currently in use by the workloads assigned to this LocalQueue. + items: + properties: + name: + description: name of the flavor. + type: string + resources: + description: resources lists the quota usage for the resources + in this flavor. + items: + properties: + name: + description: name of the resource. + type: string + total: + anyOf: + - type: integer + - type: string + description: total is the total quantity of used quota. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - name + type: object + maxItems: 16 + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + required: + - name + - resources + type: object + maxItems: 16 + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map pendingWorkloads: description: PendingWorkloads is the number of Workloads in the LocalQueue not yet admitted to a ClusterQueue format: int32 type: integer + reservingWorkloads: + description: reservingWorkloads is the number of workloads in this + LocalQueue reserving quota in a ClusterQueue and that haven't finished + yet. + format: int32 + type: integer type: object type: object served: true diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuestatus.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuestatus.go index 3992496da6..39fc059793 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuestatus.go +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuestatus.go @@ -24,8 +24,10 @@ import ( // ClusterQueueStatusApplyConfiguration represents an declarative configuration of the ClusterQueueStatus type for use // with apply. 
type ClusterQueueStatusApplyConfiguration struct { + FlavorsReservation []FlavorUsageApplyConfiguration `json:"flavorsReservation,omitempty"` FlavorsUsage []FlavorUsageApplyConfiguration `json:"flavorsUsage,omitempty"` PendingWorkloads *int32 `json:"pendingWorkloads,omitempty"` + ReservingWorkloads *int32 `json:"reservingWorkloads,omitempty"` AdmittedWorkloads *int32 `json:"admittedWorkloads,omitempty"` Conditions []v1.Condition `json:"conditions,omitempty"` PendingWorkloadsStatus *ClusterQueuePendingWorkloadsStatusApplyConfiguration `json:"pendingWorkloadsStatus,omitempty"` @@ -37,6 +39,19 @@ func ClusterQueueStatus() *ClusterQueueStatusApplyConfiguration { return &ClusterQueueStatusApplyConfiguration{} } +// WithFlavorsReservation adds the given value to the FlavorsReservation field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the FlavorsReservation field. +func (b *ClusterQueueStatusApplyConfiguration) WithFlavorsReservation(values ...*FlavorUsageApplyConfiguration) *ClusterQueueStatusApplyConfiguration { + for i := range values { + if values[i] == nil { + panic("nil value passed to WithFlavorsReservation") + } + b.FlavorsReservation = append(b.FlavorsReservation, *values[i]) + } + return b +} + // WithFlavorsUsage adds the given value to the FlavorsUsage field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. // If called multiple times, values provided by each call will be appended to the FlavorsUsage field. @@ -58,6 +73,14 @@ func (b *ClusterQueueStatusApplyConfiguration) WithPendingWorkloads(value int32) return b } +// WithReservingWorkloads sets the ReservingWorkloads field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ReservingWorkloads field is set to the value of the last call. +func (b *ClusterQueueStatusApplyConfiguration) WithReservingWorkloads(value int32) *ClusterQueueStatusApplyConfiguration { + b.ReservingWorkloads = &value + return b +} + // WithAdmittedWorkloads sets the AdmittedWorkloads field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the AdmittedWorkloads field is set to the value of the last call. diff --git a/client-go/applyconfiguration/kueue/v1beta1/localqueuestatus.go b/client-go/applyconfiguration/kueue/v1beta1/localqueuestatus.go index c9a30530a7..34f4179746 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/localqueuestatus.go +++ b/client-go/applyconfiguration/kueue/v1beta1/localqueuestatus.go @@ -24,10 +24,12 @@ import ( // LocalQueueStatusApplyConfiguration represents an declarative configuration of the LocalQueueStatus type for use // with apply. 
type LocalQueueStatusApplyConfiguration struct { - PendingWorkloads *int32 `json:"pendingWorkloads,omitempty"` - AdmittedWorkloads *int32 `json:"admittedWorkloads,omitempty"` - Conditions []v1.Condition `json:"conditions,omitempty"` - FlavorUsage []LocalQueueFlavorUsageApplyConfiguration `json:"flavorUsage,omitempty"` + PendingWorkloads *int32 `json:"pendingWorkloads,omitempty"` + ReservingWorkloads *int32 `json:"reservingWorkloads,omitempty"` + AdmittedWorkloads *int32 `json:"admittedWorkloads,omitempty"` + Conditions []v1.Condition `json:"conditions,omitempty"` + FlavorsReservation []LocalQueueFlavorUsageApplyConfiguration `json:"flavorsReservation,omitempty"` + FlavorUsage []LocalQueueFlavorUsageApplyConfiguration `json:"flavorUsage,omitempty"` } // LocalQueueStatusApplyConfiguration constructs an declarative configuration of the LocalQueueStatus type for use with @@ -44,6 +46,14 @@ func (b *LocalQueueStatusApplyConfiguration) WithPendingWorkloads(value int32) * return b } +// WithReservingWorkloads sets the ReservingWorkloads field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ReservingWorkloads field is set to the value of the last call. +func (b *LocalQueueStatusApplyConfiguration) WithReservingWorkloads(value int32) *LocalQueueStatusApplyConfiguration { + b.ReservingWorkloads = &value + return b +} + // WithAdmittedWorkloads sets the AdmittedWorkloads field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the AdmittedWorkloads field is set to the value of the last call. @@ -62,6 +72,19 @@ func (b *LocalQueueStatusApplyConfiguration) WithConditions(values ...v1.Conditi return b } +// WithFlavorsReservation adds the given value to the FlavorsReservation field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the FlavorsReservation field. +func (b *LocalQueueStatusApplyConfiguration) WithFlavorsReservation(values ...*LocalQueueFlavorUsageApplyConfiguration) *LocalQueueStatusApplyConfiguration { + for i := range values { + if values[i] == nil { + panic("nil value passed to WithFlavorsReservation") + } + b.FlavorsReservation = append(b.FlavorsReservation, *values[i]) + } + return b +} + // WithFlavorUsage adds the given value to the FlavorUsage field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. // If called multiple times, values provided by each call will be appended to the FlavorUsage field. diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index e0fdcd7fb1..4cfabf3835 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -401,9 +401,59 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + flavorsReservation: + description: flavorsReservation are the reserved quotas, by flavor, + currently in use by the workloads assigned to this ClusterQueue. + items: + properties: + name: + description: name of the flavor. 
+ type: string + resources: + description: resources lists the quota usage for the resources + in this flavor. + items: + properties: + borrowed: + anyOf: + - type: integer + - type: string + description: Borrowed is quantity of quota that is borrowed + from the cohort. In other words, it's the used quota + that is over the nominalQuota. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + name: + description: name of the resource + type: string + total: + anyOf: + - type: integer + - type: string + description: total is the total quantity of used quota, + including the amount borrowed from the cohort. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - name + type: object + maxItems: 16 + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + required: + - name + - resources + type: object + maxItems: 16 + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map flavorsUsage: description: flavorsUsage are the used quotas, by flavor, currently - in use by the workloads assigned to this ClusterQueue. + in use by the workloads admitted in this ClusterQueue. items: properties: name: @@ -488,6 +538,11 @@ spec: required: - lastChangeTime type: object + reservingWorkloads: + description: reservingWorkloads is the number of workloads currently + reserving quota in this clusterQueue. + format: int32 + type: integer type: object type: object served: true diff --git a/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml index 2124f7cbc0..fe53484d6e 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml @@ -59,7 +59,7 @@ spec: description: LocalQueueStatus defines the observed state of LocalQueue properties: admittedWorkloads: - description: AdmittedWorkloads is the number of workloads in this + description: admittedWorkloads is the number of workloads in this LocalQueue admitted to a ClusterQueue and that haven't finished yet. format: int32 @@ -138,7 +138,7 @@ spec: - type x-kubernetes-list-type: map flavorUsage: - description: flavorUsage are the used quotas, by flavor currently + description: flavorsUsage are the used quotas, by flavor currently in use by the workloads assigned to this LocalQueue. items: properties: @@ -177,11 +177,57 @@ spec: x-kubernetes-list-map-keys: - name x-kubernetes-list-type: map + flavorsReservation: + description: flavorsReservation are the reserved quotas, by flavor + currently in use by the workloads assigned to this LocalQueue. + items: + properties: + name: + description: name of the flavor. + type: string + resources: + description: resources lists the quota usage for the resources + in this flavor. + items: + properties: + name: + description: name of the resource. + type: string + total: + anyOf: + - type: integer + - type: string + description: total is the total quantity of used quota. 
+ pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + required: + - name + type: object + maxItems: 16 + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + required: + - name + - resources + type: object + maxItems: 16 + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map pendingWorkloads: description: PendingWorkloads is the number of Workloads in the LocalQueue not yet admitted to a ClusterQueue format: int32 type: integer + reservingWorkloads: + description: reservingWorkloads is the number of workloads in this + LocalQueue reserving quota in a ClusterQueue and that haven't finished + yet. + format: int32 + type: integer type: object type: object served: true diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index d4904226f1..a9ceec2588 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -172,20 +172,6 @@ func (c *Cache) CleanUpOnContext(ctx context.Context) { c.podsReadyCond.Broadcast() } -func (c *Cache) AdmittedWorkloadsInLocalQueue(localQueue *kueue.LocalQueue) int32 { - c.Lock() - defer c.Unlock() - cq, ok := c.clusterQueues[string(localQueue.Spec.ClusterQueue)] - if !ok { - return 0 - } - qImpl, ok := cq.localQueues[queueKey(localQueue)] - if !ok { - return 0 - } - return int32(qImpl.admittedWorkloads) -} - func (c *Cache) updateClusterQueues() sets.Set[string] { cqs := sets.New[string]() @@ -314,11 +300,14 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err for _, q := range queues.Items { qKey := queueKey(&q) qImpl := &queue{ - key: qKey, - admittedWorkloads: 0, - usage: make(FlavorResourceQuantities), + key: qKey, + reservingWorkloads: 0, + admittedWorkloads: 0, + //TODO: rename this to better distinguish between reserved and in use quantities + usage: make(FlavorResourceQuantities), + admittedUsage: make(FlavorResourceQuantities), } - if err = qImpl.resetFlavorsAndResources(cqImpl.Usage); err != nil { + if err = qImpl.resetFlavorsAndResources(cqImpl.Usage, cqImpl.AdmittedUsage); err != nil { return err } cqImpl.localQueues[qKey] = qImpl @@ -351,7 +340,7 @@ func (c *Cache) UpdateClusterQueue(cq *kueue.ClusterQueue) error { if qImpl == nil { return errQNotFound } - if err := qImpl.resetFlavorsAndResources(cqImpl.Usage); err != nil { + if err := qImpl.resetFlavorsAndResources(cqImpl.Usage, cqImpl.AdmittedUsage); err != nil { return err } } @@ -554,20 +543,36 @@ func (c *Cache) ForgetWorkload(w *kueue.Workload) error { return nil } -// Usage reports the used resources and number of workloads admitted by the ClusterQueue. -func (c *Cache) Usage(cqObj *kueue.ClusterQueue) ([]kueue.FlavorUsage, int, error) { +type ClusterQueueUsageStats struct { + ReservedResources []kueue.FlavorUsage + ReservingWorkloads int + AdmittedResources []kueue.FlavorUsage + AdmittedWorkloads int +} + +// Usage reports the reserved and admitted resources and number of workloads holding them in the ClusterQueue. 
+func (c *Cache) Usage(cqObj *kueue.ClusterQueue) (*ClusterQueueUsageStats, error) { c.RLock() defer c.RUnlock() cq := c.clusterQueues[cqObj.Name] if cq == nil { - return nil, 0, errCqNotFound + return nil, errCqNotFound } - usage := make([]kueue.FlavorUsage, 0, len(cq.Usage)) - for _, rg := range cq.ResourceGroups { + return &ClusterQueueUsageStats{ + ReservedResources: getUsage(cq.Usage, cq.ResourceGroups, cq.Cohort), + ReservingWorkloads: len(cq.Workloads), + AdmittedResources: getUsage(cq.AdmittedUsage, cq.ResourceGroups, cq.Cohort), + AdmittedWorkloads: cq.admittedWorkloadsCount, + }, nil +} + +func getUsage(frq FlavorResourceQuantities, rgs []ResourceGroup, cohort *Cohort) []kueue.FlavorUsage { + usage := make([]kueue.FlavorUsage, 0, len(frq)) + for _, rg := range rgs { for _, flvQuotas := range rg.Flavors { - flvUsage := cq.Usage[flvQuotas.Name] + flvUsage := frq[flvQuotas.Name] outFlvUsage := kueue.FlavorUsage{ Name: flvQuotas.Name, Resources: make([]kueue.ResourceUsage, 0, len(flvQuotas.Resources)), @@ -579,7 +584,7 @@ func (c *Cache) Usage(cqObj *kueue.ClusterQueue) ([]kueue.FlavorUsage, int, erro Total: workload.ResourceQuantity(rName, used), } // Enforce `borrowed=0` if the clusterQueue doesn't belong to a cohort. - if cq.Cohort != nil { + if cohort != nil { borrowed := used - rQuota.Nominal if borrowed > 0 { rUsage.Borrowed = workload.ResourceQuantity(rName, borrowed) @@ -594,26 +599,42 @@ func (c *Cache) Usage(cqObj *kueue.ClusterQueue) ([]kueue.FlavorUsage, int, erro usage = append(usage, outFlvUsage) } } - return usage, len(cq.Workloads), nil + return usage } -func (c *Cache) LocalQueueUsage(qObj *kueue.LocalQueue) ([]kueue.LocalQueueFlavorUsage, error) { +type LocalQueueUsageStats struct { + ReservedResources []kueue.LocalQueueFlavorUsage + ReservingWorkloads int + AdmittedResources []kueue.LocalQueueFlavorUsage + AdmittedWorkloads int +} + +func (c *Cache) LocalQueueUsage(qObj *kueue.LocalQueue) (*LocalQueueUsageStats, error) { c.RLock() defer c.RUnlock() cqImpl, ok := c.clusterQueues[string(qObj.Spec.ClusterQueue)] if !ok { - return nil, nil + return &LocalQueueUsageStats{}, nil } qImpl, ok := cqImpl.localQueues[queueKey(qObj)] if !ok { return nil, errQNotFound } - qFlvUsages := make([]kueue.LocalQueueFlavorUsage, 0, len(qImpl.usage)) - for _, rg := range cqImpl.ResourceGroups { + return &LocalQueueUsageStats{ + ReservedResources: filterLocalQueueUsage(qImpl.usage, cqImpl.ResourceGroups), + ReservingWorkloads: qImpl.reservingWorkloads, + AdmittedResources: filterLocalQueueUsage(qImpl.admittedUsage, cqImpl.ResourceGroups), + AdmittedWorkloads: qImpl.admittedWorkloads, + }, nil +} + +func filterLocalQueueUsage(orig FlavorResourceQuantities, resourceGroups []ResourceGroup) []kueue.LocalQueueFlavorUsage { + qFlvUsages := make([]kueue.LocalQueueFlavorUsage, 0, len(orig)) + for _, rg := range resourceGroups { for _, flvQuotas := range rg.Flavors { - flvUsage := qImpl.usage[flvQuotas.Name] + flvUsage := orig[flvQuotas.Name] outFlvUsage := kueue.LocalQueueFlavorUsage{ Name: flvQuotas.Name, Resources: make([]kueue.LocalQueueResourceUsage, 0, len(flvQuotas.Resources)), @@ -631,7 +652,7 @@ func (c *Cache) LocalQueueUsage(qObj *kueue.LocalQueue) ([]kueue.LocalQueueFlavo qFlvUsages = append(qFlvUsages, outFlvUsage) } } - return qFlvUsages, nil + return qFlvUsages } func (c *Cache) cleanupAssumedState(w *kueue.Workload) { diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 27aa2cb7cb..77d6676c59 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ 
-121,6 +121,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -144,6 +147,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -186,6 +192,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "nonexistent-flavor": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "nonexistent-flavor": {corev1.ResourceCPU: 0}, + }, Status: pending, Preemption: defaultPreemption, }, @@ -270,6 +279,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -293,6 +305,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -337,6 +352,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "nonexistent-flavor": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "nonexistent-flavor": {corev1.ResourceCPU: 0}, + }, Status: pending, Preemption: defaultPreemption, }, @@ -418,6 +436,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -472,6 +493,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -529,6 +553,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -563,6 +590,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "nonexistent-flavor": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "nonexistent-flavor": {corev1.ResourceCPU: 0}, + }, Status: pending, Preemption: defaultPreemption, }, @@ -619,6 +649,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -642,6 +675,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, + AdmittedUsage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, Status: active, Preemption: defaultPreemption, }, @@ -682,6 +718,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { NamespaceSelector: labels.Nothing(), 
FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{"nonexistent-flavor": {corev1.ResourceCPU: 0}}, + AdmittedUsage: FlavorResourceQuantities{"nonexistent-flavor": {corev1.ResourceCPU: 0}}, Status: active, Preemption: defaultPreemption, }, @@ -789,6 +826,22 @@ func TestCacheClusterQueueOperations(t *testing.T) { "example.com/gpu": 0, }, }, + AdmittedUsage: FlavorResourceQuantities{ + "foo": { + "cpu": 0, + "memory": 0, + }, + "bar": { + "cpu": 0, + "memory": 0, + }, + "theta": { + "example.com/gpu": 0, + }, + "gamma": { + "example.com/gpu": 0, + }, + }, Status: pending, Preemption: defaultPreemption, }, @@ -1594,6 +1647,7 @@ func TestClusterQueueUsage(t *testing.T) { Request(corev1.ResourceCPU, "8"). Request("example.com/gpu", "5"). ReserveQuota(utiltesting.MakeAdmission("foo").Assignment(corev1.ResourceCPU, "default", "8000m").Assignment("example.com/gpu", "model_a", "5").Obj()). + Condition(metav1.Condition{Type: kueue.WorkloadAdmitted, Status: metav1.ConditionTrue}). Obj(), *utiltesting.MakeWorkload("two", ""). Request(corev1.ResourceCPU, "5"). @@ -1602,14 +1656,47 @@ func TestClusterQueueUsage(t *testing.T) { Obj(), } cases := map[string]struct { - clusterQueue *kueue.ClusterQueue - workloads []kueue.Workload - wantUsedResources []kueue.FlavorUsage - wantWorkloads int + clusterQueue *kueue.ClusterQueue + workloads []kueue.Workload + wantReservedResources []kueue.FlavorUsage + wantReservingWorkloads int + wantUsedResources []kueue.FlavorUsage + wantAdmittedWorkloads int }{ - "clusterQueue without cohort; single no borrowing": { + "clusterQueue without cohort; single, admitted no borrowing": { clusterQueue: cqWithOutCohort, workloads: workloads[:1], + wantReservedResources: []kueue.FlavorUsage{ + { + Name: "default", + Resources: []kueue.ResourceUsage{{ + Name: corev1.ResourceCPU, + Total: resource.MustParse("8"), + }}, + }, + { + Name: "model_a", + Resources: []kueue.ResourceUsage{{ + Name: "example.com/gpu", + Total: resource.MustParse("5"), + }}, + }, + { + Name: "model_b", + Resources: []kueue.ResourceUsage{{ + Name: "example.com/gpu", + }}, + }, + { + Name: "interconnect_a", + Resources: []kueue.ResourceUsage{ + {Name: "example.com/vf-0"}, + {Name: "example.com/vf-1"}, + {Name: "example.com/vf-2"}, + }, + }, + }, + wantReservingWorkloads: 1, wantUsedResources: []kueue.FlavorUsage{ { Name: "default", @@ -1640,12 +1727,12 @@ func TestClusterQueueUsage(t *testing.T) { }, }, }, - wantWorkloads: 1, + wantAdmittedWorkloads: 1, }, "clusterQueue with cohort; multiple borrowing": { clusterQueue: cq, workloads: workloads, - wantUsedResources: []kueue.FlavorUsage{ + wantReservedResources: []kueue.FlavorUsage{ { Name: "default", Resources: []kueue.ResourceUsage{{ @@ -1678,12 +1765,43 @@ func TestClusterQueueUsage(t *testing.T) { }, }, }, - wantWorkloads: 2, + wantReservingWorkloads: 2, + wantUsedResources: []kueue.FlavorUsage{ + { + Name: "default", + Resources: []kueue.ResourceUsage{{ + Name: corev1.ResourceCPU, + Total: resource.MustParse("8"), + }}, + }, + { + Name: "model_a", + Resources: []kueue.ResourceUsage{{ + Name: "example.com/gpu", + Total: resource.MustParse("5"), + }}, + }, + { + Name: "model_b", + Resources: []kueue.ResourceUsage{{ + Name: "example.com/gpu", + }}, + }, + { + Name: "interconnect_a", + Resources: []kueue.ResourceUsage{ + {Name: "example.com/vf-0"}, + {Name: "example.com/vf-1"}, + {Name: "example.com/vf-2"}, + }, + }, + }, + wantAdmittedWorkloads: 1, }, "clusterQueue without cohort; multiple borrowing": { clusterQueue: cqWithOutCohort, 
workloads: workloads, - wantUsedResources: []kueue.FlavorUsage{ + wantReservedResources: []kueue.FlavorUsage{ { Name: "default", Resources: []kueue.ResourceUsage{{ @@ -1716,7 +1834,38 @@ func TestClusterQueueUsage(t *testing.T) { }, }, }, - wantWorkloads: 2, + wantReservingWorkloads: 2, + wantUsedResources: []kueue.FlavorUsage{ + { + Name: "default", + Resources: []kueue.ResourceUsage{{ + Name: corev1.ResourceCPU, + Total: resource.MustParse("8"), + }}, + }, + { + Name: "model_a", + Resources: []kueue.ResourceUsage{{ + Name: "example.com/gpu", + Total: resource.MustParse("5"), + }}, + }, + { + Name: "model_b", + Resources: []kueue.ResourceUsage{{ + Name: "example.com/gpu", + }}, + }, + { + Name: "interconnect_a", + Resources: []kueue.ResourceUsage{ + {Name: "example.com/vf-0"}, + {Name: "example.com/vf-1"}, + {Name: "example.com/vf-2"}, + }, + }, + }, + wantAdmittedWorkloads: 1, }, } for name, tc := range cases { @@ -1727,20 +1876,29 @@ func TestClusterQueueUsage(t *testing.T) { if err != nil { t.Fatalf("Adding ClusterQueue: %v", err) } - for _, w := range tc.workloads { - if added := cache.AddOrUpdateWorkload(&w); !added { - t.Fatalf("Workload %s was not added", workload.Key(&w)) + for i := range tc.workloads { + w := &tc.workloads[i] + if added := cache.AddOrUpdateWorkload(w); !added { + t.Fatalf("Workload %s was not added", workload.Key(w)) } } - resources, workloads, err := cache.Usage(tc.clusterQueue) + stats, err := cache.Usage(tc.clusterQueue) if err != nil { t.Fatalf("Couldn't get usage: %v", err) } - if diff := cmp.Diff(tc.wantUsedResources, resources); diff != "" { + + if diff := cmp.Diff(tc.wantReservedResources, stats.ReservedResources); diff != "" { + t.Errorf("Unexpected used reserved resources (-want,+got):\n%s", diff) + } + if stats.ReservingWorkloads != tc.wantReservingWorkloads { + t.Errorf("Got %d reserving workloads, want %d", stats.ReservingWorkloads, tc.wantReservingWorkloads) + } + + if diff := cmp.Diff(tc.wantUsedResources, stats.AdmittedResources); diff != "" { t.Errorf("Unexpected used resources (-want,+got):\n%s", diff) } - if workloads != tc.wantWorkloads { - t.Errorf("Got %d workloads, want %d", workloads, tc.wantWorkloads) + if stats.AdmittedWorkloads != tc.wantAdmittedWorkloads { + t.Errorf("Got %d admitted workloads, want %d", stats.AdmittedWorkloads, tc.wantAdmittedWorkloads) } }) } @@ -1962,7 +2120,7 @@ func TestLocalQueueUsage(t *testing.T) { if err != nil { t.Fatalf("Couldn't get usage for the queue: %v", err) } - if diff := cmp.Diff(tc.wantUsage, gotUsage); diff != "" { + if diff := cmp.Diff(tc.wantUsage, gotUsage.ReservedResources); diff != "" { t.Errorf("Unexpected used resources for the queue (-want,+got):\n%s", diff) } }) @@ -2004,14 +2162,18 @@ func TestCacheQueueOperations(t *testing.T) { utiltesting.MakeAdmission("foo"). Assignment("cpu", "spot", "2"). Assignment("memory", "spot", "8Gi").Obj(), - ).Obj(), + ). + Condition(metav1.Condition{Type: kueue.WorkloadAdmitted, Status: metav1.ConditionTrue}). + Obj(), utiltesting.MakeWorkload("job2", "ns2"). Queue("beta"). Request("example.com/gpu", "2"). ReserveQuota( utiltesting.MakeAdmission("foo"). Assignment("example.com/gpu", "model-a", "2").Obj(), - ).Obj(), + ). + Condition(metav1.Condition{Type: kueue.WorkloadAdmitted, Status: metav1.ConditionTrue}). + Obj(), utiltesting.MakeWorkload("job3", "ns1"). Queue("gamma"). Request("cpu", "5"). 
@@ -2065,8 +2227,9 @@ func TestCacheQueueOperations(t *testing.T) { } cacheLocalQueuesAfterInsertingAll := map[string]*queue{ "ns1/alpha": { - key: "ns1/alpha", - admittedWorkloads: 1, + key: "ns1/alpha", + reservingWorkloads: 1, + admittedWorkloads: 1, usage: FlavorResourceQuantities{ "spot": { corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("2")), @@ -2076,10 +2239,20 @@ func TestCacheQueueOperations(t *testing.T) { "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), }, }, + admittedUsage: FlavorResourceQuantities{ + "spot": { + corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("2")), + corev1.ResourceMemory: workload.ResourceValue(corev1.ResourceMemory, resource.MustParse("8Gi")), + }, + "model-a": { + "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), + }, + }, }, "ns2/beta": { - key: "ns2/beta", - admittedWorkloads: 2, + key: "ns2/beta", + reservingWorkloads: 2, + admittedWorkloads: 1, usage: FlavorResourceQuantities{ "spot": { corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), @@ -2089,10 +2262,20 @@ func TestCacheQueueOperations(t *testing.T) { "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("7")), }, }, + admittedUsage: FlavorResourceQuantities{ + "spot": { + corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), + corev1.ResourceMemory: workload.ResourceValue(corev1.ResourceMemory, resource.MustParse("0")), + }, + "model-a": { + "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("2")), + }, + }, }, "ns1/gamma": { - key: "ns1/gamma", - admittedWorkloads: 1, + key: "ns1/gamma", + reservingWorkloads: 1, + admittedWorkloads: 0, usage: FlavorResourceQuantities{ "ondemand": { corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("5")), @@ -2102,12 +2285,22 @@ func TestCacheQueueOperations(t *testing.T) { "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), }, }, + admittedUsage: FlavorResourceQuantities{ + "ondemand": { + corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), + corev1.ResourceMemory: workload.ResourceValue(corev1.ResourceMemory, resource.MustParse("0")), + }, + "model-b": { + "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), + }, + }, }, } cacheLocalQueuesAfterInsertingCqAndQ := map[string]*queue{ "ns1/alpha": { - key: "ns1/alpha", - admittedWorkloads: 0, + key: "ns1/alpha", + reservingWorkloads: 0, + admittedWorkloads: 0, usage: FlavorResourceQuantities{ "spot": { corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), @@ -2117,10 +2310,20 @@ func TestCacheQueueOperations(t *testing.T) { "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), }, }, + admittedUsage: FlavorResourceQuantities{ + "spot": { + corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), + corev1.ResourceMemory: workload.ResourceValue(corev1.ResourceMemory, resource.MustParse("0")), + }, + "model-a": { + "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), + }, + }, }, "ns2/beta": { - key: "ns2/beta", - admittedWorkloads: 0, + key: "ns2/beta", + reservingWorkloads: 0, + admittedWorkloads: 0, usage: FlavorResourceQuantities{ "spot": { corev1.ResourceCPU: 
workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), @@ -2130,10 +2333,20 @@ func TestCacheQueueOperations(t *testing.T) { "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), }, }, + admittedUsage: FlavorResourceQuantities{ + "spot": { + corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), + corev1.ResourceMemory: workload.ResourceValue(corev1.ResourceMemory, resource.MustParse("0")), + }, + "model-a": { + "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), + }, + }, }, "ns1/gamma": { - key: "ns1/gamma", - admittedWorkloads: 0, + key: "ns1/gamma", + reservingWorkloads: 0, + admittedWorkloads: 0, usage: FlavorResourceQuantities{ "ondemand": { corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), @@ -2143,6 +2356,15 @@ func TestCacheQueueOperations(t *testing.T) { "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), }, }, + admittedUsage: FlavorResourceQuantities{ + "ondemand": { + corev1.ResourceCPU: workload.ResourceValue(corev1.ResourceCPU, resource.MustParse("0")), + corev1.ResourceMemory: workload.ResourceValue(corev1.ResourceMemory, resource.MustParse("0")), + }, + "model-b": { + "example.com/gpu": workload.ResourceValue("example.com/gpu", resource.MustParse("0")), + }, + }, }, } cases := map[string]struct { diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index b953fb0bc1..b9fd6c0ed6 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -28,6 +28,7 @@ type ClusterQueue struct { ResourceGroups []ResourceGroup RGByResource map[corev1.ResourceName]*ResourceGroup Usage FlavorResourceQuantities + AdmittedUsage FlavorResourceQuantities Workloads map[string]*workload.Info WorkloadsNotReady sets.Set[string] NamespaceSelector labels.Selector @@ -46,6 +47,7 @@ type ClusterQueue struct { podsReadyTracking bool hasMissingFlavors bool hasMissingOrInactiveAdmissionChecks bool + admittedWorkloadsCount int } // Cohort is a set of ClusterQueues that can borrow resources from each other. @@ -84,9 +86,12 @@ type ResourceQuota struct { type FlavorResourceQuantities map[kueue.ResourceFlavorReference]map[corev1.ResourceName]int64 type queue struct { - key string - admittedWorkloads int - usage FlavorResourceQuantities + key string + reservingWorkloads int + admittedWorkloads int + //TODO: rename this to better distinguish between reserved and "in use" quantities + usage FlavorResourceQuantities + admittedUsage FlavorResourceQuantities } func newCohort(name string, size int) *Cohort { @@ -151,22 +156,10 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue. } c.NamespaceSelector = nsSelector - // Cleanup removed flavors or resources. - usedFlavorResources := make(FlavorResourceQuantities) - for _, rg := range in.Spec.ResourceGroups { - for _, f := range rg.Flavors { - existingUsedResources := c.Usage[f.Name] - usedResources := make(map[corev1.ResourceName]int64, len(f.Resources)) - for _, r := range f.Resources { - usedResources[r.Name] = existingUsedResources[r.Name] - } - usedFlavorResources[f.Name] = usedResources - } - } - c.AdmissionChecks = sets.New(in.Spec.AdmissionChecks...) 
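Note on the cache accounting in this patch: the ClusterQueue cache now keeps two parallel tallies — Usage plus the reserving-workload count, updated as soon as a workload reserves quota, and AdmittedUsage plus admittedWorkloadsCount, updated only while the workload's Admitted condition is true (see updateWorkloadUsage just below) — and Cache.Usage / Cache.LocalQueueUsage expose both through the new ClusterQueueUsageStats / LocalQueueUsageStats structs. The following is a minimal illustrative sketch, not part of the patch, of how the two counters diverge for a single workload; the hasQuotaReservation parameter is a hypothetical stand-in for "the workload is tracked in the ClusterQueue cache", which in the real code implies a quota reservation.

package main

import (
	"fmt"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// counts mirrors the split introduced in this patch: a workload contributes to
// reservingWorkloads/flavorsReservation while it holds a quota reservation, and
// additionally to admittedWorkloads/flavorsUsage only while its Admitted
// condition is true (e.g. after all AdmissionChecks have passed).
func counts(wl *kueue.Workload, hasQuotaReservation bool) (reserving, admitted bool) {
	reserving = hasQuotaReservation
	admitted = hasQuotaReservation &&
		apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted)
	return reserving, admitted
}

func main() {
	wl := &kueue.Workload{} // quota reserved, admission check still pending
	fmt.Println(counts(wl, true)) // prints: true false
}

The controller wiring later in this diff (clusterqueue_controller.go and localqueue_controller.go) copies the same split into the new status fields and into the reserving_active_workloads and cluster_queue_resource_reservation metrics.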
- c.Usage = usedFlavorResources + c.Usage = filterQuantities(c.Usage, in.Spec.ResourceGroups) + c.AdmittedUsage = filterQuantities(c.AdmittedUsage, in.Spec.ResourceGroups) c.UpdateWithFlavors(resourceFlavors) c.updateWithAdmissionChecks(admissionChecks) @@ -191,6 +184,21 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue. return nil } +func filterQuantities(orig FlavorResourceQuantities, resourceGroups []kueue.ResourceGroup) FlavorResourceQuantities { + ret := make(FlavorResourceQuantities) + for _, rg := range resourceGroups { + for _, f := range rg.Flavors { + existingUsedResources := orig[f.Name] + usedResources := make(map[corev1.ResourceName]int64, len(f.Resources)) + for _, r := range f.Resources { + usedResources[r.Name] = existingUsedResources[r.Name] + } + ret[f.Name] = usedResources + } + } + return ret +} + func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) { c.ResourceGroups = make([]ResourceGroup, len(in)) for i, rgIn := range in { @@ -324,7 +332,7 @@ func (c *ClusterQueue) addWorkload(w *kueue.Workload) error { if c.podsReadyTracking && !apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadPodsReady) { c.WorkloadsNotReady.Insert(k) } - reportAdmittedActiveWorkloads(wi.ClusterQueue, len(c.Workloads)) + c.reportActiveWorkloads() return nil } @@ -341,18 +349,33 @@ func (c *ClusterQueue) deleteWorkload(w *kueue.Workload) { // we only increase the AllocatableResourceGeneration cause the add of workload won't make more // workloads fit in ClusterQueue. c.AllocatableResourceGeneration++ + delete(c.Workloads, k) - reportAdmittedActiveWorkloads(wi.ClusterQueue, len(c.Workloads)) + c.reportActiveWorkloads() +} + +func (c *ClusterQueue) reportActiveWorkloads() { + metrics.AdmittedActiveWorkloads.WithLabelValues(c.Name).Set(float64(c.admittedWorkloadsCount)) + metrics.ReservingActiveWorkloads.WithLabelValues(c.Name).Set(float64(len(c.Workloads))) } // updateWorkloadUsage updates the usage of the ClusterQueue for the workload // and the number of admitted workloads for local queues. func (c *ClusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) { + admitted := workload.IsAdmitted(wi.Obj) updateUsage(wi, c.Usage, m) + if admitted { + updateUsage(wi, c.AdmittedUsage, m) + c.admittedWorkloadsCount += int(m) + } qKey := workload.QueueKey(wi.Obj) - if _, ok := c.localQueues[qKey]; ok { - updateUsage(wi, c.localQueues[qKey].usage, m) - c.localQueues[qKey].admittedWorkloads += int(m) + if lq, ok := c.localQueues[qKey]; ok { + updateUsage(wi, lq.usage, m) + lq.reservingWorkloads += int(m) + if admitted { + updateUsage(wi, lq.admittedUsage, m) + lq.admittedWorkloads += int(m) + } } } @@ -378,17 +401,21 @@ func (c *ClusterQueue) addLocalQueue(q *kueue.LocalQueue) error { // We need to count the workloads, because they could have been added before // receiving the queue add event. 
qImpl := &queue{ - key: qKey, - admittedWorkloads: 0, - usage: make(FlavorResourceQuantities), + key: qKey, + reservingWorkloads: 0, + usage: make(FlavorResourceQuantities), } - if err := qImpl.resetFlavorsAndResources(c.Usage); err != nil { + if err := qImpl.resetFlavorsAndResources(c.Usage, c.AdmittedUsage); err != nil { return err } for _, wl := range c.Workloads { if workloadBelongsToLocalQueue(wl.Obj, q) { updateUsage(wl, qImpl.usage, 1) - qImpl.admittedWorkloads++ + qImpl.reservingWorkloads++ + if workload.IsAdmitted(wl.Obj) { + updateUsage(wl, qImpl.admittedUsage, 1) + qImpl.admittedWorkloads++ + } } } c.localQueues[qKey] = qImpl @@ -411,25 +438,26 @@ func (c *ClusterQueue) flavorInUse(flavor string) bool { return false } -func (q *queue) resetFlavorsAndResources(cqUsage FlavorResourceQuantities) error { +func (q *queue) resetFlavorsAndResources(cqUsage FlavorResourceQuantities, cqAdmittedUsage FlavorResourceQuantities) error { // Clean up removed flavors or resources. + q.usage = resetUsage(q.usage, cqUsage) + q.admittedUsage = resetUsage(q.admittedUsage, cqAdmittedUsage) + return nil +} + +func resetUsage(lqUsage FlavorResourceQuantities, cqUsage FlavorResourceQuantities) FlavorResourceQuantities { usedFlavorResources := make(FlavorResourceQuantities) for cqFlv, cqRes := range cqUsage { - existingUsedResources := q.usage[cqFlv] + existingUsedResources := lqUsage[cqFlv] usedResources := make(map[corev1.ResourceName]int64, len(cqRes)) for rName := range cqRes { usedResources[rName] = existingUsedResources[rName] } usedFlavorResources[cqFlv] = usedResources } - q.usage = usedFlavorResources - return nil + return usedFlavorResources } func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool { return wl.Namespace == q.Namespace && wl.Spec.QueueName == q.Name } - -func reportAdmittedActiveWorkloads(cqName string, val int) { - metrics.AdmittedActiveWorkloads.WithLabelValues(cqName).Set(float64(val)) -} diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index e5150be239..3e55ca991b 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -332,6 +332,14 @@ func recordResourceMetrics(cq *kueue.ClusterQueue) { } } + for fri := range cq.Status.FlavorsReservation { + fr := &cq.Status.FlavorsReservation[fri] + for ri := range fr.Resources { + r := &fr.Resources[ri] + metrics.ReportClusterQueueResourceReservations(cq.Spec.Cohort, cq.Name, string(fr.Name), string(r.Name), resource.QuantityToFloat(&r.Total)) + } + } + for fui := range cq.Status.FlavorsUsage { fu := &cq.Status.FlavorsUsage[fui] for ri := range fu.Resources { @@ -377,6 +385,28 @@ func clearOldResourceQuotas(oldCq, newCq *kueue.ClusterQueue) { } } + // reservation metrics + if len(oldCq.Status.FlavorsReservation) > 0 { + newFlavors := map[kueue.ResourceFlavorReference]*kueue.FlavorUsage{} + if len(newCq.Status.FlavorsReservation) > 0 { + newFlavors = slices.ToRefMap(newCq.Status.FlavorsReservation, func(f *kueue.FlavorUsage) kueue.ResourceFlavorReference { return f.Name }) + } + for fi := range oldCq.Status.FlavorsReservation { + flavor := &oldCq.Status.FlavorsReservation[fi] + if newFlavor, found := newFlavors[flavor.Name]; !found || len(newFlavor.Resources) == 0 { + metrics.ClearClusterQueueResourceReservations(oldCq.Name, string(flavor.Name), "") + } else { + newResources := slices.ToRefMap(newFlavor.Resources, func(r *kueue.ResourceUsage) corev1.ResourceName { return r.Name }) + for ri := 
range flavor.Resources { + rname := flavor.Resources[ri].Name + if _, found := newResources[rname]; !found { + metrics.ClearClusterQueueResourceReservations(oldCq.Name, string(flavor.Name), string(rname)) + } + } + } + } + } + // usage metrics if len(oldCq.Status.FlavorsUsage) > 0 { newFlavors := map[kueue.ResourceFlavorReference]*kueue.FlavorUsage{} @@ -600,15 +630,17 @@ func (r *ClusterQueueReconciler) updateCqStatusIfChanged( ) error { oldStatus := cq.Status.DeepCopy() pendingWorkloads := r.qManager.Pending(cq) - usage, workloads, err := r.cache.Usage(cq) + stats, err := r.cache.Usage(cq) if err != nil { r.log.Error(err, "Failed getting usage from cache") // This is likely because the cluster queue was recently removed, // but we didn't process that event yet. return err } - cq.Status.FlavorsUsage = usage - cq.Status.AdmittedWorkloads = int32(workloads) + cq.Status.FlavorsReservation = stats.ReservedResources + cq.Status.FlavorsUsage = stats.AdmittedResources + cq.Status.ReservingWorkloads = int32(stats.ReservingWorkloads) + cq.Status.AdmittedWorkloads = int32(stats.AdmittedWorkloads) cq.Status.PendingWorkloads = int32(pendingWorkloads) cq.Status.PendingWorkloadsStatus = r.getWorkloadsStatus(cq) meta.SetStatusCondition(&cq.Status.Conditions, metav1.Condition{ diff --git a/pkg/controller/core/clusterqueue_controller_test.go b/pkg/controller/core/clusterqueue_controller_test.go index ddb0506e2e..fa5f1c698e 100644 --- a/pkg/controller/core/clusterqueue_controller_test.go +++ b/pkg/controller/core/clusterqueue_controller_test.go @@ -226,7 +226,7 @@ func allMetricsForQueue(name string) cqMetrics { return cqMetrics{ NominalDPs: testingmetrics.CollectFilteredGaugeVec(metrics.ClusterQueueResourceNominalQuota, map[string]string{"cluster_queue": name}), BorrowingDPs: testingmetrics.CollectFilteredGaugeVec(metrics.ClusterQueueResourceBorrowingLimit, map[string]string{"cluster_queue": name}), - UsageDPs: testingmetrics.CollectFilteredGaugeVec(metrics.ClusterQueueResourceUsage, map[string]string{"cluster_queue": name}), + UsageDPs: testingmetrics.CollectFilteredGaugeVec(metrics.ClusterQueueResourceReservations, map[string]string{"cluster_queue": name}), } } @@ -268,7 +268,7 @@ func TestRecordResourceMetrics(t *testing.T) { }, }, Status: kueue.ClusterQueueStatus{ - FlavorsUsage: []kueue.FlavorUsage{ + FlavorsReservation: []kueue.FlavorUsage{ { Name: "flavor", Resources: []kueue.ResourceUsage{ @@ -320,7 +320,7 @@ func TestRecordResourceMetrics(t *testing.T) { ret := baseQueue.DeepCopy() ret.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("2") ret.Spec.ResourceGroups[0].Flavors[0].Resources[0].BorrowingLimit = ptr.To(resource.MustParse("1")) - ret.Status.FlavorsUsage[0].Resources[0].Total = resource.MustParse("3") + ret.Status.FlavorsReservation[0].Resources[0].Total = resource.MustParse("3") return ret }(), wantUpdatedMetrics: cqMetrics{ @@ -381,7 +381,7 @@ func TestRecordResourceMetrics(t *testing.T) { updatedQueue: func() *kueue.ClusterQueue { ret := baseQueue.DeepCopy() ret.Spec.ResourceGroups[0].Flavors[0].Name = "flavor2" - ret.Status.FlavorsUsage[0].Name = "flavor2" + ret.Status.FlavorsReservation[0].Name = "flavor2" return ret }(), wantUpdatedMetrics: cqMetrics{ @@ -412,7 +412,7 @@ func TestRecordResourceMetrics(t *testing.T) { updatedQueue: func() *kueue.ClusterQueue { ret := baseQueue.DeepCopy() ret.Spec.ResourceGroups[0].Flavors[0].Resources[0].Name = corev1.ResourceMemory - ret.Status.FlavorsUsage[0].Resources[0].Name = corev1.ResourceMemory + 
ret.Status.FlavorsReservation[0].Resources[0].Name = corev1.ResourceMemory return ret }(), wantUpdatedMetrics: cqMetrics{ @@ -442,7 +442,7 @@ func TestRecordResourceMetrics(t *testing.T) { }, updatedQueue: func() *kueue.ClusterQueue { ret := baseQueue.DeepCopy() - ret.Status.FlavorsUsage = nil + ret.Status.FlavorsReservation = nil return ret }(), wantUpdatedMetrics: cqMetrics{ diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index a616e99398..0a21da34ae 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -272,14 +272,16 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged( r.log.Error(err, failedUpdateLqStatusMsg) return err } - usage, err := r.cache.LocalQueueUsage(queue) + stats, err := r.cache.LocalQueueUsage(queue) if err != nil { r.log.Error(err, failedUpdateLqStatusMsg) return err } queue.Status.PendingWorkloads = pendingWls - queue.Status.AdmittedWorkloads = r.cache.AdmittedWorkloadsInLocalQueue(queue) - queue.Status.FlavorUsage = usage + queue.Status.ReservingWorkloads = int32(stats.ReservingWorkloads) + queue.Status.AdmittedWorkloads = int32(stats.AdmittedWorkloads) + queue.Status.FlavorsReservation = stats.ReservedResources + queue.Status.FlavorUsage = stats.AdmittedResources if len(conditionStatus) != 0 && len(reason) != 0 && len(msg) != 0 { meta.SetStatusCondition(&queue.Status.Conditions, metav1.Condition{ Type: kueue.LocalQueueActive, diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 7a4bdc95f7..a548989b35 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -105,6 +105,14 @@ The label 'result' can have the following values: // Metrics tied to the cache. + ReservingActiveWorkloads = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "reserving_active_workloads", + Help: "The number of Workloads that are reserving quota, per 'cluster_queue'", + }, []string{"cluster_queue"}, + ) + AdmittedActiveWorkloads = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: constants.KueueName, @@ -123,6 +131,14 @@ For a ClusterQueue, the metric only reports a value of 1 for one of the statuses ) // Optional cluster queue metrics + ClusterQueueResourceReservations = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "cluster_queue_resource_reservation", + Help: `Reports the cluster_queue's total resource reservation within all the flavors`, + }, []string{"cohort", "cluster_queue", "flavor", "resource"}, + ) + ClusterQueueResourceUsage = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Subsystem: constants.KueueName, @@ -181,6 +197,7 @@ func ReportClusterQueueStatus(cqName string, cqStatus ClusterQueueStatus) { } func ClearCacheMetrics(cqName string) { + ReservingActiveWorkloads.DeleteLabelValues(cqName) AdmittedActiveWorkloads.DeleteLabelValues(cqName) for _, status := range CQStatuses { ClusterQueueByStatus.DeleteLabelValues(cqName, string(status)) @@ -192,6 +209,10 @@ func ReportClusterQueueQuotas(cohort, queue, flavor, resource string, nominal, b ClusterQueueResourceBorrowingLimit.WithLabelValues(cohort, queue, flavor, resource).Set(borrowing) } +func ReportClusterQueueResourceReservations(cohort, queue, flavor, resource string, usage float64) { + ClusterQueueResourceReservations.WithLabelValues(cohort, queue, flavor, resource).Set(usage) +} + func ReportClusterQueueResourceUsage(cohort, queue, flavor, resource string, usage float64) { 
ClusterQueueResourceUsage.WithLabelValues(cohort, queue, flavor, resource).Set(usage) } @@ -203,6 +224,7 @@ func ClearClusterQueueResourceMetrics(cqName string) { ClusterQueueResourceNominalQuota.DeletePartialMatch(lbls) ClusterQueueResourceBorrowingLimit.DeletePartialMatch(lbls) ClusterQueueResourceUsage.DeletePartialMatch(lbls) + ClusterQueueResourceReservations.DeletePartialMatch(lbls) } func ClearClusterQueueResourceQuotas(cqName, flavor, resource string) { @@ -232,15 +254,30 @@ func ClearClusterQueueResourceUsage(cqName, flavor, resource string) { ClusterQueueResourceUsage.DeletePartialMatch(lbls) } +func ClearClusterQueueResourceReservations(cqName, flavor, resource string) { + lbls := prometheus.Labels{ + "cluster_queue": cqName, + "flavor": flavor, + } + + if len(resource) != 0 { + lbls["resource"] = resource + } + + ClusterQueueResourceReservations.DeletePartialMatch(lbls) +} + func Register() { metrics.Registry.MustRegister( admissionAttemptsTotal, admissionAttemptDuration, PendingWorkloads, + ReservingActiveWorkloads, AdmittedActiveWorkloads, AdmittedWorkloadsTotal, admissionWaitTime, ClusterQueueResourceUsage, + ClusterQueueResourceReservations, ClusterQueueResourceNominalQuota, ClusterQueueResourceBorrowingLimit, ) diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 1318242e78..cff7bfe02f 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -43,15 +43,20 @@ func TestReportAndCleanupClusterQueueMetics(t *testing.T) { expectFilteredMetricsCount(t, ClusterQueueResourceNominalQuota, 2, "cluster_queue", "queue") expectFilteredMetricsCount(t, ClusterQueueResourceBorrowingLimit, 2, "cluster_queue", "queue") + ReportClusterQueueResourceReservations("cohort", "queue", "flavor", "res", 7) + ReportClusterQueueResourceReservations("cohort", "queue", "flavor2", "res", 3) + ReportClusterQueueResourceUsage("cohort", "queue", "flavor", "res", 7) ReportClusterQueueResourceUsage("cohort", "queue", "flavor2", "res", 3) + expectFilteredMetricsCount(t, ClusterQueueResourceReservations, 2, "cluster_queue", "queue") expectFilteredMetricsCount(t, ClusterQueueResourceUsage, 2, "cluster_queue", "queue") ClearClusterQueueResourceMetrics("queue") expectFilteredMetricsCount(t, ClusterQueueResourceNominalQuota, 0, "cluster_queue", "queue") expectFilteredMetricsCount(t, ClusterQueueResourceBorrowingLimit, 0, "cluster_queue", "queue") + expectFilteredMetricsCount(t, ClusterQueueResourceReservations, 0, "cluster_queue", "queue") expectFilteredMetricsCount(t, ClusterQueueResourceUsage, 0, "cluster_queue", "queue") } @@ -84,6 +89,25 @@ func TestReportAndCleanupClusterQueueQuotas(t *testing.T) { } func TestReportAndCleanupClusterQueueUsage(t *testing.T) { + ReportClusterQueueResourceReservations("cohort", "queue", "flavor", "res", 5) + ReportClusterQueueResourceReservations("cohort", "queue", "flavor", "res2", 5) + ReportClusterQueueResourceReservations("cohort", "queue", "flavor2", "res", 1) + ReportClusterQueueResourceReservations("cohort", "queue", "flavor2", "res2", 1) + + expectFilteredMetricsCount(t, ClusterQueueResourceReservations, 4, "cluster_queue", "queue") + + // drop flavor2 + ClearClusterQueueResourceReservations("queue", "flavor2", "") + + expectFilteredMetricsCount(t, ClusterQueueResourceReservations, 2, "cluster_queue", "queue") + expectFilteredMetricsCount(t, ClusterQueueResourceReservations, 0, "cluster_queue", "queue", "flavor", "flavor2") + + // drop res2 + ClearClusterQueueResourceReservations("queue", "flavor", "res2") + + 
expectFilteredMetricsCount(t, ClusterQueueResourceReservations, 1, "cluster_queue", "queue") + expectFilteredMetricsCount(t, ClusterQueueResourceReservations, 0, "cluster_queue", "queue", "flavor", "flavor", "resource", "res2") + ReportClusterQueueResourceUsage("cohort", "queue", "flavor", "res", 5) ReportClusterQueueResourceUsage("cohort", "queue", "flavor", "res2", 5) ReportClusterQueueResourceUsage("cohort", "queue", "flavor2", "res", 1) diff --git a/test/integration/controller/core/clusterqueue_controller_test.go b/test/integration/controller/core/clusterqueue_controller_test.go index 6cb0ce1411..129617aa3f 100644 --- a/test/integration/controller/core/clusterqueue_controller_test.go +++ b/test/integration/controller/core/clusterqueue_controller_test.go @@ -113,9 +113,14 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin spotFlavor *kueue.ResourceFlavor modelAFlavor *kueue.ResourceFlavor modelBFlavor *kueue.ResourceFlavor + ac *kueue.AdmissionCheck ) ginkgo.BeforeEach(func() { + ac = testing.MakeAdmissionCheck("ac").Obj() + gomega.Expect(k8sClient.Create(ctx, ac)).To(gomega.Succeed()) + util.SetAdmissionCheckActive(ctx, k8sClient, ac, metav1.ConditionTrue) + clusterQueue = testing.MakeClusterQueue("cluster-queue"). ResourceGroup( *testing.MakeFlavorQuotas(flavorOnDemand). @@ -130,6 +135,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin Resource(resourceGPU, "5", "5").Obj(), ). Cohort("cohort"). + AdmissionChecks(ac.Name). Obj() gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed()) localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() @@ -142,6 +148,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, spotFlavor, true) util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, modelAFlavor, true) util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, modelBFlavor, true) + util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, ac, true) }) ginkgo.It("Should update status and report metrics when workloads are assigned and finish", func() { @@ -171,10 +178,10 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin util.ExpectCQResourceBorrowingQuota(clusterQueue, flavorModelA, string(resourceGPU), 5) util.ExpectCQResourceBorrowingQuota(clusterQueue, flavorModelB, string(resourceGPU), 5) - util.ExpectCQResourceUsage(clusterQueue, flavorOnDemand, string(corev1.ResourceCPU), 0) - util.ExpectCQResourceUsage(clusterQueue, flavorSpot, string(corev1.ResourceCPU), 0) - util.ExpectCQResourceUsage(clusterQueue, flavorModelA, string(resourceGPU), 0) - util.ExpectCQResourceUsage(clusterQueue, flavorModelB, string(resourceGPU), 0) + util.ExpectCQResourceReservations(clusterQueue, flavorOnDemand, string(corev1.ResourceCPU), 0) + util.ExpectCQResourceReservations(clusterQueue, flavorSpot, string(corev1.ResourceCPU), 0) + util.ExpectCQResourceReservations(clusterQueue, flavorModelA, string(resourceGPU), 0) + util.ExpectCQResourceReservations(clusterQueue, flavorModelB, string(resourceGPU), 0) }) ginkgo.By("Creating workloads") @@ -186,8 +193,9 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed()) return updatedCq.Status }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{ - PendingWorkloads: 5, - FlavorsUsage: 
emptyUsedFlavors, + PendingWorkloads: 5, + FlavorsReservation: emptyUsedFlavors, + FlavorsUsage: emptyUsedFlavors, Conditions: []metav1.Condition{ { Type: kueue.ClusterQueueActive, @@ -199,7 +207,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin }, ignoreConditionTimestamps, ignorePendingWorkloadsStatus)) // Workloads are inadmissible because ResourceFlavors don't exist here yet. util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 5) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 0) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 0) ginkgo.By("Creating ResourceFlavors") onDemandFlavor = testing.MakeResourceFlavor(flavorOnDemand).Obj() @@ -211,7 +219,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin modelBFlavor = testing.MakeResourceFlavor(flavorModelB).Label(resourceGPU.String(), flavorModelB).Obj() gomega.Expect(k8sClient.Create(ctx, modelBFlavor)).To(gomega.Succeed()) - ginkgo.By("Admitting workloads") + ginkgo.By("Set workloads quota reservation") admissions := []*kueue.Admission{ testing.MakeAdmission(clusterQueue.Name). Assignment(corev1.ResourceCPU, flavorOnDemand, "2").Assignment(resourceGPU, flavorModelA, "2").Obj(), @@ -232,48 +240,86 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin if admissions[i] != nil { return util.SetQuotaReservation(ctx, k8sClient, &newWL, admissions[i]) } - return k8sClient.Status().Update(ctx, &newWL) + return nil }, util.Timeout, util.Interval).Should(gomega.Succeed()) } + totalUsage := []kueue.FlavorUsage{ + { + Name: flavorOnDemand, + Resources: []kueue.ResourceUsage{{ + Name: corev1.ResourceCPU, + Total: resource.MustParse("6"), + Borrowed: resource.MustParse("1"), + }}, + }, + { + Name: flavorSpot, + Resources: []kueue.ResourceUsage{{ + Name: corev1.ResourceCPU, + Total: resource.MustParse("1"), + }}, + }, + { + Name: flavorModelA, + Resources: []kueue.ResourceUsage{{ + Name: resourceGPU, + Total: resource.MustParse("5"), + }}, + }, + { + Name: flavorModelB, + Resources: []kueue.ResourceUsage{{ + Name: resourceGPU, + Total: resource.MustParse("2"), + }}, + }, + } + gomega.Eventually(func() kueue.ClusterQueueStatus { var updatedCQ kueue.ClusterQueue gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed()) return updatedCQ.Status }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{ - PendingWorkloads: 1, - AdmittedWorkloads: 4, - FlavorsUsage: []kueue.FlavorUsage{ - { - Name: flavorOnDemand, - Resources: []kueue.ResourceUsage{{ - Name: corev1.ResourceCPU, - Total: resource.MustParse("6"), - Borrowed: resource.MustParse("1"), - }}, - }, - { - Name: flavorSpot, - Resources: []kueue.ResourceUsage{{ - Name: corev1.ResourceCPU, - Total: resource.MustParse("1"), - }}, - }, - { - Name: flavorModelA, - Resources: []kueue.ResourceUsage{{ - Name: resourceGPU, - Total: resource.MustParse("5"), - }}, - }, + PendingWorkloads: 1, + ReservingWorkloads: 4, + AdmittedWorkloads: 0, + FlavorsReservation: totalUsage, + FlavorsUsage: emptyUsedFlavors, + Conditions: []metav1.Condition{ { - Name: flavorModelB, - Resources: []kueue.ResourceUsage{{ - Name: resourceGPU, - Total: resource.MustParse("2"), - }}, + Type: kueue.ClusterQueueActive, + Status: metav1.ConditionTrue, + Reason: "Ready", + Message: "Can admit new workloads", }, }, + }, ignoreConditionTimestamps, ignorePendingWorkloadsStatus)) + util.ExpectPendingWorkloadsMetric(clusterQueue, 1, 0) + 
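// Editorial note (not part of the original patch): because the ClusterQueue in this
// test now carries the "ac" AdmissionCheck, util.SetQuotaReservation alone only moves
// workloads into the reserving bucket. That is why the assertion above expects
// ReservingWorkloads: 4 together with AdmittedWorkloads: 0 and an empty FlavorsUsage,
// while FlavorsReservation already reports the full totalUsage; the admitted side
// catches up once the check is set to Ready further below. A minimal sketch of the
// resulting invariant on ClusterQueueStatus (the helper name is made up):
//
//	func quotaHeldAwaitingChecks(st kueue.ClusterQueueStatus) int32 {
//		// Workloads holding quota that have not yet cleared their admission checks.
//		return st.ReservingWorkloads - st.AdmittedWorkloads
//	}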
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 4) + + ginkgo.By("Checking the resource reservation metrics are updated", func() { + util.ExpectCQResourceReservations(clusterQueue, flavorOnDemand, string(corev1.ResourceCPU), 6) + util.ExpectCQResourceReservations(clusterQueue, flavorSpot, string(corev1.ResourceCPU), 1) + util.ExpectCQResourceReservations(clusterQueue, flavorModelA, string(resourceGPU), 5) + util.ExpectCQResourceReservations(clusterQueue, flavorModelB, string(resourceGPU), 2) + }) + + ginkgo.By("Setting the admission check for the first 4 workloads") + for _, w := range workloads[:4] { + util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, w, ac.Name, kueue.CheckStateReady) + } + + gomega.Eventually(func() kueue.ClusterQueueStatus { + var updatedCQ kueue.ClusterQueue + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed()) + return updatedCQ.Status + }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{ + PendingWorkloads: 1, + ReservingWorkloads: 4, + AdmittedWorkloads: 4, + FlavorsReservation: totalUsage, + FlavorsUsage: totalUsage, Conditions: []metav1.Condition{ { Type: kueue.ClusterQueueActive, @@ -284,13 +330,13 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin }, }, ignoreConditionTimestamps, ignorePendingWorkloadsStatus)) util.ExpectPendingWorkloadsMetric(clusterQueue, 1, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 4) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 4) ginkgo.By("Checking the resource usage metrics are updated", func() { - util.ExpectCQResourceUsage(clusterQueue, flavorOnDemand, string(corev1.ResourceCPU), 6) - util.ExpectCQResourceUsage(clusterQueue, flavorSpot, string(corev1.ResourceCPU), 1) - util.ExpectCQResourceUsage(clusterQueue, flavorModelA, string(resourceGPU), 5) - util.ExpectCQResourceUsage(clusterQueue, flavorModelB, string(resourceGPU), 2) + util.ExpectCQResourceReservations(clusterQueue, flavorOnDemand, string(corev1.ResourceCPU), 6) + util.ExpectCQResourceReservations(clusterQueue, flavorSpot, string(corev1.ResourceCPU), 1) + util.ExpectCQResourceReservations(clusterQueue, flavorModelA, string(resourceGPU), 5) + util.ExpectCQResourceReservations(clusterQueue, flavorModelB, string(resourceGPU), 2) }) ginkgo.By("Finishing workloads") @@ -300,7 +346,8 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed()) return updatedCq.Status }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{ - FlavorsUsage: emptyUsedFlavors, + FlavorsReservation: emptyUsedFlavors, + FlavorsUsage: emptyUsedFlavors, Conditions: []metav1.Condition{ { Type: kueue.ClusterQueueActive, @@ -311,7 +358,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin }, }, ignoreConditionTimestamps, ignorePendingWorkloadsStatus)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 0) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 0) }) ginkgo.It("Should update status when workloads have reclaimable pods", func() { @@ -374,11 +421,11 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + 
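// Editorial note (not part of the original patch): here and throughout the job and
// scheduler integration tests that follow, the expectation switches from the admitted
// gauge to the new reserving gauge, since quota reservation is the event these tests
// drive directly. As an assumption for illustration only (the real Expect* helper
// lives in test/util and is not shown in this diff), such a check can be built on
// prometheus/testutil reading the "reserving_active_workloads" gauge:
package example

import (
	"github.com/prometheus/client_golang/prometheus/testutil"

	"sigs.k8s.io/kueue/pkg/metrics"
)

// reservingActiveWorkloads reads the reserving-workloads gauge for one ClusterQueue;
// an expectation helper would poll this value until it matches the wanted count.
func reservingActiveWorkloads(cqName string) float64 {
	return testutil.ToFloat64(metrics.ReservingActiveWorkloads.WithLabelValues(cqName))
}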
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) gomega.Eventually(func() []kueue.FlavorUsage { var updatedCq kueue.ClusterQueue gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed()) - return updatedCq.Status.FlavorsUsage + return updatedCq.Status.FlavorsReservation }, util.Timeout, util.Interval).Should(gomega.BeComparableTo([]kueue.FlavorUsage{ { Name: flavorOnDemand, @@ -411,11 +458,11 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin ginkgo.By("Mark two workers as reclaimable", func() { gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})).To(gomega.Succeed()) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) gomega.Eventually(func() []kueue.FlavorUsage { var updatedCq kueue.ClusterQueue gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed()) - return updatedCq.Status.FlavorsUsage + return updatedCq.Status.FlavorsReservation }, util.Timeout, util.Interval).Should(gomega.BeComparableTo([]kueue.FlavorUsage{ { Name: flavorOnDemand, @@ -449,11 +496,11 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin ginkgo.By("Mark all workers and a driver as reclaimable", func() { gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 5}, {Name: "driver", Count: 1}})).To(gomega.Succeed()) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) gomega.Eventually(func() []kueue.FlavorUsage { var updatedCq kueue.ClusterQueue gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed()) - return updatedCq.Status.FlavorsUsage + return updatedCq.Status.FlavorsReservation }, util.Timeout, util.Interval).Should(gomega.BeComparableTo([]kueue.FlavorUsage{ { Name: flavorOnDemand, @@ -486,7 +533,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin ginkgo.By("Finishing workload", func() { util.FinishWorkloads(ctx, k8sClient, wl) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 0) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 0) }) }) diff --git a/test/integration/controller/core/localqueue_controller_test.go b/test/integration/controller/core/localqueue_controller_test.go index 793275a829..b03b732872 100644 --- a/test/integration/controller/core/localqueue_controller_test.go +++ b/test/integration/controller/core/localqueue_controller_test.go @@ -45,6 +45,7 @@ var _ = ginkgo.Describe("Queue controller", ginkgo.Ordered, ginkgo.ContinueOnFai *testing.MakeResourceFlavor(flavorModelC).Label(resourceGPU.String(), flavorModelC).Obj(), *testing.MakeResourceFlavor(flavorModelD).Label(resourceGPU.String(), flavorModelD).Obj(), } + ac *kueue.AdmissionCheck ) ginkgo.BeforeAll(func() { @@ -66,11 +67,17 @@ var _ = ginkgo.Describe("Queue controller", ginkgo.Ordered, ginkgo.ContinueOnFai }) ginkgo.BeforeEach(func() { + ac = testing.MakeAdmissionCheck("ac").Obj() + gomega.Expect(k8sClient.Create(ctx, ac)).To(gomega.Succeed()) + util.SetAdmissionCheckActive(ctx, k8sClient, ac, metav1.ConditionTrue) + clusterQueue = testing.MakeClusterQueue("cluster-queue.queue-controller"). 
ResourceGroup( *testing.MakeFlavorQuotas(flavorModelC).Resource(resourceGPU, "5", "5").Obj(), *testing.MakeFlavorQuotas(flavorModelD).Resource(resourceGPU, "5", "5").Obj(), - ).Obj() + ). + AdmissionChecks(ac.Name). + Obj() queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() gomega.Expect(k8sClient.Create(ctx, queue)).To(gomega.Succeed()) }) @@ -81,6 +88,7 @@ var _ = ginkgo.Describe("Queue controller", ginkgo.Ordered, ginkgo.ContinueOnFai for _, rf := range resourceFlavors { util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, &rf, true) } + util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, ac, true) }) ginkgo.It("Should update conditions when clusterQueues that its localQueue references are updated", func() { @@ -192,13 +200,36 @@ var _ = ginkgo.Describe("Queue controller", ginkgo.Ordered, ginkgo.ContinueOnFai for _, w := range workloads { gomega.Expect(k8sClient.Create(ctx, w)).To(gomega.Succeed()) } + + emptyUsage := []kueue.LocalQueueFlavorUsage{ + { + Name: flavorModelC, + Resources: []kueue.LocalQueueResourceUsage{ + { + Name: resourceGPU, + Total: resource.MustParse("0"), + }, + }, + }, + { + Name: flavorModelD, + Resources: []kueue.LocalQueueResourceUsage{ + { + Name: resourceGPU, + Total: resource.MustParse("0"), + }, + }, + }, + } + gomega.Eventually(func() kueue.LocalQueueStatus { var updatedQueue kueue.LocalQueue gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed()) return updatedQueue.Status }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{ - AdmittedWorkloads: 0, - PendingWorkloads: 3, + ReservingWorkloads: 0, + AdmittedWorkloads: 0, + PendingWorkloads: 3, Conditions: []metav1.Condition{ { Type: kueue.LocalQueueActive, @@ -207,29 +238,11 @@ var _ = ginkgo.Describe("Queue controller", ginkgo.Ordered, ginkgo.ContinueOnFai Message: "Can submit new workloads to clusterQueue", }, }, - FlavorUsage: []kueue.LocalQueueFlavorUsage{ - { - Name: flavorModelC, - Resources: []kueue.LocalQueueResourceUsage{ - { - Name: resourceGPU, - Total: resource.MustParse("0"), - }, - }, - }, - { - Name: flavorModelD, - Resources: []kueue.LocalQueueResourceUsage{ - { - Name: resourceGPU, - Total: resource.MustParse("0"), - }, - }, - }, - }, + FlavorsReservation: emptyUsage, + FlavorUsage: emptyUsage, }, ignoreConditionTimestamps)) - ginkgo.By("Admitting workloads") + ginkgo.By("Setting the workloads quota reservation") for i, w := range workloads { gomega.Eventually(func() error { var newWL kueue.Workload @@ -237,13 +250,36 @@ var _ = ginkgo.Describe("Queue controller", ginkgo.Ordered, ginkgo.ContinueOnFai return util.SetQuotaReservation(ctx, k8sClient, &newWL, admissions[i]) }, util.Timeout, util.Interval).Should(gomega.Succeed()) } + + fullUsage := []kueue.LocalQueueFlavorUsage{ + { + Name: flavorModelC, + Resources: []kueue.LocalQueueResourceUsage{ + { + Name: resourceGPU, + Total: resource.MustParse("5"), + }, + }, + }, + { + Name: flavorModelD, + Resources: []kueue.LocalQueueResourceUsage{ + { + Name: resourceGPU, + Total: resource.MustParse("1"), + }, + }, + }, + } + gomega.Eventually(func() kueue.LocalQueueStatus { var updatedQueue kueue.LocalQueue gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed()) return updatedQueue.Status }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{ - AdmittedWorkloads: 3, - PendingWorkloads: 0, + ReservingWorkloads: 3, + AdmittedWorkloads: 0, + 
PendingWorkloads: 0, Conditions: []metav1.Condition{ { Type: kueue.LocalQueueActive, @@ -252,26 +288,33 @@ var _ = ginkgo.Describe("Queue controller", ginkgo.Ordered, ginkgo.ContinueOnFai Message: "Can submit new workloads to clusterQueue", }, }, - FlavorUsage: []kueue.LocalQueueFlavorUsage{ - { - Name: flavorModelC, - Resources: []kueue.LocalQueueResourceUsage{ - { - Name: resourceGPU, - Total: resource.MustParse("5"), - }, - }, - }, + FlavorsReservation: fullUsage, + FlavorUsage: emptyUsage, + }, ignoreConditionTimestamps)) + + ginkgo.By("Setting the workloads admission checks") + for _, w := range workloads { + util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, w, ac.Name, kueue.CheckStateReady) + } + + gomega.Eventually(func() kueue.LocalQueueStatus { + var updatedQueue kueue.LocalQueue + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed()) + return updatedQueue.Status + }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{ + ReservingWorkloads: 3, + AdmittedWorkloads: 3, + PendingWorkloads: 0, + Conditions: []metav1.Condition{ { - Name: flavorModelD, - Resources: []kueue.LocalQueueResourceUsage{ - { - Name: resourceGPU, - Total: resource.MustParse("1"), - }, - }, + Type: kueue.LocalQueueActive, + Status: metav1.ConditionTrue, + Reason: "Ready", + Message: "Can submit new workloads to clusterQueue", }, }, + FlavorsReservation: fullUsage, + FlavorUsage: fullUsage, }, ignoreConditionTimestamps)) ginkgo.By("Finishing workloads") @@ -289,26 +332,8 @@ var _ = ginkgo.Describe("Queue controller", ginkgo.Ordered, ginkgo.ContinueOnFai Message: "Can submit new workloads to clusterQueue", }, }, - FlavorUsage: []kueue.LocalQueueFlavorUsage{ - { - Name: flavorModelC, - Resources: []kueue.LocalQueueResourceUsage{ - { - Name: resourceGPU, - Total: resource.MustParse("0"), - }, - }, - }, - { - Name: flavorModelD, - Resources: []kueue.LocalQueueResourceUsage{ - { - Name: resourceGPU, - Total: resource.MustParse("0"), - }, - }, - }, - }, + FlavorsReservation: emptyUsage, + FlavorUsage: emptyUsage, }, ignoreConditionTimestamps)) }) }) diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index d0f6e95cdd..febb6706df 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -1033,7 +1033,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) gomega.Expect(createdProdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) ginkgo.By("checking a second no-fit prod job does not start") prodJob2 := testingjob.MakeJob("prod-job2", ns.Name).Queue(prodLocalQ.Name).Request(corev1.ResourceCPU, "5").Obj() @@ -1045,7 +1045,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde return createdProdJob2.Spec.Suspend }, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(true))) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) ginkgo.By("checking a dev job starts") devJob := testingjob.MakeJob("dev-job", 
ns.Name).Queue(devLocalQ.Name).Request(corev1.ResourceCPU, "5").Obj() @@ -1058,7 +1058,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) gomega.Expect(createdDevJob.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) util.ExpectPendingWorkloadsMetric(devClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(devClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(devClusterQ, 1) ginkgo.By("checking the second prod job starts when the first finishes") createdProdJob1.Status.Conditions = append(createdProdJob1.Status.Conditions, @@ -1075,7 +1075,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) gomega.Expect(createdProdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) }) ginkgo.It("Should unsuspend job iff localQueue is in the same namespace", func() { @@ -1321,7 +1321,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) gomega.Expect(createdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) }) job2 := testingjob.MakeJob("job2", ns.Name).Queue(prodLocalQ.Name).Request(corev1.ResourceCPU, "3").Obj() @@ -1335,7 +1335,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde return createdJob2.Spec.Suspend }, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(true))) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) }) ginkgo.By("checking the second job starts when the first has less then to completions to go", func() { @@ -1363,7 +1363,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2) }) }) diff --git a/test/integration/controller/jobs/jobset/jobset_controller_test.go b/test/integration/controller/jobs/jobset/jobset_controller_test.go index c95f9c75ba..0dea3e4b6c 100644 --- a/test/integration/controller/jobs/jobset/jobset_controller_test.go +++ b/test/integration/controller/jobs/jobset/jobset_controller_test.go @@ -881,7 +881,7 @@ var _ = ginkgo.Describe("JobSet controller interacting with scheduler", ginkgo.O gomega.Expect(createdJobSet.Spec.ReplicatedJobs[0].Template.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) gomega.Expect(createdJobSet.Spec.ReplicatedJobs[1].Template.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + 
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) @@ -917,7 +917,7 @@ var _ = ginkgo.Describe("JobSet controller interacting with scheduler", ginkgo.O return createdJobSet1.Spec.Suspend }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) jobSet2 := testingjobset.MakeJobSet("dev-jobset2", ns.Name).ReplicatedJobs( @@ -947,7 +947,7 @@ var _ = ginkgo.Describe("JobSet controller interacting with scheduler", ginkgo.O return createdJobSet2.Spec.Suspend }, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(true))) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) ginkgo.By("checking the second job starts when the first one needs less then two cpus", func() { @@ -988,7 +988,7 @@ var _ = ginkgo.Describe("JobSet controller interacting with scheduler", ginkgo.O return createdJobSet2.Spec.Suspend }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 2) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 2) }) }) }) diff --git a/test/integration/controller/jobs/mpijob/mpijob_controller_test.go b/test/integration/controller/jobs/mpijob/mpijob_controller_test.go index ed8dc2b35d..3e45f4abfa 100644 --- a/test/integration/controller/jobs/mpijob/mpijob_controller_test.go +++ b/test/integration/controller/jobs/mpijob/mpijob_controller_test.go @@ -877,7 +877,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) gomega.Expect(createdJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) diff --git a/test/integration/controller/jobs/mxjob/mxjob_controller_test.go b/test/integration/controller/jobs/mxjob/mxjob_controller_test.go index 1f1f09e209..75112836ec 100644 --- a/test/integration/controller/jobs/mxjob/mxjob_controller_test.go +++ b/test/integration/controller/jobs/mxjob/mxjob_controller_test.go @@ -593,6 +593,6 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdJob.Spec.MXReplicaSpecs[kftraining.MXJobReplicaTypeServer].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) gomega.Expect(createdJob.Spec.MXReplicaSpecs[kftraining.MXJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) }) diff --git a/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go b/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go index 7d3e221945..c0d85bf859 100644 --- a/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go +++ 
b/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go @@ -566,6 +566,6 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) gomega.Expect(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) }) diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go index 32df544af1..1eaa4ebabf 100644 --- a/test/integration/controller/jobs/pod/pod_controller_test.go +++ b/test/integration/controller/jobs/pod/pod_controller_test.go @@ -510,7 +510,7 @@ var _ = ginkgo.Describe("Pod controller interacting with scheduler", ginkgo.Orde }, util.Timeout, util.Interval).Should(gomega.BeEmpty()) gomega.Expect(createdPod.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) ginkgo.When("The workload's admission is removed", func() { diff --git a/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go b/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go index c6c960ffc0..1f016ae3d9 100644 --- a/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go +++ b/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go @@ -799,7 +799,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdJob.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) gomega.Expect(createdJob.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) ginkgo.When("The workload's admission is removed", func() { diff --git a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go index e6177df94e..076ec459a2 100644 --- a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go +++ b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go @@ -597,7 +597,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) gomega.Expect(createdJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) }) diff --git a/test/integration/controller/jobs/tfjob/tfjob_controller_test.go 
b/test/integration/controller/jobs/tfjob/tfjob_controller_test.go index 49837a43eb..d1daf0327f 100644 --- a/test/integration/controller/jobs/tfjob/tfjob_controller_test.go +++ b/test/integration/controller/jobs/tfjob/tfjob_controller_test.go @@ -593,6 +593,6 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdJob.Spec.TFReplicaSpecs[kftraining.TFJobReplicaTypePS].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) gomega.Expect(createdJob.Spec.TFReplicaSpecs[kftraining.TFJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) }) diff --git a/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go b/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go index 122e0ade4b..f5111366d5 100644 --- a/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go +++ b/test/integration/controller/jobs/xgboostjob/xgboostjob_controller_test.go @@ -566,6 +566,6 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdJob.Spec.XGBReplicaSpecs[kftraining.XGBoostJobReplicaTypeMaster].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) gomega.Expect(createdJob.Spec.XGBReplicaSpecs[kftraining.XGBoostJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) }) diff --git a/test/integration/scheduler/podsready/scheduler_test.go b/test/integration/scheduler/podsready/scheduler_test.go index d091186648..23683e2ebe 100644 --- a/test/integration/scheduler/podsready/scheduler_test.go +++ b/test/integration/scheduler/podsready/scheduler_test.go @@ -257,8 +257,16 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() { gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodClusterQ), &updatedCQ)).To(gomega.Succeed()) return updatedCQ.Status }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{ - PendingWorkloads: 0, - AdmittedWorkloads: 1, + PendingWorkloads: 0, + ReservingWorkloads: 1, + AdmittedWorkloads: 1, + FlavorsReservation: []kueue.FlavorUsage{{ + Name: "default", + Resources: []kueue.ResourceUsage{{ + Name: corev1.ResourceCPU, + Total: resource.MustParse("2"), + }}, + }}, FlavorsUsage: []kueue.FlavorUsage{{ Name: "default", Resources: []kueue.ResourceUsage{{ @@ -286,8 +294,16 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() { gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodClusterQ), &updatedCQ)).To(gomega.Succeed()) return updatedCQ.Status }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{ - PendingWorkloads: 1, - AdmittedWorkloads: 0, + PendingWorkloads: 1, + ReservingWorkloads: 0, + AdmittedWorkloads: 0, + FlavorsReservation: []kueue.FlavorUsage{{ + Name: "default", + Resources: []kueue.ResourceUsage{{ + Name: corev1.ResourceCPU, + Total: resource.MustParse("0"), + }}, + }}, FlavorsUsage: []kueue.FlavorUsage{{ Name: "default", Resources: []kueue.ResourceUsage{{ @@ -298,7 +314,7 @@ var _ = 
ginkgo.Describe("SchedulerWithWaitForPodsReady", func() { }, ignoreCQConditions, ignorePendingWorkloadsStatus)) ginkgo.By("verify the active workload metric is decreased for the cluster queue") - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 0) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 0) ginkgo.By("wait for the 'dev' workload to get admitted") util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, devClusterQ.Name, devWl) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 183cac564a..50f94d4d56 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -162,7 +162,7 @@ var _ = ginkgo.Describe("Scheduler", func() { prodWl1Admission := testing.MakeAdmission(prodClusterQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "2").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, prodWl1, prodWl1Admission) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 1) ginkgo.By("checking a second no-fit workload does not get admitted") @@ -177,7 +177,7 @@ var _ = ginkgo.Describe("Scheduler", func() { spotUntaintedFlavorAdmission := testing.MakeAdmission(devClusterQ.Name).Assignment(corev1.ResourceCPU, "spot-untainted", "5").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, devWl, spotUntaintedFlavorAdmission) util.ExpectPendingWorkloadsMetric(devClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(devClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(devClusterQ, 1) util.ExpectAdmittedWorkloadsTotalMetric(devClusterQ, 1) ginkgo.By("checking the second workload gets admitted when the first workload finishes") @@ -185,7 +185,7 @@ var _ = ginkgo.Describe("Scheduler", func() { prodWl2Admission := testing.MakeAdmission(prodClusterQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "5").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, prodWl2, prodWl2Admission) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 2) }) @@ -206,7 +206,7 @@ var _ = ginkgo.Describe("Scheduler", func() { Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, wl1Admission) util.ExpectPendingWorkloadsMetric(podsCountClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(podsCountClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(podsCountClusterQ, 1) util.ExpectAdmittedWorkloadsTotalMetric(podsCountClusterQ, 1) }) @@ -233,7 +233,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, podsCountClusterQ.Name, wl1, wl3) util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2) util.ExpectPendingWorkloadsMetric(podsCountClusterQ, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(podsCountClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(podsCountClusterQ, 2) util.ExpectAdmittedWorkloadsTotalMetric(podsCountClusterQ, 2) }) @@ -244,7 +244,7 @@ var _ = ginkgo.Describe("Scheduler", func() { ginkgo.By("checking the second workload is also admitted", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, podsCountClusterQ.Name, wl2, wl3) util.ExpectPendingWorkloadsMetric(podsCountClusterQ, 0, 0) - 
util.ExpectAdmittedActiveWorkloadsMetric(podsCountClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(podsCountClusterQ, 2) util.ExpectAdmittedWorkloadsTotalMetric(podsCountClusterQ, 3) }) }) @@ -264,7 +264,7 @@ var _ = ginkgo.Describe("Scheduler", func() { Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, wl1Admission) util.ExpectPendingWorkloadsMetric(podsCountOnlyClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(podsCountOnlyClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(podsCountOnlyClusterQ, 1) util.ExpectAdmittedWorkloadsTotalMetric(podsCountOnlyClusterQ, 1) }) @@ -289,7 +289,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, podsCountOnlyClusterQ.Name, wl1, wl3) util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2) util.ExpectPendingWorkloadsMetric(podsCountOnlyClusterQ, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(podsCountOnlyClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(podsCountOnlyClusterQ, 2) util.ExpectAdmittedWorkloadsTotalMetric(podsCountOnlyClusterQ, 2) }) @@ -300,7 +300,7 @@ var _ = ginkgo.Describe("Scheduler", func() { ginkgo.By("checking the second workload is also admitted", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, podsCountOnlyClusterQ.Name, wl2, wl3) util.ExpectPendingWorkloadsMetric(podsCountOnlyClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(podsCountOnlyClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(podsCountOnlyClusterQ, 2) util.ExpectAdmittedWorkloadsTotalMetric(podsCountOnlyClusterQ, 3) }) }) @@ -322,7 +322,7 @@ var _ = ginkgo.Describe("Scheduler", func() { gomega.Expect(k8sClient.Create(ctx, wlHigh2)).Should(gomega.Succeed()) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 0) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 0) // delay creating the queue until after workloads are created. 
gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed()) @@ -333,7 +333,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wlHigh1, wlHigh2) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 3) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2) util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 2) ginkgo.By("after the high priority workloads finish, only the mid priority workloads should be admitted") @@ -341,7 +341,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wlMid1, wlMid2) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2) util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 4) }) @@ -352,7 +352,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, bigWl) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 1) smallWl1 := testing.MakeWorkload("small-wl-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2.5").Obj() @@ -363,14 +363,14 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToBePending(ctx, k8sClient, smallWl1, smallWl2) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 2) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) ginkgo.By("Marking the big workload as finished") util.FinishWorkloads(ctx, k8sClient, bigWl) util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, smallWl1, smallWl2) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2) util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 3) }) @@ -383,7 +383,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, firstWl) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) }) secondWl := testing.MakeWorkload("second-wl", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "3").Obj() @@ -392,7 +392,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToBePending(ctx, k8sClient, secondWl) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) }) ginkgo.By("Reclaim one pod from the first workload", func() { @@ -400,7 +400,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, firstWl, secondWl) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2) }) }) @@ -413,7 +413,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, firstWl) 
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) }) secondWl := testing.MakeWorkload("second-wl", ns.Name).Queue(prodQueue.Name). @@ -432,7 +432,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToBePending(ctx, k8sClient, secondWl) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) }) ginkgo.By("Reclaim all the pods from the second PodSet", func() { @@ -440,7 +440,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, firstWl, secondWl) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 2) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2) }) }) @@ -453,14 +453,14 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToBePending(ctx, k8sClient, wl) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 0) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 0) }) ginkgo.By("Mark one pod as reclaimable", func() { gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "main", Count: 1}})).To(gomega.Succeed()) util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wl) util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 1) createWl := &kueue.Workload{} gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), createWl)).To(gomega.Succeed()) @@ -505,7 +505,7 @@ var _ = ginkgo.Describe("Scheduler", func() { expectWl1Admission := testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "on-demand", "4").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, expectWl1Admission) util.ExpectPendingWorkloadsMetric(cq, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(cq, 1) + util.ExpectReservingActiveWorkloadsMetric(cq, 1) util.ExpectAdmittedWorkloadsTotalMetric(cq, 1) ginkgo.By("Second big workload is pending") @@ -513,7 +513,7 @@ var _ = ginkgo.Describe("Scheduler", func() { gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed()) util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2) util.ExpectPendingWorkloadsMetric(cq, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(cq, 1) + util.ExpectReservingActiveWorkloadsMetric(cq, 1) util.ExpectAdmittedWorkloadsTotalMetric(cq, 1) ginkgo.By("Third small workload starts") @@ -522,7 +522,7 @@ var _ = ginkgo.Describe("Scheduler", func() { expectWl3Admission := testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl3, expectWl3Admission) util.ExpectPendingWorkloadsMetric(cq, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(cq, 2) + util.ExpectReservingActiveWorkloadsMetric(cq, 2) util.ExpectAdmittedWorkloadsTotalMetric(cq, 2) ginkgo.By("Second big workload starts after the first one is deleted") @@ -530,7 +530,7 @@ var _ = ginkgo.Describe("Scheduler", func() { expectWl2Admission := testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "on-demand", "4").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2, expectWl2Admission) util.ExpectPendingWorkloadsMetric(cq, 0, 0) - 
util.ExpectAdmittedActiveWorkloadsMetric(cq, 2) + util.ExpectReservingActiveWorkloadsMetric(cq, 2) util.ExpectAdmittedWorkloadsTotalMetric(cq, 3) }) @@ -553,7 +553,7 @@ var _ = ginkgo.Describe("Scheduler", func() { expectAdmission := testing.MakeAdmission(fooCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "8").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, expectAdmission) util.ExpectPendingWorkloadsMetric(fooCQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(fooCQ, 1) + util.ExpectReservingActiveWorkloadsMetric(fooCQ, 1) util.ExpectAdmittedWorkloadsTotalMetric(fooCQ, 1) ginkgo.By("Second big workload is pending") @@ -561,7 +561,7 @@ var _ = ginkgo.Describe("Scheduler", func() { gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed()) util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2) util.ExpectPendingWorkloadsMetric(cq, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(cq, 0) + util.ExpectReservingActiveWorkloadsMetric(cq, 0) util.ExpectAdmittedWorkloadsTotalMetric(cq, 0) ginkgo.By("Third small workload starts") @@ -570,7 +570,7 @@ var _ = ginkgo.Describe("Scheduler", func() { expectAdmission = testing.MakeAdmission(fooCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "2").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl3, expectAdmission) util.ExpectPendingWorkloadsMetric(fooCQ, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(fooCQ, 2) + util.ExpectReservingActiveWorkloadsMetric(fooCQ, 2) util.ExpectAdmittedWorkloadsTotalMetric(fooCQ, 2) ginkgo.By("Second big workload starts after the first one is deleted") @@ -578,7 +578,7 @@ var _ = ginkgo.Describe("Scheduler", func() { expectAdmission = testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "on-demand", "8").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2, expectAdmission) util.ExpectPendingWorkloadsMetric(cq, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(cq, 1) + util.ExpectReservingActiveWorkloadsMetric(cq, 1) util.ExpectAdmittedWorkloadsTotalMetric(cq, 1) }) }) @@ -610,7 +610,7 @@ var _ = ginkgo.Describe("Scheduler", func() { gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed()) util.ExpectWorkloadsToBePending(ctx, k8sClient, wl) util.ExpectPendingWorkloadsMetric(cq, 0, 1) - util.ExpectAdmittedActiveWorkloadsMetric(cq, 0) + util.ExpectReservingActiveWorkloadsMetric(cq, 0) util.ExpectAdmittedWorkloadsTotalMetric(cq, 0) ginkgo.By("updating ClusterQueue") @@ -632,7 +632,7 @@ var _ = ginkgo.Describe("Scheduler", func() { expectAdmission := testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "on-demand", "6").Obj() util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl, expectAdmission) util.ExpectPendingWorkloadsMetric(cq, 0, 0) - util.ExpectAdmittedActiveWorkloadsMetric(cq, 1) + util.ExpectReservingActiveWorkloadsMetric(cq, 1) util.ExpectAdmittedWorkloadsTotalMetric(cq, 1) }) }) @@ -689,14 +689,14 @@ var _ = ginkgo.Describe("Scheduler", func() { gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed()) util.ExpectWorkloadsToBePending(ctx, k8sClient, wl1, wl2) util.ExpectPendingWorkloadsMetric(cq, 0, 2) - util.ExpectAdmittedActiveWorkloadsMetric(cq, 0) + util.ExpectReservingActiveWorkloadsMetric(cq, 0) util.ExpectAdmittedWorkloadsTotalMetric(cq, 0) ginkgo.By("checking the first workload gets admitted after updating the namespace labels to match CQ selector") ns.Labels = map[string]string{"dep": "eng"} gomega.Expect(k8sClient.Update(ctx, ns)).Should(gomega.Succeed()) util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, wl1) - 
util.ExpectAdmittedActiveWorkloadsMetric(cq, 1)
+			util.ExpectReservingActiveWorkloadsMetric(cq, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
 			util.ExpectPendingWorkloadsMetric(cq, 0, 1)
 		})
@@ -730,7 +730,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed())
 			util.ExpectWorkloadsToBeFrozen(ctx, k8sClient, fooCQ.Name, wl)
 			util.ExpectPendingWorkloadsMetric(fooCQ, 0, 1)
-			util.ExpectAdmittedActiveWorkloadsMetric(fooCQ, 0)
+			util.ExpectReservingActiveWorkloadsMetric(fooCQ, 0)
 			util.ExpectAdmittedWorkloadsTotalMetric(fooCQ, 0)
 
 			ginkgo.By("Creating foo flavor")
@@ -742,7 +742,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.ExpectClusterQueueStatusMetric(fooCQ, metrics.CQStatusActive)
 			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, fooCQ.Name, wl)
 			util.ExpectPendingWorkloadsMetric(fooCQ, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(fooCQ, 1)
+			util.ExpectReservingActiveWorkloadsMetric(fooCQ, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(fooCQ, 1)
 		})
 	})
@@ -785,7 +785,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			expectAdmission := testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "on-demand", "5").Obj()
 			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, expectAdmission)
 			util.ExpectPendingWorkloadsMetric(cq, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(cq, 1)
+			util.ExpectReservingActiveWorkloadsMetric(cq, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
 
 			ginkgo.By("checking a second workload without toleration doesn't start")
@@ -793,7 +793,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
 			util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2)
 			util.ExpectPendingWorkloadsMetric(cq, 0, 1)
-			util.ExpectAdmittedActiveWorkloadsMetric(cq, 1)
+			util.ExpectReservingActiveWorkloadsMetric(cq, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
 
 			ginkgo.By("checking a third workload with toleration starts")
@@ -803,7 +803,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			expectAdmission = testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "spot-tainted", "5").Obj()
 			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl3, expectAdmission)
 			util.ExpectPendingWorkloadsMetric(cq, 0, 1)
-			util.ExpectAdmittedActiveWorkloadsMetric(cq, 2)
+			util.ExpectReservingActiveWorkloadsMetric(cq, 2)
 			util.ExpectAdmittedWorkloadsTotalMetric(cq, 2)
 		})
 	})
@@ -842,7 +842,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed())
 			expectAdmission := testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "spot-untainted", "1").Obj()
 			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, expectAdmission)
-			util.ExpectAdmittedActiveWorkloadsMetric(cq, 1)
+			util.ExpectReservingActiveWorkloadsMetric(cq, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
 			util.ExpectPendingWorkloadsMetric(cq, 0, 0)
 
@@ -855,7 +855,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			expectAdmission = testing.MakeAdmission(cq.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj()
 			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2, expectAdmission)
 			util.ExpectPendingWorkloadsMetric(cq, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(cq, 2)
+			util.ExpectReservingActiveWorkloadsMetric(cq, 2)
 			util.ExpectAdmittedWorkloadsTotalMetric(cq, 2)
 		})
 	})
@@ -955,7 +955,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed())
 			util.ExpectWorkloadsToBePending(ctx, k8sClient, wl)
 			util.ExpectPendingWorkloadsMetric(prodCQ, 0, 1)
-			util.ExpectAdmittedActiveWorkloadsMetric(prodCQ, 0)
+			util.ExpectReservingActiveWorkloadsMetric(prodCQ, 0)
 			util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 0)
 
 			ginkgo.By("checking the workload gets admitted when a fallback ClusterQueue gets added")
@@ -973,7 +973,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			expectAdmission := testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "10").Obj()
 			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl, expectAdmission)
 			util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(prodCQ, 1)
+			util.ExpectReservingActiveWorkloadsMetric(prodCQ, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 1)
 		})
 
@@ -1004,8 +1004,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.ExpectWorkloadsToBePending(ctx, k8sClient, wl1, wl2)
 			util.ExpectPendingWorkloadsMetric(prodCQ, 0, 1)
 			util.ExpectPendingWorkloadsMetric(devCQ, 0, 1)
-			util.ExpectAdmittedActiveWorkloadsMetric(prodCQ, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(devCQ, 0)
+			util.ExpectReservingActiveWorkloadsMetric(prodCQ, 0)
+			util.ExpectReservingActiveWorkloadsMetric(devCQ, 0)
 			util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 0)
 			util.ExpectAdmittedWorkloadsTotalMetric(devCQ, 0)
 
@@ -1024,8 +1024,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, devCQ.Name, wl2)
 			util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0)
 			util.ExpectPendingWorkloadsMetric(devCQ, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(prodCQ, 1)
-			util.ExpectAdmittedActiveWorkloadsMetric(devCQ, 1)
+			util.ExpectReservingActiveWorkloadsMetric(prodCQ, 1)
+			util.ExpectReservingActiveWorkloadsMetric(devCQ, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(devCQ, 1)
 		})
@@ -1108,7 +1108,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			prodWl1Admission := testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "9").Obj()
 			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, prodWl1Admission)
 			util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(prodCQ, 1)
+			util.ExpectReservingActiveWorkloadsMetric(prodCQ, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 1)
 
 			ginkgo.By("Creating another workload")
@@ -1117,7 +1117,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			prodWl2Admission := testing.MakeAdmission(devCQ.Name).Assignment(corev1.ResourceCPU, "spot-tainted", "11").Obj()
 			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2, prodWl2Admission)
 			util.ExpectPendingWorkloadsMetric(devCQ, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(devCQ, 1)
+			util.ExpectReservingActiveWorkloadsMetric(devCQ, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(devCQ, 1)
 		})
 
@@ -1153,7 +1153,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed())
 			gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
 			util.ExpectPendingWorkloadsMetric(devCQ, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(devCQ, 2)
+			util.ExpectReservingActiveWorkloadsMetric(devCQ, 2)
 			util.ExpectAdmittedWorkloadsTotalMetric(devCQ, 2)
 
 			ginkgo.By("Creating another workload")
@@ -1161,7 +1161,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, wl3)).Should(gomega.Succeed())
 			util.ExpectWorkloadsToBePreempted(ctx, k8sClient, wl1)
 			util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(prodCQ, 1)
+			util.ExpectReservingActiveWorkloadsMetric(prodCQ, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 1)
 		})
 	})
@@ -1230,7 +1230,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return !workload.HasQuotaReservation(wl3)
 			}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(true))
 			util.ExpectPendingWorkloadsMetric(strictFIFOClusterQ, 2, 0)
-			util.ExpectAdmittedActiveWorkloadsMetric(strictFIFOClusterQ, 1)
+			util.ExpectReservingActiveWorkloadsMetric(strictFIFOClusterQ, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(strictFIFOClusterQ, 1)
 		})
 
@@ -1284,7 +1284,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(util.DeleteWorkload(ctx, k8sClient, wl1)).To(gomega.Succeed())
 			}()
 			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, wl1)
-			util.ExpectAdmittedActiveWorkloadsMetric(cq, 1)
+			util.ExpectReservingActiveWorkloadsMetric(cq, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
 
 			ginkgo.By("Delete clusterQueue")
@@ -1302,7 +1302,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(util.DeleteWorkload(ctx, k8sClient, wl2)).To(gomega.Succeed())
 			}()
 			util.ExpectWorkloadsToBeFrozen(ctx, k8sClient, cq.Name, wl2)
-			util.ExpectAdmittedActiveWorkloadsMetric(cq, 1)
+			util.ExpectReservingActiveWorkloadsMetric(cq, 1)
 			util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
 			util.ExpectPendingWorkloadsMetric(cq, 0, 1)
 		})
diff --git a/test/integration/scheduler/workload_controller_test.go b/test/integration/scheduler/workload_controller_test.go
index 846f3e247f..09375606db 100644
--- a/test/integration/scheduler/workload_controller_test.go
+++ b/test/integration/scheduler/workload_controller_test.go
@@ -36,7 +36,7 @@ import (
 // +kubebuilder:docs-gen:collapse=Imports
 
 var ignoreCqCondition = cmpopts.IgnoreFields(kueue.ClusterQueueStatus{}, "Conditions")
-var ignorePendingWorkloadsStatus = cmpopts.IgnoreFields(kueue.ClusterQueueStatus{}, "PendingWorkloadsStatus")
+var ignoreInClusterQueueStatus = cmpopts.IgnoreFields(kueue.ClusterQueueStatus{}, "PendingWorkloadsStatus", "FlavorsUsage", "AdmittedWorkloads")
 
 var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 	var (
@@ -113,16 +113,16 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
 					return updatedCQ.Status
 				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
-					PendingWorkloads:  0,
-					AdmittedWorkloads: 1,
-					FlavorsUsage: []kueue.FlavorUsage{{
+					PendingWorkloads:   0,
+					ReservingWorkloads: 1,
+					FlavorsReservation: []kueue.FlavorUsage{{
 						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
 						Resources: []kueue.ResourceUsage{{
 							Name:  corev1.ResourceCPU,
 							Total: resource.MustParse("2"),
 						}},
 					}},
-				}, ignoreCqCondition, ignorePendingWorkloadsStatus))
+				}, ignoreCqCondition, ignoreInClusterQueueStatus))
 			})
 		})
 	})
@@ -168,16 +168,16 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
 					return updatedCQ.Status
 				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
-					PendingWorkloads:  0,
-					AdmittedWorkloads: 1,
-					FlavorsUsage: []kueue.FlavorUsage{{
+					PendingWorkloads:   0,
+					ReservingWorkloads: 1,
+					FlavorsReservation: []kueue.FlavorUsage{{
 						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
 						Resources: []kueue.ResourceUsage{{
 							Name:  corev1.ResourceCPU,
 							Total: resource.MustParse("1"),
 						}},
 					}},
-				}, ignoreCqCondition, ignorePendingWorkloadsStatus))
+				}, ignoreCqCondition, ignoreInClusterQueueStatus))
 			})
 		})
 	})
@@ -222,9 +222,9 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
 					return updatedCQ.Status
 				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
-					PendingWorkloads:  0,
-					AdmittedWorkloads: 1,
-					FlavorsUsage: []kueue.FlavorUsage{{
+					PendingWorkloads:   0,
+					ReservingWorkloads: 1,
+					FlavorsReservation: []kueue.FlavorUsage{{
 						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
 						Resources: []kueue.ResourceUsage{
 							{
@@ -233,7 +233,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 							},
 						},
 					}},
-				}, ignoreCqCondition, ignorePendingWorkloadsStatus))
+				}, ignoreCqCondition, ignoreInClusterQueueStatus))
 			})
 
 			ginkgo.By("Check podSets spec", func() {
@@ -264,9 +264,9 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
 					return updatedCQ.Status
 				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
-					PendingWorkloads:  0,
-					AdmittedWorkloads: 1,
-					FlavorsUsage: []kueue.FlavorUsage{{
+					PendingWorkloads:   0,
+					ReservingWorkloads: 1,
+					FlavorsReservation: []kueue.FlavorUsage{{
 						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
 						Resources: []kueue.ResourceUsage{
 							{
@@ -275,7 +275,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 							},
 						},
 					}},
-				}, ignoreCqCondition, ignorePendingWorkloadsStatus))
+				}, ignoreCqCondition, ignoreInClusterQueueStatus))
 			})
 
 			ginkgo.By("Check podSets spec", func() {
@@ -326,16 +326,16 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
 					return updatedCQ.Status
 				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
-					PendingWorkloads:  0,
-					AdmittedWorkloads: 1,
-					FlavorsUsage: []kueue.FlavorUsage{{
+					PendingWorkloads:   0,
+					ReservingWorkloads: 1,
+					FlavorsReservation: []kueue.FlavorUsage{{
 						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
 						Resources: []kueue.ResourceUsage{{
 							Name:  corev1.ResourceCPU,
 							Total: resource.MustParse("1"),
 						}},
 					}},
-				}, ignoreCqCondition, ignorePendingWorkloadsStatus))
+				}, ignoreCqCondition, ignoreInClusterQueueStatus))
 			})
 
 			ginkgo.By("Check podSets spec", func() {
@@ -429,16 +429,16 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
 					return updatedCQ.Status
 				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
-					PendingWorkloads:  0,
-					AdmittedWorkloads: 2,
-					FlavorsUsage: []kueue.FlavorUsage{{
+					PendingWorkloads:   0,
+					ReservingWorkloads: 2,
+					FlavorsReservation: []kueue.FlavorUsage{{
 						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
 						Resources: []kueue.ResourceUsage{{
 							Name:  corev1.ResourceCPU,
 							Total: resource.MustParse("5"),
 						}},
 					}},
-				}, ignoreCqCondition, ignorePendingWorkloadsStatus))
+				}, ignoreCqCondition, ignoreInClusterQueueStatus))
 			})
 		})
 	})
@@ -519,16 +519,16 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
 					return updatedCQ.Status
 				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
-					PendingWorkloads:  0,
-					AdmittedWorkloads: 2,
-					FlavorsUsage: []kueue.FlavorUsage{{
+					PendingWorkloads:   0,
+					ReservingWorkloads: 2,
+					FlavorsReservation: []kueue.FlavorUsage{{
 						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
 						Resources: []kueue.ResourceUsage{{
 							Name:  corev1.ResourceCPU,
 							Total: resource.MustParse("5"),
 						}},
 					}},
-				}, ignoreCqCondition, ignorePendingWorkloadsStatus))
+				}, ignoreCqCondition, ignoreInClusterQueueStatus))
 			})
 		})
 	})
@@ -553,16 +553,16 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCQ)).To(gomega.Succeed())
 					return updatedCQ.Status
 				}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(kueue.ClusterQueueStatus{
-					PendingWorkloads:  0,
-					AdmittedWorkloads: 0,
-					FlavorsUsage: []kueue.FlavorUsage{{
+					PendingWorkloads:   0,
+					ReservingWorkloads: 0,
+					FlavorsReservation: []kueue.FlavorUsage{{
 						Name: kueue.ResourceFlavorReference(onDemandFlavor.Name),
 						Resources: []kueue.ResourceUsage{{
 							Name:  corev1.ResourceCPU,
 							Total: resource.MustParse("0"),
 						}},
 					}},
-				}, ignoreCqCondition, ignorePendingWorkloadsStatus))
+				}, ignoreCqCondition, ignoreInClusterQueueStatus))
 			})
 			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
 			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
diff --git a/test/util/util.go b/test/util/util.go
index df0701f81e..0577fdc288 100644
--- a/test/util/util.go
+++ b/test/util/util.go
@@ -271,8 +271,8 @@ func ExpectPendingWorkloadsMetric(cq *kueue.ClusterQueue, active, inadmissible i
 	}
 }
 
-func ExpectAdmittedActiveWorkloadsMetric(cq *kueue.ClusterQueue, v int) {
-	metric := metrics.AdmittedActiveWorkloads.WithLabelValues(cq.Name)
+func ExpectReservingActiveWorkloadsMetric(cq *kueue.ClusterQueue, v int) {
+	metric := metrics.ReservingActiveWorkloads.WithLabelValues(cq.Name)
 	gomega.EventuallyWithOffset(1, func() int {
 		v, err := testutil.GetGaugeMetricValue(metric)
 		gomega.Expect(err).ToNot(gomega.HaveOccurred())
@@ -358,8 +358,8 @@ func ExpectCQResourceBorrowingQuota(cq *kueue.ClusterQueue, flavor, resource str
 	}, Timeout, Interval).Should(gomega.Equal(v))
 }
 
-func ExpectCQResourceUsage(cq *kueue.ClusterQueue, flavor, resource string, v float64) {
-	metric := metrics.ClusterQueueResourceUsage.WithLabelValues(cq.Spec.Cohort, cq.Name, flavor, resource)
+func ExpectCQResourceReservations(cq *kueue.ClusterQueue, flavor, resource string, v float64) {
+	metric := metrics.ClusterQueueResourceReservations.WithLabelValues(cq.Spec.Cohort, cq.Name, flavor, resource)
 	gomega.EventuallyWithOffset(1, func() float64 {
 		v, err := testutil.GetGaugeMetricValue(metric)
 		gomega.Expect(err).ToNot(gomega.HaveOccurred())
@@ -436,3 +436,14 @@ func SetAdmissionCheckActive(ctx context.Context, k8sClient client.Client, admis
 		return k8sClient.Status().Update(ctx, &updatedAc)
 	}, Timeout, Interval).Should(gomega.Succeed())
 }
+
+func SetWorkloadsAdmissionCkeck(ctx context.Context, k8sClient client.Client, wl *kueue.Workload, check string, state kueue.CheckState) {
+	var updatedWorkload kueue.Workload
+	gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
+		g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
+		currentCheck := workload.FindAdmissionCheck(updatedWorkload.Status.AdmissionChecks, check)
+		g.Expect(currentCheck).NotTo(gomega.BeNil(), "the check %s was not found in %s", check, workload.Key(wl))
+		currentCheck.State = state
+		g.Expect(k8sClient.Status().Update(ctx, &updatedWorkload)).To(gomega.Succeed())
+	}, Timeout, Interval).Should(gomega.Succeed())
+}
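
For reference, a minimal usage sketch (not part of this patch) of how the renamed metric expectation and the new SetWorkloadsAdmissionCkeck helper could be combined in a future spec. It assumes the integration suite's ctx and k8sClient, a ClusterQueue cq and Workload wl already set up by the surrounding test, an admission check named "check-1" (an illustrative name), and that kueue.CheckStateReady is the ready state; the helper name expectTwoPhaseAdmission is invented for this sketch.

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/test/util"
)

// expectTwoPhaseAdmission is illustrative only: it asserts quota reservation
// through the reserving gauge first, then flips the (assumed) admission check
// to Ready and only afterwards expects the admitted counter to move.
func expectTwoPhaseAdmission(ctx context.Context, k8sClient client.Client, cq *kueue.ClusterQueue, wl *kueue.Workload) {
	// Quota reservation happens first; flavorsReservation and the reserving
	// gauge are expected to reflect the workload at this point.
	util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, wl)
	util.ExpectReservingActiveWorkloadsMetric(cq, 1)

	// Mark the hypothetical "check-1" admission check Ready, after which the
	// workload should count against the admitted metrics as well.
	util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, wl, "check-1", kueue.CheckStateReady)
	util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
}

The split between the reserving and the admitted counters is what the renames above introduce; the check name and the exact ordering in this sketch are assumptions, not behavior guaranteed by this diff.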