diff --git a/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go b/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go index 70f655b536..6d04d889cd 100644 --- a/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go +++ b/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go @@ -84,6 +84,8 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "k8s.io/apimachinery/pkg/version.Info": schema_k8sio_apimachinery_pkg_version_Info(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.ClusterQueue": schema_kueue_apis_visibility_v1alpha1_ClusterQueue(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.ClusterQueueList": schema_kueue_apis_visibility_v1alpha1_ClusterQueueList(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.LocalQueue": schema_kueue_apis_visibility_v1alpha1_LocalQueue(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.LocalQueueList": schema_kueue_apis_visibility_v1alpha1_LocalQueueList(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkload": schema_kueue_apis_visibility_v1alpha1_PendingWorkload(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadOptions": schema_kueue_apis_visibility_v1alpha1_PendingWorkloadOptions(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary": schema_kueue_apis_visibility_v1alpha1_PendingWorkloadsSummary(ref), @@ -2604,6 +2606,95 @@ func schema_kueue_apis_visibility_v1alpha1_ClusterQueueList(ref common.Reference } } +func schema_kueue_apis_visibility_v1alpha1_LocalQueue(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "pendingWorkloadsSummary": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary"), + }, + }, + }, + Required: []string{"pendingWorkloadsSummary"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary"}, + } +} + +func schema_kueue_apis_visibility_v1alpha1_LocalQueueList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.LocalQueue"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.LocalQueue"}, + } +} + func schema_kueue_apis_visibility_v1alpha1_PendingWorkload(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/apis/visibility/v1alpha1/types.go b/apis/visibility/v1alpha1/types.go index 9aac5b0a2a..c0a1dd8467 100644 --- a/apis/visibility/v1alpha1/types.go +++ b/apis/visibility/v1alpha1/types.go @@ -61,6 +61,25 @@ type ClusterQueueList struct { Items []ClusterQueue `json:"items"` } +// +genclient +// +kubebuilder:object:root=true +// +k8s:openapi-gen=true +// +genclient:method=GetPendingWorkloadsSummary,verb=get,subresource=pendingworkloads,result=sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary +type LocalQueue struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Summary PendingWorkloadsSummary `json:"pendingWorkloadsSummary"` +} + +// +kubebuilder:object:root=true +type LocalQueueList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + + Items []LocalQueue `json:"items"` +} + // +k8s:openapi-gen=true // +kubebuilder:object:root=true diff --git a/apis/visibility/v1alpha1/zz_generated.deepcopy.go b/apis/visibility/v1alpha1/zz_generated.deepcopy.go index 7b17764e46..14382e4424 100644 --- a/apis/visibility/v1alpha1/zz_generated.deepcopy.go +++ b/apis/visibility/v1alpha1/zz_generated.deepcopy.go @@ -83,6 +83,64 @@ func (in *ClusterQueueList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LocalQueue) DeepCopyInto(out *LocalQueue) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Summary.DeepCopyInto(&out.Summary) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueue. +func (in *LocalQueue) DeepCopy() *LocalQueue { + if in == nil { + return nil + } + out := new(LocalQueue) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *LocalQueue) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LocalQueueList) DeepCopyInto(out *LocalQueueList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]LocalQueue, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueueList. +func (in *LocalQueueList) DeepCopy() *LocalQueueList { + if in == nil { + return nil + } + out := new(LocalQueueList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *LocalQueueList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PendingWorkload) DeepCopyInto(out *PendingWorkload) { *out = *in diff --git a/charts/kueue/templates/rbac/pending_workloads_viewer_role.yaml b/charts/kueue/templates/rbac/pending_workloads_cq_viewer_role.yaml similarity index 81% rename from charts/kueue/templates/rbac/pending_workloads_viewer_role.yaml rename to charts/kueue/templates/rbac/pending_workloads_cq_viewer_role.yaml index 41ef2e393e..147af59fd7 100644 --- a/charts/kueue/templates/rbac/pending_workloads_viewer_role.yaml +++ b/charts/kueue/templates/rbac/pending_workloads_cq_viewer_role.yaml @@ -2,7 +2,7 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: '{{ include "kueue.fullname" . }}-pending-workloads-viewer-role' + name: '{{ include "kueue.fullname" . }}-pending-workloads-cq-viewer-role' labels: rbac.kueue.x-k8s.io/batch-admin: "true" rules: diff --git a/charts/kueue/templates/rbac/pending_workloads_lq_viewer_role.yaml b/charts/kueue/templates/rbac/pending_workloads_lq_viewer_role.yaml new file mode 100644 index 0000000000..c6ff389dd6 --- /dev/null +++ b/charts/kueue/templates/rbac/pending_workloads_lq_viewer_role.yaml @@ -0,0 +1,18 @@ +# permissions for end users to view pending workloads. +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: '{{ include "kueue.fullname" . }}-pending-workloads-lq-viewer-role' + namespace: '{{ .Release.Namespace }}' + labels: + rbac.kueue.x-k8s.io/batch-admin: "true" + rbac.kueue.x-k8s.io/batch-user: "true" +rules: + - apiGroups: + - visibility.kueue.x-k8s.io + resources: + - localqueues/pendingworkloads + verbs: + - get + - list + - watch diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index 61aa945085..d160d50a1e 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -104,6 +104,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { // Group=visibility.kueue.x-k8s.io, Version=v1alpha1 case v1alpha1.SchemeGroupVersion.WithKind("ClusterQueue"): return &visibilityv1alpha1.ClusterQueueApplyConfiguration{} + case v1alpha1.SchemeGroupVersion.WithKind("LocalQueue"): + return &visibilityv1alpha1.LocalQueueApplyConfiguration{} case v1alpha1.SchemeGroupVersion.WithKind("PendingWorkload"): return &visibilityv1alpha1.PendingWorkloadApplyConfiguration{} case v1alpha1.SchemeGroupVersion.WithKind("PendingWorkloadsSummary"): diff --git a/client-go/applyconfiguration/visibility/v1alpha1/localqueue.go b/client-go/applyconfiguration/visibility/v1alpha1/localqueue.go new file mode 100644 index 0000000000..1b57d71a65 --- /dev/null +++ b/client-go/applyconfiguration/visibility/v1alpha1/localqueue.go @@ -0,0 +1,209 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + v1 "k8s.io/client-go/applyconfigurations/meta/v1" +) + +// LocalQueueApplyConfiguration represents an declarative configuration of the LocalQueue type for use +// with apply. +type LocalQueueApplyConfiguration struct { + v1.TypeMetaApplyConfiguration `json:",inline"` + *v1.ObjectMetaApplyConfiguration `json:"metadata,omitempty"` + Summary *PendingWorkloadsSummaryApplyConfiguration `json:"pendingWorkloadsSummary,omitempty"` +} + +// LocalQueue constructs an declarative configuration of the LocalQueue type for use with +// apply. +func LocalQueue(name, namespace string) *LocalQueueApplyConfiguration { + b := &LocalQueueApplyConfiguration{} + b.WithName(name) + b.WithNamespace(namespace) + b.WithKind("LocalQueue") + b.WithAPIVersion("visibility.kueue.x-k8s.io/v1alpha1") + return b +} + +// WithKind sets the Kind field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Kind field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithKind(value string) *LocalQueueApplyConfiguration { + b.Kind = &value + return b +} + +// WithAPIVersion sets the APIVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the APIVersion field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithAPIVersion(value string) *LocalQueueApplyConfiguration { + b.APIVersion = &value + return b +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithName(value string) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Name = &value + return b +} + +// WithGenerateName sets the GenerateName field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the GenerateName field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithGenerateName(value string) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.GenerateName = &value + return b +} + +// WithNamespace sets the Namespace field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Namespace field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithNamespace(value string) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Namespace = &value + return b +} + +// WithUID sets the UID field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UID field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithUID(value types.UID) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.UID = &value + return b +} + +// WithResourceVersion sets the ResourceVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ResourceVersion field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithResourceVersion(value string) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ResourceVersion = &value + return b +} + +// WithGeneration sets the Generation field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Generation field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithGeneration(value int64) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Generation = &value + return b +} + +// WithCreationTimestamp sets the CreationTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the CreationTimestamp field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithCreationTimestamp(value metav1.Time) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.CreationTimestamp = &value + return b +} + +// WithDeletionTimestamp sets the DeletionTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionTimestamp field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithDeletionTimestamp(value metav1.Time) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionTimestamp = &value + return b +} + +// WithDeletionGracePeriodSeconds sets the DeletionGracePeriodSeconds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionGracePeriodSeconds field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithDeletionGracePeriodSeconds(value int64) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionGracePeriodSeconds = &value + return b +} + +// WithLabels puts the entries into the Labels field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Labels field, +// overwriting an existing map entries in Labels field with the same key. +func (b *LocalQueueApplyConfiguration) WithLabels(entries map[string]string) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Labels == nil && len(entries) > 0 { + b.Labels = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Labels[k] = v + } + return b +} + +// WithAnnotations puts the entries into the Annotations field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Annotations field, +// overwriting an existing map entries in Annotations field with the same key. +func (b *LocalQueueApplyConfiguration) WithAnnotations(entries map[string]string) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Annotations == nil && len(entries) > 0 { + b.Annotations = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Annotations[k] = v + } + return b +} + +// WithOwnerReferences adds the given value to the OwnerReferences field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OwnerReferences field. +func (b *LocalQueueApplyConfiguration) WithOwnerReferences(values ...*v1.OwnerReferenceApplyConfiguration) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + if values[i] == nil { + panic("nil value passed to WithOwnerReferences") + } + b.OwnerReferences = append(b.OwnerReferences, *values[i]) + } + return b +} + +// WithFinalizers adds the given value to the Finalizers field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Finalizers field. +func (b *LocalQueueApplyConfiguration) WithFinalizers(values ...string) *LocalQueueApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + b.Finalizers = append(b.Finalizers, values[i]) + } + return b +} + +func (b *LocalQueueApplyConfiguration) ensureObjectMetaApplyConfigurationExists() { + if b.ObjectMetaApplyConfiguration == nil { + b.ObjectMetaApplyConfiguration = &v1.ObjectMetaApplyConfiguration{} + } +} + +// WithSummary sets the Summary field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Summary field is set to the value of the last call. +func (b *LocalQueueApplyConfiguration) WithSummary(value *PendingWorkloadsSummaryApplyConfiguration) *LocalQueueApplyConfiguration { + b.Summary = value + return b +} diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_localqueue.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_localqueue.go new file mode 100644 index 0000000000..849a90c323 --- /dev/null +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_localqueue.go @@ -0,0 +1,164 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + json "encoding/json" + "fmt" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" + v1alpha1 "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + visibilityv1alpha1 "sigs.k8s.io/kueue/client-go/applyconfiguration/visibility/v1alpha1" +) + +// FakeLocalQueues implements LocalQueueInterface +type FakeLocalQueues struct { + Fake *FakeVisibilityV1alpha1 + ns string +} + +var localqueuesResource = v1alpha1.SchemeGroupVersion.WithResource("localqueues") + +var localqueuesKind = v1alpha1.SchemeGroupVersion.WithKind("LocalQueue") + +// Get takes name of the localQueue, and returns the corresponding localQueue object, and an error if there is any. +func (c *FakeLocalQueues) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.LocalQueue, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(localqueuesResource, c.ns, name), &v1alpha1.LocalQueue{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LocalQueue), err +} + +// List takes label and field selectors, and returns the list of LocalQueues that match those selectors. +func (c *FakeLocalQueues) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.LocalQueueList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(localqueuesResource, localqueuesKind, c.ns, opts), &v1alpha1.LocalQueueList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.LocalQueueList{ListMeta: obj.(*v1alpha1.LocalQueueList).ListMeta} + for _, item := range obj.(*v1alpha1.LocalQueueList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested localQueues. +func (c *FakeLocalQueues) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(localqueuesResource, c.ns, opts)) + +} + +// Create takes the representation of a localQueue and creates it. Returns the server's representation of the localQueue, and an error, if there is any. +func (c *FakeLocalQueues) Create(ctx context.Context, localQueue *v1alpha1.LocalQueue, opts v1.CreateOptions) (result *v1alpha1.LocalQueue, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(localqueuesResource, c.ns, localQueue), &v1alpha1.LocalQueue{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LocalQueue), err +} + +// Update takes the representation of a localQueue and updates it. Returns the server's representation of the localQueue, and an error, if there is any. +func (c *FakeLocalQueues) Update(ctx context.Context, localQueue *v1alpha1.LocalQueue, opts v1.UpdateOptions) (result *v1alpha1.LocalQueue, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(localqueuesResource, c.ns, localQueue), &v1alpha1.LocalQueue{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LocalQueue), err +} + +// Delete takes name of the localQueue and deletes it. Returns an error if one occurs. +func (c *FakeLocalQueues) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(localqueuesResource, c.ns, name, opts), &v1alpha1.LocalQueue{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeLocalQueues) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(localqueuesResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.LocalQueueList{}) + return err +} + +// Patch applies the patch and returns the patched localQueue. +func (c *FakeLocalQueues) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.LocalQueue, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(localqueuesResource, c.ns, name, pt, data, subresources...), &v1alpha1.LocalQueue{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LocalQueue), err +} + +// Apply takes the given apply declarative configuration, applies it and returns the applied localQueue. +func (c *FakeLocalQueues) Apply(ctx context.Context, localQueue *visibilityv1alpha1.LocalQueueApplyConfiguration, opts v1.ApplyOptions) (result *v1alpha1.LocalQueue, err error) { + if localQueue == nil { + return nil, fmt.Errorf("localQueue provided to Apply must not be nil") + } + data, err := json.Marshal(localQueue) + if err != nil { + return nil, err + } + name := localQueue.Name + if name == nil { + return nil, fmt.Errorf("localQueue.Name must be provided to Apply") + } + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(localqueuesResource, c.ns, *name, types.ApplyPatchType, data), &v1alpha1.LocalQueue{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.LocalQueue), err +} + +// GetPendingWorkloadsSummary takes name of the localQueue, and returns the corresponding pendingWorkloadsSummary object, and an error if there is any. +func (c *FakeLocalQueues) GetPendingWorkloadsSummary(ctx context.Context, localQueueName string, options v1.GetOptions) (result *v1alpha1.PendingWorkloadsSummary, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetSubresourceAction(localqueuesResource, c.ns, "pendingworkloads", localQueueName), &v1alpha1.PendingWorkloadsSummary{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PendingWorkloadsSummary), err +} diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_visibility_client.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_visibility_client.go index 920a55c223..0ec79caea2 100644 --- a/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_visibility_client.go +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_visibility_client.go @@ -31,6 +31,10 @@ func (c *FakeVisibilityV1alpha1) ClusterQueues() v1alpha1.ClusterQueueInterface return &FakeClusterQueues{c} } +func (c *FakeVisibilityV1alpha1) LocalQueues(namespace string) v1alpha1.LocalQueueInterface { + return &FakeLocalQueues{c, namespace} +} + // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. func (c *FakeVisibilityV1alpha1) RESTClient() rest.Interface { diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/generated_expansion.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/generated_expansion.go index 49426d5faf..0fab901ec9 100644 --- a/client-go/clientset/versioned/typed/visibility/v1alpha1/generated_expansion.go +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/generated_expansion.go @@ -18,3 +18,5 @@ limitations under the License. package v1alpha1 type ClusterQueueExpansion interface{} + +type LocalQueueExpansion interface{} diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/localqueue.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/localqueue.go new file mode 100644 index 0000000000..f7b73d1a0c --- /dev/null +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/localqueue.go @@ -0,0 +1,223 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + json "encoding/json" + "fmt" + "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" + v1alpha1 "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + visibilityv1alpha1 "sigs.k8s.io/kueue/client-go/applyconfiguration/visibility/v1alpha1" + scheme "sigs.k8s.io/kueue/client-go/clientset/versioned/scheme" +) + +// LocalQueuesGetter has a method to return a LocalQueueInterface. +// A group's client should implement this interface. +type LocalQueuesGetter interface { + LocalQueues(namespace string) LocalQueueInterface +} + +// LocalQueueInterface has methods to work with LocalQueue resources. +type LocalQueueInterface interface { + Create(ctx context.Context, localQueue *v1alpha1.LocalQueue, opts v1.CreateOptions) (*v1alpha1.LocalQueue, error) + Update(ctx context.Context, localQueue *v1alpha1.LocalQueue, opts v1.UpdateOptions) (*v1alpha1.LocalQueue, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.LocalQueue, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.LocalQueueList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.LocalQueue, err error) + Apply(ctx context.Context, localQueue *visibilityv1alpha1.LocalQueueApplyConfiguration, opts v1.ApplyOptions) (result *v1alpha1.LocalQueue, err error) + GetPendingWorkloadsSummary(ctx context.Context, localQueueName string, options v1.GetOptions) (*v1alpha1.PendingWorkloadsSummary, error) + + LocalQueueExpansion +} + +// localQueues implements LocalQueueInterface +type localQueues struct { + client rest.Interface + ns string +} + +// newLocalQueues returns a LocalQueues +func newLocalQueues(c *VisibilityV1alpha1Client, namespace string) *localQueues { + return &localQueues{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the localQueue, and returns the corresponding localQueue object, and an error if there is any. +func (c *localQueues) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.LocalQueue, err error) { + result = &v1alpha1.LocalQueue{} + err = c.client.Get(). + Namespace(c.ns). + Resource("localqueues"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of LocalQueues that match those selectors. +func (c *localQueues) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.LocalQueueList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.LocalQueueList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("localqueues"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested localQueues. +func (c *localQueues) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("localqueues"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a localQueue and creates it. Returns the server's representation of the localQueue, and an error, if there is any. +func (c *localQueues) Create(ctx context.Context, localQueue *v1alpha1.LocalQueue, opts v1.CreateOptions) (result *v1alpha1.LocalQueue, err error) { + result = &v1alpha1.LocalQueue{} + err = c.client.Post(). + Namespace(c.ns). + Resource("localqueues"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(localQueue). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a localQueue and updates it. Returns the server's representation of the localQueue, and an error, if there is any. +func (c *localQueues) Update(ctx context.Context, localQueue *v1alpha1.LocalQueue, opts v1.UpdateOptions) (result *v1alpha1.LocalQueue, err error) { + result = &v1alpha1.LocalQueue{} + err = c.client.Put(). + Namespace(c.ns). + Resource("localqueues"). + Name(localQueue.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(localQueue). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the localQueue and deletes it. Returns an error if one occurs. +func (c *localQueues) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("localqueues"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *localQueues) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("localqueues"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched localQueue. +func (c *localQueues) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.LocalQueue, err error) { + result = &v1alpha1.LocalQueue{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("localqueues"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} + +// Apply takes the given apply declarative configuration, applies it and returns the applied localQueue. +func (c *localQueues) Apply(ctx context.Context, localQueue *visibilityv1alpha1.LocalQueueApplyConfiguration, opts v1.ApplyOptions) (result *v1alpha1.LocalQueue, err error) { + if localQueue == nil { + return nil, fmt.Errorf("localQueue provided to Apply must not be nil") + } + patchOpts := opts.ToPatchOptions() + data, err := json.Marshal(localQueue) + if err != nil { + return nil, err + } + name := localQueue.Name + if name == nil { + return nil, fmt.Errorf("localQueue.Name must be provided to Apply") + } + result = &v1alpha1.LocalQueue{} + err = c.client.Patch(types.ApplyPatchType). + Namespace(c.ns). + Resource("localqueues"). + Name(*name). + VersionedParams(&patchOpts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} + +// GetPendingWorkloadsSummary takes name of the localQueue, and returns the corresponding v1alpha1.PendingWorkloadsSummary object, and an error if there is any. +func (c *localQueues) GetPendingWorkloadsSummary(ctx context.Context, localQueueName string, options v1.GetOptions) (result *v1alpha1.PendingWorkloadsSummary, err error) { + result = &v1alpha1.PendingWorkloadsSummary{} + err = c.client.Get(). + Namespace(c.ns). + Resource("localqueues"). + Name(localQueueName). + SubResource("pendingworkloads"). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/visibility_client.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/visibility_client.go index 49bb94ab6d..665097ea01 100644 --- a/client-go/clientset/versioned/typed/visibility/v1alpha1/visibility_client.go +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/visibility_client.go @@ -28,6 +28,7 @@ import ( type VisibilityV1alpha1Interface interface { RESTClient() rest.Interface ClusterQueuesGetter + LocalQueuesGetter } // VisibilityV1alpha1Client is used to interact with features provided by the visibility.kueue.x-k8s.io group. @@ -39,6 +40,10 @@ func (c *VisibilityV1alpha1Client) ClusterQueues() ClusterQueueInterface { return newClusterQueues(c) } +func (c *VisibilityV1alpha1Client) LocalQueues(namespace string) LocalQueueInterface { + return newLocalQueues(c, namespace) +} + // NewForConfig creates a new VisibilityV1alpha1Client for the given config. // NewForConfig is equivalent to NewForConfigAndClient(c, httpClient), // where httpClient was generated with rest.HTTPClientFor(c). diff --git a/client-go/informers/externalversions/generic.go b/client-go/informers/externalversions/generic.go index 7018f4a17f..713291d280 100644 --- a/client-go/informers/externalversions/generic.go +++ b/client-go/informers/externalversions/generic.go @@ -71,6 +71,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=visibility.kueue.x-k8s.io, Version=v1alpha1 case v1alpha1.SchemeGroupVersion.WithResource("clusterqueues"): return &genericInformer{resource: resource.GroupResource(), informer: f.Visibility().V1alpha1().ClusterQueues().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("localqueues"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Visibility().V1alpha1().LocalQueues().Informer()}, nil } diff --git a/client-go/informers/externalversions/visibility/v1alpha1/interface.go b/client-go/informers/externalversions/visibility/v1alpha1/interface.go index e5f344dd27..b924ab73de 100644 --- a/client-go/informers/externalversions/visibility/v1alpha1/interface.go +++ b/client-go/informers/externalversions/visibility/v1alpha1/interface.go @@ -25,6 +25,8 @@ import ( type Interface interface { // ClusterQueues returns a ClusterQueueInformer. ClusterQueues() ClusterQueueInformer + // LocalQueues returns a LocalQueueInformer. + LocalQueues() LocalQueueInformer } type version struct { @@ -42,3 +44,8 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList func (v *version) ClusterQueues() ClusterQueueInformer { return &clusterQueueInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} } + +// LocalQueues returns a LocalQueueInformer. +func (v *version) LocalQueues() LocalQueueInformer { + return &localQueueInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/client-go/informers/externalversions/visibility/v1alpha1/localqueue.go b/client-go/informers/externalversions/visibility/v1alpha1/localqueue.go new file mode 100644 index 0000000000..538ec47f71 --- /dev/null +++ b/client-go/informers/externalversions/visibility/v1alpha1/localqueue.go @@ -0,0 +1,89 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" + visibilityv1alpha1 "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + versioned "sigs.k8s.io/kueue/client-go/clientset/versioned" + internalinterfaces "sigs.k8s.io/kueue/client-go/informers/externalversions/internalinterfaces" + v1alpha1 "sigs.k8s.io/kueue/client-go/listers/visibility/v1alpha1" +) + +// LocalQueueInformer provides access to a shared informer and lister for +// LocalQueues. +type LocalQueueInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.LocalQueueLister +} + +type localQueueInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewLocalQueueInformer constructs a new informer for LocalQueue type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewLocalQueueInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredLocalQueueInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredLocalQueueInformer constructs a new informer for LocalQueue type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredLocalQueueInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.VisibilityV1alpha1().LocalQueues(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.VisibilityV1alpha1().LocalQueues(namespace).Watch(context.TODO(), options) + }, + }, + &visibilityv1alpha1.LocalQueue{}, + resyncPeriod, + indexers, + ) +} + +func (f *localQueueInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredLocalQueueInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *localQueueInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&visibilityv1alpha1.LocalQueue{}, f.defaultInformer) +} + +func (f *localQueueInformer) Lister() v1alpha1.LocalQueueLister { + return v1alpha1.NewLocalQueueLister(f.Informer().GetIndexer()) +} diff --git a/client-go/listers/visibility/v1alpha1/expansion_generated.go b/client-go/listers/visibility/v1alpha1/expansion_generated.go index 25593a80c5..c5f90b4e7f 100644 --- a/client-go/listers/visibility/v1alpha1/expansion_generated.go +++ b/client-go/listers/visibility/v1alpha1/expansion_generated.go @@ -20,3 +20,11 @@ package v1alpha1 // ClusterQueueListerExpansion allows custom methods to be added to // ClusterQueueLister. type ClusterQueueListerExpansion interface{} + +// LocalQueueListerExpansion allows custom methods to be added to +// LocalQueueLister. +type LocalQueueListerExpansion interface{} + +// LocalQueueNamespaceListerExpansion allows custom methods to be added to +// LocalQueueNamespaceLister. +type LocalQueueNamespaceListerExpansion interface{} diff --git a/client-go/listers/visibility/v1alpha1/localqueue.go b/client-go/listers/visibility/v1alpha1/localqueue.go new file mode 100644 index 0000000000..66ca0e640c --- /dev/null +++ b/client-go/listers/visibility/v1alpha1/localqueue.go @@ -0,0 +1,98 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" + v1alpha1 "sigs.k8s.io/kueue/apis/visibility/v1alpha1" +) + +// LocalQueueLister helps list LocalQueues. +// All objects returned here must be treated as read-only. +type LocalQueueLister interface { + // List lists all LocalQueues in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.LocalQueue, err error) + // LocalQueues returns an object that can list and get LocalQueues. + LocalQueues(namespace string) LocalQueueNamespaceLister + LocalQueueListerExpansion +} + +// localQueueLister implements the LocalQueueLister interface. +type localQueueLister struct { + indexer cache.Indexer +} + +// NewLocalQueueLister returns a new LocalQueueLister. +func NewLocalQueueLister(indexer cache.Indexer) LocalQueueLister { + return &localQueueLister{indexer: indexer} +} + +// List lists all LocalQueues in the indexer. +func (s *localQueueLister) List(selector labels.Selector) (ret []*v1alpha1.LocalQueue, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.LocalQueue)) + }) + return ret, err +} + +// LocalQueues returns an object that can list and get LocalQueues. +func (s *localQueueLister) LocalQueues(namespace string) LocalQueueNamespaceLister { + return localQueueNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// LocalQueueNamespaceLister helps list and get LocalQueues. +// All objects returned here must be treated as read-only. +type LocalQueueNamespaceLister interface { + // List lists all LocalQueues in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.LocalQueue, err error) + // Get retrieves the LocalQueue from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.LocalQueue, error) + LocalQueueNamespaceListerExpansion +} + +// localQueueNamespaceLister implements the LocalQueueNamespaceLister +// interface. +type localQueueNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all LocalQueues in the indexer for a given namespace. +func (s localQueueNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.LocalQueue, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.LocalQueue)) + }) + return ret, err +} + +// Get retrieves the LocalQueue from the indexer for a given namespace and name. +func (s localQueueNamespaceLister) Get(name string) (*v1alpha1.LocalQueue, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("localqueue"), name) + } + return obj.(*v1alpha1.LocalQueue), nil +} diff --git a/config/components/rbac/kustomization.yaml b/config/components/rbac/kustomization.yaml index b830d78b9f..8fbcc3facd 100644 --- a/config/components/rbac/kustomization.yaml +++ b/config/components/rbac/kustomization.yaml @@ -24,7 +24,8 @@ resources: - localqueue_viewer_role.yaml - resourceflavor_editor_role.yaml - resourceflavor_viewer_role.yaml -- pending_workloads_viewer_role.yaml +- pending_workloads_cq_viewer_role.yaml +- pending_workloads_lq_viewer_role.yaml - workload_editor_role.yaml - workload_viewer_role.yaml diff --git a/config/components/rbac/pending_workloads_viewer_role.yaml b/config/components/rbac/pending_workloads_cq_viewer_role.yaml similarity index 88% rename from config/components/rbac/pending_workloads_viewer_role.yaml rename to config/components/rbac/pending_workloads_cq_viewer_role.yaml index 63a4f4c36d..dbddbd3908 100644 --- a/config/components/rbac/pending_workloads_viewer_role.yaml +++ b/config/components/rbac/pending_workloads_cq_viewer_role.yaml @@ -2,7 +2,7 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: pending-workloads-viewer-role + name: pending-workloads-cq-viewer-role labels: rbac.kueue.x-k8s.io/batch-admin: "true" rules: diff --git a/config/components/rbac/pending_workloads_lq_viewer_role.yaml b/config/components/rbac/pending_workloads_lq_viewer_role.yaml new file mode 100644 index 0000000000..5ed1d1ee0c --- /dev/null +++ b/config/components/rbac/pending_workloads_lq_viewer_role.yaml @@ -0,0 +1,18 @@ +# permissions for end users to view pending workloads. +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: pending-workloads-lq-viewer-role + namespace: default + labels: + rbac.kueue.x-k8s.io/batch-admin: "true" + rbac.kueue.x-k8s.io/batch-user: "true" +rules: +- apiGroups: + - visibility.kueue.x-k8s.io + resources: + - localqueues/pendingworkloads + verbs: + - get + - list + - watch diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index f663ceb11b..96a2b4141b 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -579,6 +579,13 @@ func (m *Manager) PendingWorkloadsInfo(cqName string) []*workload.Info { return cq.Snapshot() } +func (m *Manager) ClusterQueueFromLocalQueue(lqName string) (string, error) { + if lq, ok := m.localQueues[lqName]; ok { + return lq.ClusterQueue, nil + } + return "", errQueueDoesNotExist +} + // UpdateSnapshot computes the new snapshot and replaces if it differs from the // previous version. It returns true if the snapshot was actually updated. func (m *Manager) UpdateSnapshot(cqName string, maxCount int32) bool { diff --git a/pkg/visibility/api/install.go b/pkg/visibility/api/install.go index f3f741ca9f..48f8a9b761 100644 --- a/pkg/visibility/api/install.go +++ b/pkg/visibility/api/install.go @@ -47,9 +47,14 @@ func Install(server *genericapiserver.GenericAPIServer, kueueMgr *queue.Manager) apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(v1alpha1.GroupVersion.Group, Scheme, ParameterCodec, Codecs) pendingWorkloadsInCqREST := apirest.NewPendingWorkloadsInCqREST(kueueMgr) cqREST := apirest.NewCqREST() + pendingWorkloadsInLqREST := apirest.NewPendingWorkloadsInLqREST(kueueMgr) + lqREST := apirest.NewLqREST() + visibilityServerResources := map[string]rest.Storage{ "clusterqueues": cqREST, "clusterqueues/pendingworkloads": pendingWorkloadsInCqREST, + "localqueues": lqREST, + "localqueues/pendingworkloads": pendingWorkloadsInLqREST, } apiGroupInfo.VersionedResourcesStorageMap[v1alpha1.GroupVersion.Version] = visibilityServerResources return server.InstallAPIGroups(&apiGroupInfo) diff --git a/pkg/visibility/api/rest/local_queue.go b/pkg/visibility/api/rest/local_queue.go new file mode 100644 index 0000000000..c0fdb6bdd7 --- /dev/null +++ b/pkg/visibility/api/rest/local_queue.go @@ -0,0 +1,53 @@ +// Copyright 2023 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rest + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/registry/rest" + + "sigs.k8s.io/kueue/apis/visibility/v1alpha1" +) + +// This type is used only to install localqueues/ resource so we can install localqueues/pending_workloads subresource. +// It implements the necessary interfaces for genericapiserver but does not provide any actual functionalities. +type LqREST struct{} + +// Those interfaces are necessary for genericapiserver to work properly +var _ rest.Storage = &LqREST{} +var _ rest.Scoper = &LqREST{} +var _ rest.SingularNameProvider = &LqREST{} + +func NewLqREST() *LqREST { + return &LqREST{} +} + +// New implements rest.Storage interface +func (m *LqREST) New() runtime.Object { + return &v1alpha1.PendingWorkloadsSummary{} +} + +// Destroy implements rest.Storage interface +func (m *LqREST) Destroy() {} + +// NamespaceScoped implements rest.Scoper interface +func (m *LqREST) NamespaceScoped() bool { + return true +} + +// GetSingularName implements rest.SingularNameProvider interface +func (m *LqREST) GetSingularName() string { + return "localqueue" +} diff --git a/pkg/visibility/api/rest/pending_workload_CQ.go b/pkg/visibility/api/rest/pending_workloads_cq.go similarity index 100% rename from pkg/visibility/api/rest/pending_workload_CQ.go rename to pkg/visibility/api/rest/pending_workloads_cq.go diff --git a/pkg/visibility/api/rest/pending_workload_CQ_test.go b/pkg/visibility/api/rest/pending_workloads_cq_test.go similarity index 99% rename from pkg/visibility/api/rest/pending_workload_CQ_test.go rename to pkg/visibility/api/rest/pending_workloads_cq_test.go index 656df427a1..1421768ac1 100644 --- a/pkg/visibility/api/rest/pending_workload_CQ_test.go +++ b/pkg/visibility/api/rest/pending_workloads_cq_test.go @@ -32,7 +32,7 @@ import ( utiltesting "sigs.k8s.io/kueue/pkg/util/testing" ) -func TestPendingWorkloads(t *testing.T) { +func TestPendingWorkloadsInCQ(t *testing.T) { const ( nsName = "foo" cqNameA = "cqA" diff --git a/pkg/visibility/api/rest/pending_workloads_lq.go b/pkg/visibility/api/rest/pending_workloads_lq.go new file mode 100644 index 0000000000..25a07640ba --- /dev/null +++ b/pkg/visibility/api/rest/pending_workloads_lq.go @@ -0,0 +1,117 @@ +// Copyright 2023 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rest + +import ( + "context" + "errors" + "fmt" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + ctrl "sigs.k8s.io/controller-runtime" + + v1alpha1 "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/queue" + + _ "k8s.io/metrics/pkg/apis/metrics/install" +) + +type pendingWorkloadsInLqREST struct { + queueMgr *queue.Manager + log logr.Logger +} + +var _ rest.Storage = &pendingWorkloadsInLqREST{} +var _ rest.GetterWithOptions = &pendingWorkloadsInLqREST{} +var _ rest.Scoper = &pendingWorkloadsInLqREST{} + +var errQueueDoesNotExist = errors.New("queue doesn't exist") + +func NewPendingWorkloadsInLqREST(kueueMgr *queue.Manager) *pendingWorkloadsInLqREST { + return &pendingWorkloadsInLqREST{ + queueMgr: kueueMgr, + log: ctrl.Log.WithName("pending-workload-in-lq"), + } +} + +// New implements rest.Storage interface +func (m *pendingWorkloadsInLqREST) New() runtime.Object { + return &v1alpha1.PendingWorkloadsSummary{} +} + +// Destroy implements rest.Storage interface +func (m *pendingWorkloadsInLqREST) Destroy() {} + +// Get implements rest.GetterWithOptions interface +// It fetches information about pending workloads and returns according to query params +func (m *pendingWorkloadsInLqREST) Get(ctx context.Context, name string, opts runtime.Object) (runtime.Object, error) { + pendingWorkloadOpts, ok := opts.(*v1alpha1.PendingWorkloadOptions) + if !ok { + return nil, fmt.Errorf("invalid options object: %#v", opts) + } + limit := pendingWorkloadOpts.Limit + offset := pendingWorkloadOpts.Offset + + namespace := genericapirequest.NamespaceValue(ctx) + cqName, err := m.queueMgr.ClusterQueueFromLocalQueue(fmt.Sprintf("%s/%s", namespace, name)) + if err != nil { + return nil, errQueueDoesNotExist + } + + wls := make([]v1alpha1.PendingWorkload, 0, limit) + skippedWls := 0 + for index, wlInfo := range m.queueMgr.PendingWorkloadsInfo(cqName) { + if len(wls) >= int(limit) { + break + } + if wlInfo.Obj.Spec.QueueName == name { + if skippedWls < int(offset) { + skippedWls++ + } else { + // Add a workload to results + wls = append(wls, v1alpha1.PendingWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: wlInfo.Obj.Name, + Namespace: wlInfo.Obj.Namespace, + }, + PositionInClusterQueue: int32(index), + Priority: *wlInfo.Obj.Spec.Priority, + LocalQueueName: name, + PositionInLocalQueue: int32(len(wls) + int(offset)), + }) + } + } + } + + return &v1alpha1.PendingWorkloadsSummary{Items: wls}, nil +} + +// NewGetOptions creates a new options object +func (m *pendingWorkloadsInLqREST) NewGetOptions() (runtime.Object, bool, string) { + // If no query parameters were passed the generated defaults function are not executed so it's necessary to set default values here as well + return &v1alpha1.PendingWorkloadOptions{ + Limit: constants.DefaultPendingWorkloadsLimit, + }, false, "" +} + +// NamespaceScoped implements rest.Scoper interface +func (m *pendingWorkloadsInLqREST) NamespaceScoped() bool { + return true +} diff --git a/pkg/visibility/api/rest/pending_workloads_lq_test.go b/pkg/visibility/api/rest/pending_workloads_lq_test.go new file mode 100644 index 0000000000..191c6821a0 --- /dev/null +++ b/pkg/visibility/api/rest/pending_workloads_lq_test.go @@ -0,0 +1,408 @@ +// Copyright 2023 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rest + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/endpoints/request" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/queue" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" +) + +type req struct { + nsName string + queueName string + queryParams *visibility.PendingWorkloadOptions +} + +type resp struct { + wantErr error + wantPendingWorkloads []visibility.PendingWorkload +} + +func TestPendingWorkloadsInLQ(t *testing.T) { + const ( + nsNameA = "nsA" + nsNameB = "nsB" + cqNameA = "cqA" + cqNameB = "cqB" + lqNameA = "lqA" + lqNameB = "lqB" + lowPrio = 50 + highPrio = 100 + ) + + defaultQueryParams := &visibility.PendingWorkloadOptions{ + Offset: 0, + Limit: constants.DefaultPendingWorkloadsLimit, + } + + scheme := runtime.NewScheme() + if err := kueue.AddToScheme(scheme); err != nil { + t.Fatalf("Failed adding kueue scheme: %s", err) + } + if err := visibility.AddToScheme(scheme); err != nil { + t.Fatalf("Failed adding kueue scheme: %s", err) + } + + now := time.Now() + cases := map[string]struct { + clusterQueues []*kueue.ClusterQueue + queues []*kueue.LocalQueue + workloads []*kueue.Workload + wantResponse map[req]*resp + }{ + "single ClusterQueue and single LocalQueue setup with two workloads and default query parameters": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).Obj(), + }, + queues: []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue(lqNameA, nsNameA).ClusterQueue(cqNameA).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("a", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now).Obj(), + utiltesting.MakeWorkload("b", nsNameA).Queue(lqNameA).Priority(lowPrio).Obj(), + }, + wantResponse: map[req]*resp{ + { + nsName: nsNameA, + queueName: lqNameA, + queryParams: defaultQueryParams, + }: { + wantPendingWorkloads: []visibility.PendingWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "a", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: highPrio, + PositionInClusterQueue: 0, + PositionInLocalQueue: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: lowPrio, + PositionInClusterQueue: 1, + PositionInLocalQueue: 1, + }}, + }, + }, + }, + "single ClusterQueue and two LocalQueue setup with four workloads and default query parameters": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).Obj(), + }, + queues: []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue(lqNameA, nsNameA).ClusterQueue(cqNameA).Obj(), + utiltesting.MakeLocalQueue(lqNameB, nsNameA).ClusterQueue(cqNameA).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("lqA-high-prio", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now).Obj(), + utiltesting.MakeWorkload("lqA-low-prio", nsNameA).Queue(lqNameA).Priority(lowPrio).Creation(now).Obj(), + utiltesting.MakeWorkload("lqB-high-prio", nsNameA).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), + utiltesting.MakeWorkload("lqB-low-prio", nsNameA).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Obj(), + }, + wantResponse: map[req]*resp{ + { + nsName: nsNameA, + queueName: lqNameA, + queryParams: defaultQueryParams, + }: { + wantPendingWorkloads: []visibility.PendingWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqA-high-prio", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: highPrio, + PositionInClusterQueue: 0, + PositionInLocalQueue: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqA-low-prio", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: lowPrio, + PositionInClusterQueue: 2, + PositionInLocalQueue: 1, + }}, + }, + { + nsName: nsNameA, + queueName: lqNameB, + queryParams: defaultQueryParams, + }: { + wantPendingWorkloads: []visibility.PendingWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqB-high-prio", + Namespace: nsNameA, + }, + LocalQueueName: lqNameB, + Priority: highPrio, + PositionInClusterQueue: 1, + PositionInLocalQueue: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqB-low-prio", + Namespace: nsNameA, + }, + LocalQueueName: lqNameB, + Priority: lowPrio, + PositionInClusterQueue: 3, + PositionInLocalQueue: 1, + }}, + }, + }, + }, + "two Namespaces, two ClusterQueue and two LocalQueue setup with four workloads and default query parameters": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).Obj(), + utiltesting.MakeClusterQueue(cqNameB).Obj(), + }, + queues: []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue(lqNameA, nsNameA).ClusterQueue(cqNameA).Obj(), + utiltesting.MakeLocalQueue(lqNameB, nsNameB).ClusterQueue(cqNameB).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("lqA-high-prio", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now).Obj(), + utiltesting.MakeWorkload("lqA-low-prio", nsNameA).Queue(lqNameA).Priority(lowPrio).Creation(now).Obj(), + utiltesting.MakeWorkload("lqB-high-prio", nsNameB).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), + utiltesting.MakeWorkload("lqB-low-prio", nsNameB).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Obj(), + }, + wantResponse: map[req]*resp{ + { + nsName: nsNameA, + queueName: lqNameA, + queryParams: defaultQueryParams, + }: { + wantPendingWorkloads: []visibility.PendingWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqA-high-prio", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: highPrio, + PositionInClusterQueue: 0, + PositionInLocalQueue: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqA-low-prio", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: lowPrio, + PositionInClusterQueue: 1, + PositionInLocalQueue: 1, + }}, + }, + { + nsName: nsNameB, + queueName: lqNameB, + queryParams: defaultQueryParams, + }: { + wantPendingWorkloads: []visibility.PendingWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqB-high-prio", + Namespace: nsNameB, + }, + LocalQueueName: lqNameB, + Priority: highPrio, + PositionInClusterQueue: 0, + PositionInLocalQueue: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqB-low-prio", + Namespace: nsNameB, + }, + LocalQueueName: lqNameB, + Priority: lowPrio, + PositionInClusterQueue: 1, + PositionInLocalQueue: 1, + }}, + }, + }, + }, + "valid query parameters set": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).Obj(), + }, + queues: []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue(lqNameA, nsNameA).ClusterQueue(cqNameA).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("a", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now).Obj(), + utiltesting.MakeWorkload("b", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), + utiltesting.MakeWorkload("c", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second * 2)).Obj(), + }, + wantResponse: map[req]*resp{ + // Only limit is set + { + nsName: nsNameA, + queueName: lqNameA, + queryParams: &visibility.PendingWorkloadOptions{ + Limit: 2, + }, + }: { + wantPendingWorkloads: []visibility.PendingWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "a", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: highPrio, + PositionInClusterQueue: 0, + PositionInLocalQueue: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: highPrio, + PositionInClusterQueue: 1, + PositionInLocalQueue: 1, + }}, + }, + // Only offset is set + { + nsName: nsNameA, + queueName: lqNameA, + queryParams: &visibility.PendingWorkloadOptions{ + Offset: 1, + Limit: constants.DefaultPendingWorkloadsLimit, + }, + }: { + wantPendingWorkloads: []visibility.PendingWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: highPrio, + PositionInClusterQueue: 1, + PositionInLocalQueue: 1, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "c", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: highPrio, + PositionInClusterQueue: 2, + PositionInLocalQueue: 2, + }}, + }, + // Both limit and offset are set + { + nsName: nsNameA, + queueName: lqNameA, + queryParams: &visibility.PendingWorkloadOptions{ + Offset: 1, + Limit: 1, + }, + }: { + wantPendingWorkloads: []visibility.PendingWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsNameA, + }, + LocalQueueName: lqNameA, + Priority: highPrio, + PositionInClusterQueue: 1, + PositionInLocalQueue: 1, + }}, + }, + }, + }, + "invalid queue name": { + wantResponse: map[req]*resp{ + { + queueName: "invalid", + queryParams: defaultQueryParams, + }: { + wantErr: errQueueDoesNotExist, + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + manager := queue.NewManager(utiltesting.NewFakeClient(), nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go manager.CleanUpOnContext(ctx) + pendingWorkloadsInLqRest := NewPendingWorkloadsInLqREST(manager) + for _, cq := range tc.clusterQueues { + if err := manager.AddClusterQueue(ctx, cq); err != nil { + t.Fatalf("Adding cluster queue %s: %v", cq.Name, err) + } + } + for _, q := range tc.queues { + if err := manager.AddLocalQueue(ctx, q); err != nil { + t.Fatalf("Adding queue %q: %v", q.Name, err) + } + } + for _, w := range tc.workloads { + manager.AddOrUpdateWorkload(w) + } + + for req, resp := range tc.wantResponse { + ctx := request.WithNamespace(ctx, req.nsName) + info, err := pendingWorkloadsInLqRest.Get(ctx, req.queueName, req.queryParams) + if err != nil { + if diff := cmp.Diff(resp.wantErr, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Error differs: (-want,+got):\n%s", diff) + } + } else { + pendingWorkloadsInfo := info.(*visibility.PendingWorkloadsSummary) + if diff := cmp.Diff(resp.wantPendingWorkloads, pendingWorkloadsInfo.Items, cmpopts.IgnoreTypes(metav1.TypeMeta{})); diff != "" { + t.Errorf("Pending workloads differ: (-want,+got):\n%s", diff) + } + } + } + }) + } +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index ac1ef42029..f26ed5bd01 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -48,8 +48,9 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { localQueueA *kueue.LocalQueue localQueueB *kueue.LocalQueue clusterQueue *kueue.ClusterQueue - ns *corev1.Namespace - sampleJob1 *batchv1.Job + nsA *corev1.Namespace + nsB *corev1.Namespace + blockingJob *batchv1.Job sampleJob2 *batchv1.Job highPriorityClass *schedulingv1.PriorityClass midPriorityClass *schedulingv1.PriorityClass @@ -57,15 +58,22 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { ) ginkgo.BeforeEach(func() { - ns = &corev1.Namespace{ + nsA = &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ GenerateName: "e2e-", }, } - gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, nsA)).To(gomega.Succeed()) + nsB = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "e2e-", + }, + } + gomega.Expect(k8sClient.Create(ctx, nsB)).To(gomega.Succeed()) }) ginkgo.AfterEach(func() { - gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, nsA)).To(gomega.Succeed()) + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, nsB)).To(gomega.Succeed()) }) ginkgo.When("There are pending workloads due to capacity maxed by the admitted job", func() { @@ -82,10 +90,10 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { Obj() gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed()) - localQueueA = testing.MakeLocalQueue("a", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + localQueueA = testing.MakeLocalQueue("a", nsA.Name).ClusterQueue(clusterQueue.Name).Obj() gomega.Expect(k8sClient.Create(ctx, localQueueA)).Should(gomega.Succeed()) - localQueueB = testing.MakeLocalQueue("b", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + localQueueB = testing.MakeLocalQueue("b", nsA.Name).ClusterQueue(clusterQueue.Name).Obj() gomega.Expect(k8sClient.Create(ctx, localQueueB)).Should(gomega.Succeed()) highPriorityClass = testing.MakePriorityClass("high").PriorityValue(100).Obj() @@ -98,7 +106,7 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { gomega.Expect(k8sClient.Create(ctx, lowPriorityClass)) ginkgo.By("Schedule a job that when admitted workload blocks the queue", func() { - sampleJob1 = testingjob.MakeJob("test-job-1", ns.Name). + blockingJob = testingjob.MakeJob("test-job-1", nsA.Name). Queue(localQueueA.Name). Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"60s", "-termination-grace-period", "0s"}). Request(corev1.ResourceCPU, "1"). @@ -106,7 +114,7 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { BackoffLimit(0). PriorityClass(highPriorityClass.Name). Obj() - gomega.Expect(k8sClient.Create(ctx, sampleJob1)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, blockingJob)).Should(gomega.Succeed()) }) }) ginkgo.AfterEach(func() { @@ -115,22 +123,139 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { gomega.Expect(k8sClient.Delete(ctx, highPriorityClass)).To(gomega.Succeed()) gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueueB)).Should(gomega.Succeed()) gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueueA)).Should(gomega.Succeed()) - gomega.Expect(util.DeleteAllJobsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed()) + gomega.Expect(util.DeleteAllJobsInNamespace(ctx, k8sClient, nsA)).Should(gomega.Succeed()) util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true) util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, defaultRF, true) }) - ginkgo.It("Should allow fetching information about pending workloads", func() { + ginkgo.It("Should allow fetching information about pending workloads in ClusterQueue", func() { ginkgo.By("Verify there are zero pending workloads", func() { + info, err := visibilityClient.LocalQueues(nsA.Name).GetPendingWorkloadsSummary(ctx, localQueueA.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(len(info.Items)).Should(gomega.Equal(0)) + }) + + ginkgo.By("Schedule a job which is pending due to lower priority", func() { + sampleJob2 = testingjob.MakeJob("test-job-2", nsA.Name). + Queue(localQueueA.Name). + Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"1ms"}). + Request(corev1.ResourceCPU, "1"). + PriorityClass(lowPriorityClass.Name). + Obj() + gomega.Expect(k8sClient.Create(ctx, sampleJob2)).Should(gomega.Succeed()) + }) + + ginkgo.By("Verify there is one pending workload", func() { + gomega.Eventually(func() int { + info, err := visibilityClient.ClusterQueues().GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return len(info.Items) + }, util.Timeout, util.Interval).Should(gomega.Equal(1)) + }) + + ginkgo.By("Await for pods to be running", func() { + gomega.Eventually(func() int { + createdJob := &batchv1.Job{} + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(blockingJob), createdJob)).Should(gomega.Succeed()) + return int(*createdJob.Status.Ready) + }, util.Timeout, util.Interval).Should(gomega.Equal(1)) + }) + + ginkgo.By("Terminate execution of the first workload to release the quota", func() { + gomega.Expect(util.DeleteAllPodsInNamespace(ctx, k8sClient, nsA)).Should(gomega.Succeed()) + }) + + ginkgo.By("Verify there are zero pending workloads, after the second workload is admitted", func() { gomega.Eventually(func() int { info, err := visibilityClient.ClusterQueues().GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) return len(info.Items) }, util.Timeout, util.Interval).Should(gomega.Equal(0)) }) + }) + + ginkgo.It("Should allow fetching information about position of pending workloads in ClusterQueue", func() { + ginkgo.By("Schedule three different jobs with different priorities and two different LocalQueues", func() { + jobCases := []struct { + JobName string + JobPrioClassName string + LocalQueueName string + }{ + { + JobName: "lq-a-high-prio", + JobPrioClassName: highPriorityClass.Name, + LocalQueueName: localQueueA.Name, + }, + { + JobName: "lq-b-mid-prio", + JobPrioClassName: midPriorityClass.Name, + LocalQueueName: localQueueB.Name, + }, + { + JobName: "lq-b-low-prio", + JobPrioClassName: lowPriorityClass.Name, + LocalQueueName: localQueueB.Name, + }, + } + for _, jobCase := range jobCases { + job := testingjob.MakeJob(jobCase.JobName, nsA.Name). + Queue(jobCase.LocalQueueName). + Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"1ms"}). + Request(corev1.ResourceCPU, "1"). + PriorityClass(jobCase.JobPrioClassName). + Obj() + gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) + } + }) + + ginkgo.By("Verify their positions and priorities", func() { + wantPendingWorkloads := []visibility.PendingWorkload{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: nsA.Name, + }, + Priority: highPriorityClass.Value, + PositionInLocalQueue: 0, + PositionInClusterQueue: 0, + LocalQueueName: localQueueA.Name, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: nsA.Name, + }, + Priority: midPriorityClass.Value, + PositionInLocalQueue: 0, + PositionInClusterQueue: 1, + LocalQueueName: localQueueB.Name, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: nsA.Name, + }, + Priority: lowPriorityClass.Value, + PositionInLocalQueue: 1, + PositionInClusterQueue: 2, + LocalQueueName: localQueueB.Name, + }, + } + gomega.Eventually(func() []visibility.PendingWorkload { + info, err := visibilityClient.ClusterQueues().GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return info.Items + // We do not check Name as it's generated for workloads + }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(wantPendingWorkloads, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name"))) + }) + }) + + ginkgo.It("Should allow fetching information about pending workloads in LocalQueue", func() { + ginkgo.By("Verify there are zero pending workloads", func() { + info, err := visibilityClient.LocalQueues(nsA.Name).GetPendingWorkloadsSummary(ctx, localQueueA.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(len(info.Items)).Should(gomega.Equal(0)) + }) ginkgo.By("Schedule a job which is pending due to lower priority", func() { - sampleJob2 = testingjob.MakeJob("test-job-2", ns.Name). + sampleJob2 = testingjob.MakeJob("test-job-2", nsA.Name). Queue(localQueueA.Name). Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"1ms"}). Request(corev1.ResourceCPU, "1"). @@ -141,7 +266,7 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { ginkgo.By("Verify there is one pending workload", func() { gomega.Eventually(func() int { - info, err := visibilityClient.ClusterQueues().GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + info, err := visibilityClient.LocalQueues(nsA.Name).GetPendingWorkloadsSummary(ctx, localQueueA.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) return len(info.Items) }, util.Timeout, util.Interval).Should(gomega.Equal(1)) @@ -150,25 +275,25 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { ginkgo.By("Await for pods to be running", func() { gomega.Eventually(func() int { createdJob := &batchv1.Job{} - gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(sampleJob1), createdJob)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(blockingJob), createdJob)).Should(gomega.Succeed()) return int(*createdJob.Status.Ready) }, util.Timeout, util.Interval).Should(gomega.Equal(1)) }) ginkgo.By("Terminate execution of the first workload to release the quota", func() { - gomega.Expect(util.DeleteAllPodsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed()) + gomega.Expect(util.DeleteAllPodsInNamespace(ctx, k8sClient, nsA)).Should(gomega.Succeed()) }) ginkgo.By("Verify there are zero pending workloads, after the second workload is admitted", func() { gomega.Eventually(func() int { - info, err := visibilityClient.ClusterQueues().GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + info, err := visibilityClient.LocalQueues(nsA.Name).GetPendingWorkloadsSummary(ctx, localQueueA.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) return len(info.Items) }, util.Timeout, util.Interval).Should(gomega.Equal(0)) }) }) - ginkgo.It("Should allow fetching information about position of pending workloads", func() { + ginkgo.It("Should allow fetching information about position of pending workloads from different LocalQueues", func() { ginkgo.By("Schedule three different jobs with different priorities and two different LocalQueues", func() { jobCases := []struct { JobName string @@ -192,7 +317,7 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { }, } for _, jobCase := range jobCases { - job := testingjob.MakeJob(jobCase.JobName, ns.Name). + job := testingjob.MakeJob(jobCase.JobName, nsA.Name). Queue(jobCase.LocalQueueName). Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"1ms"}). Request(corev1.ResourceCPU, "1"). @@ -202,20 +327,31 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { } }) - ginkgo.By("Verify their positions and priorities", func() { + ginkgo.By("Verify their positions and priorities in LocalQueueA", func() { wantPendingWorkloads := []visibility.PendingWorkload{ { ObjectMeta: metav1.ObjectMeta{ - Namespace: ns.Name, + Namespace: nsA.Name, }, Priority: highPriorityClass.Value, PositionInLocalQueue: 0, PositionInClusterQueue: 0, LocalQueueName: localQueueA.Name, }, + } + gomega.Eventually(func() []visibility.PendingWorkload { + info, err := visibilityClient.LocalQueues(nsA.Name).GetPendingWorkloadsSummary(ctx, localQueueA.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return info.Items + // We do not check Name as it's generated for workloads + }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(wantPendingWorkloads, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name"))) + }) + + ginkgo.By("Verify their positions and priorities in LocalQueueB", func() { + wantPendingWorkloads := []visibility.PendingWorkload{ { ObjectMeta: metav1.ObjectMeta{ - Namespace: ns.Name, + Namespace: nsA.Name, }, Priority: midPriorityClass.Value, PositionInLocalQueue: 0, @@ -224,7 +360,7 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { }, { ObjectMeta: metav1.ObjectMeta{ - Namespace: ns.Name, + Namespace: nsA.Name, }, Priority: lowPriorityClass.Value, PositionInLocalQueue: 1, @@ -233,7 +369,100 @@ var _ = ginkgo.Describe("Kueue visibility server", func() { }, } gomega.Eventually(func() []visibility.PendingWorkload { - info, err := visibilityClient.ClusterQueues().GetPendingWorkloadsSummary(ctx, clusterQueue.Name, metav1.GetOptions{}) + info, err := visibilityClient.LocalQueues(nsA.Name).GetPendingWorkloadsSummary(ctx, localQueueB.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return info.Items + // We do not check Name as it's generated for workloads + }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(wantPendingWorkloads, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name"))) + }) + }) + ginkgo.It("Should allow fetching information about position of pending workloads from different LocalQueues from different Namespaces", func() { + + ginkgo.By("Create a LocalQueue in a different Namespace", func() { + localQueueB = testing.MakeLocalQueue("b", nsB.Name).ClusterQueue(clusterQueue.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, localQueueB)).Should(gomega.Succeed()) + }) + + ginkgo.By("Schedule three different jobs with different priorities and different LocalQueues in different Namespaces", func() { + jobCases := []struct { + JobName string + JobPrioClassName string + LocalQueueName string + nsName string + }{ + { + JobName: "lq-a-high-prio", + JobPrioClassName: highPriorityClass.Name, + LocalQueueName: localQueueA.Name, + nsName: nsA.Name, + }, + { + JobName: "lq-b-mid-prio", + JobPrioClassName: midPriorityClass.Name, + LocalQueueName: localQueueB.Name, + nsName: nsB.Name, + }, + { + JobName: "lq-b-low-prio", + JobPrioClassName: lowPriorityClass.Name, + LocalQueueName: localQueueB.Name, + nsName: nsB.Name, + }, + } + for _, jobCase := range jobCases { + job := testingjob.MakeJob(jobCase.JobName, jobCase.nsName). + Queue(jobCase.LocalQueueName). + Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"1ms"}). + Request(corev1.ResourceCPU, "1"). + PriorityClass(jobCase.JobPrioClassName). + Obj() + gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) + } + }) + + ginkgo.By("Verify their positions and priorities in LocalQueueA", func() { + wantPendingWorkloads := []visibility.PendingWorkload{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: nsA.Name, + }, + Priority: highPriorityClass.Value, + PositionInLocalQueue: 0, + PositionInClusterQueue: 0, + LocalQueueName: localQueueA.Name, + }, + } + gomega.Eventually(func() []visibility.PendingWorkload { + info, err := visibilityClient.LocalQueues(nsA.Name).GetPendingWorkloadsSummary(ctx, localQueueA.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return info.Items + // We do not check Name as it's generated for workloads + }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(wantPendingWorkloads, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name"))) + }) + + ginkgo.By("Verify their positions and priorities in LocalQueueB", func() { + wantPendingWorkloads := []visibility.PendingWorkload{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: nsB.Name, + }, + Priority: midPriorityClass.Value, + PositionInLocalQueue: 0, + PositionInClusterQueue: 1, + LocalQueueName: localQueueB.Name, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: nsB.Name, + }, + Priority: lowPriorityClass.Value, + PositionInLocalQueue: 1, + PositionInClusterQueue: 2, + LocalQueueName: localQueueB.Name, + }, + } + gomega.Eventually(func() []visibility.PendingWorkload { + info, err := visibilityClient.LocalQueues(nsA.Name).GetPendingWorkloadsSummary(ctx, localQueueB.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) return info.Items // We do not check Name as it's generated for workloads