Skip to content

Commit

Permalink
Passing parameters to ProvisioningRequest from annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
PBundyra committed Mar 20, 2024
1 parent ba46285 commit 9072875
Show file tree
Hide file tree
Showing 25 changed files with 605 additions and 41 deletions.
25 changes: 20 additions & 5 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"maps"
"regexp"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -57,6 +58,8 @@ const (
// 253 is the maximal length for a CRD name. We need to subtract one for '-', and the hash length.
objNameMaxPrefixLength = 252 - objNameHashLength
podTemplatesPrefix = "ppt"

provReqAnnotationPrefix = "provreq.kueue.x-k8s.io/"
)

var (
Expand Down Expand Up @@ -162,6 +165,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
if requeAfter != nil {
return reconcile.Result{RequeueAfter: *requeAfter}, nil
}

return reconcile.Result{}, nil
}

Expand Down Expand Up @@ -267,6 +271,7 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
Parameters: parametersKueueToProvisioning(prc.Spec.Parameters),
},
}
passProvReqParams(wl, req)

expectedPodSets := requiredPodSets(wl.Spec.PodSets, prc.Spec.ManagedResources)
psaMap := slices.ToRefMap(wl.Status.Admission.PodSetAssignments, func(p *kueue.PodSetAssignment) string { return p.Name })
Expand Down Expand Up @@ -438,17 +443,27 @@ func requestHasParameters(req *autoscaling.ProvisioningRequest, prc *kueue.Provi
if req.Spec.ProvisioningClassName != prc.Spec.ProvisioningClassName {
return false
}
if len(req.Spec.Parameters) != len(prc.Spec.Parameters) {
return false
}
for k, vReq := range req.Spec.Parameters {
if vCfg, found := prc.Spec.Parameters[k]; !found || vReq != autoscaling.Parameter(vCfg) {
for k, vCfg := range prc.Spec.Parameters {
if vReq, found := req.Spec.Parameters[k]; !found || vReq != autoscaling.Parameter(vCfg) {
return false
}
}
return true
}

// passProvReqParams extracts from Workload's annotations ones that should be passed to ProvisioningRequest
func passProvReqParams(wl *kueue.Workload, req *autoscaling.ProvisioningRequest) {
if req.Spec.Parameters == nil {
req.Spec.Parameters = make(map[string]autoscaling.Parameter, 0)
}
for annotation, val := range wl.Annotations {
if strings.HasPrefix(annotation, provReqAnnotationPrefix) {
paramName := strings.Split(annotation, "/")[1]
req.Spec.Parameters[paramName] = autoscaling.Parameter(val)
}
}
}

func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, checks []string, activeOrLastPRForChecks map[string]*autoscaling.ProvisioningRequest) error {
log := ctrl.LoggerFrom(ctx)
checksMap := slices.ToRefMap(wl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
Expand Down
45 changes: 42 additions & 3 deletions pkg/controller/admissionchecks/provisioning/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func TestReconcile(t *testing.T) {
}).
Obj()

basePodSet := []autoscaling.PodSet{{PodTemplateRef: autoscaling.Reference{Name: "ppt-wl-check1-1-main"}, Count: 1}}

baseWorkloadWithCheck1Ready := baseWorkload.DeepCopy()
workload.SetAdmissionCheckState(&baseWorkloadWithCheck1Ready.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: "check1",
Expand Down Expand Up @@ -311,6 +313,46 @@ func TestReconcile(t *testing.T) {
},
},
},
"workload with provreq annotation": {
workload: utiltesting.MakeWorkload("wl", TestNamespace).
SetAnnotations(map[string]string{
"provreq.kueue.x-k8s.io/ValidUntilSeconds": "0",
"invalid-provreq-prefix/MyAnnotation": "MyValue"}).
AdmissionChecks(kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStatePending}).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
configs: []kueue.ProvisioningRequestConfig{{ObjectMeta: metav1.ObjectMeta{Name: "config1"}}},
wantRequests: map[string]*autoscaling.ProvisioningRequest{
GetProvisioningRequestName("wl", baseCheck.Name, 1): {
ObjectMeta: metav1.ObjectMeta{
Namespace: TestNamespace,
Name: GetProvisioningRequestName("wl", baseCheck.Name, 1),
OwnerReferences: []metav1.OwnerReference{
{
Name: "wl",
},
},
},
Spec: autoscaling.ProvisioningRequestSpec{
Parameters: map[string]autoscaling.Parameter{
"ValidUntilSeconds": "0",
},
PodSets: basePodSet,
},
},
},
wantEvents: []utiltesting.EventRecord{
{
Key: client.ObjectKeyFromObject(baseWorkload),
EventType: corev1.EventTypeNormal,
Reason: "ProvisioningRequestCreated",
Message: `Created ProvisioningRequest: "wl-check1-1"`,
},
},
},
"remove unnecessary requests": {
workload: baseWorkload.DeepCopy(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
Expand Down Expand Up @@ -646,7 +688,6 @@ func TestReconcile(t *testing.T) {
gotWl := &kueue.Workload{}
if err := k8sclient.Get(ctx, types.NamespacedName{Namespace: TestNamespace, Name: name}, gotWl); err != nil {
t.Errorf("unexpected error getting workload %q", name)

}

if diff := cmp.Diff(wantWl, gotWl, wlCmpOptions...); diff != "" {
Expand All @@ -658,7 +699,6 @@ func TestReconcile(t *testing.T) {
gotRequest := &autoscaling.ProvisioningRequest{}
if err := k8sclient.Get(ctx, types.NamespacedName{Namespace: TestNamespace, Name: name}, gotRequest); err != nil {
t.Errorf("unexpected error getting request %q", name)

}

if diff := cmp.Diff(wantRequest, gotRequest, reqCmpOptions...); diff != "" {
Expand All @@ -670,7 +710,6 @@ func TestReconcile(t *testing.T) {
gotTemplate := &corev1.PodTemplate{}
if err := k8sclient.Get(ctx, types.NamespacedName{Namespace: TestNamespace, Name: name}, gotTemplate); err != nil {
t.Errorf("unexpected error getting template %q", name)

}

if diff := cmp.Diff(wantTemplate, gotTemplate, tmplCmpOptions...); diff != "" {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,10 +803,11 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o

wl := &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: GetWorkloadNameForOwnerWithGVK(object.GetName(), object.GetUID(), job.GVK()),
Namespace: object.GetNamespace(),
Labels: map[string]string{},
Finalizers: []string{kueue.ResourceInUseFinalizerName},
Name: GetWorkloadNameForOwnerWithGVK(object.GetName(), object.GetUID(), job.GVK()),
Namespace: object.GetNamespace(),
Labels: map[string]string{},
Finalizers: []string{kueue.ResourceInUseFinalizerName},
Annotations: job.Object().GetAnnotations(),
},
Spec: kueue.WorkloadSpec{
PodSets: podSets,
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/jobframework/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
_ "sigs.k8s.io/kueue/pkg/controller/jobs"

"sigs.k8s.io/kueue/pkg/util/kubeversion"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
Expand All @@ -38,6 +38,10 @@ import (
. "sigs.k8s.io/kueue/pkg/controller/jobframework"
)

const (
testNamespace = "ns"
)

func TestIsParentJobManaged(t *testing.T) {
parentJobName := "test-job-parent"
childJobName := "test-job-child"
Expand Down
34 changes: 32 additions & 2 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,36 @@ func TestReconciler(t *testing.T) {
wantEvents []utiltesting.EventRecord
wantErr error
}{
"when workload is created, it has its owner annotations": {
job: *baseJobWrapper.Clone().
SetAnnotation("test-annotation", "test-val").
UID("test-uid").
Obj(),
wantJob: *baseJobWrapper.Clone().
SetAnnotation("test-annotation", "test-val").
UID("test-uid").
Suspend(true).
Obj(),
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("job", "ns").
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
Queue("foo").
Priority(0).
Labels(map[string]string{controllerconsts.JobUIDLabel: "test-uid"}).
Obj(),
},

wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "job", Namespace: "ns"},
EventType: "Normal",
Reason: "CreatedWorkload",
Message: "Created Workload: ns/" + GetWorkloadNameForJob(baseJobWrapper.Name, types.UID("test-uid")),
},
},
},
"when workload is admitted the PodSetUpdates are propagated to job": {
job: *baseJobWrapper.Clone().
Obj(),
Expand Down Expand Up @@ -1692,17 +1722,17 @@ func TestReconciler(t *testing.T) {
job: *baseJobWrapper.
Clone().
Suspend(false).
Queue("test-queue").
UID("test-uid").
Queue("test-queue").
WorkloadPriorityClass("test-wpc").
Obj(),
priorityClasses: []client.Object{
baseWPCWrapper.Obj(),
},
wantJob: *baseJobWrapper.
Clone().
Queue("test-queue").
UID("test-uid").
Queue("test-queue").
WorkloadPriorityClass("test-wpc").
Obj(),
wantWorkloads: []kueue.Workload{
Expand Down
16 changes: 10 additions & 6 deletions pkg/controller/jobs/jobset/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ var (
}
workloadCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta"),
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "Labels", "ResourceVersion", "OwnerReferences", "Finalizers"),
cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.PodSet{}, "Template"),
Expand All @@ -223,7 +224,7 @@ func TestReconciler(t *testing.T) {
wantWorkloads []kueue.Workload
wantErr error
}{
"workload is created with podsets": {
"workload is created with podsets and annotation": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(true),
},
Expand All @@ -239,8 +240,9 @@ func TestReconciler(t *testing.T) {
Replicas: 2,
Completions: 2,
Parallelism: 2,
},
).Obj(),
}).
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
Obj(),
wantJob: testingjobset.MakeJobSet("jobset", "ns").ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
Expand All @@ -253,10 +255,12 @@ func TestReconciler(t *testing.T) {
Replicas: 2,
Completions: 2,
Parallelism: 2,
},
).Obj(),
}).
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
Obj(),
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("jobset", "ns").
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
PodSets(
*utiltesting.MakePodSet("replicated-job-1", 1).Obj(),
*utiltesting.MakePodSet("replicated-job-2", 4).Obj(),
Expand Down
25 changes: 23 additions & 2 deletions pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ var (
}
workloadCmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"),
cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta"),
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "Labels", "ResourceVersion", "OwnerReferences", "Finalizers"), cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.PodSet{}, "Template"),
}
Expand Down Expand Up @@ -334,6 +334,27 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"workload is created with an annotation": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(true),
},
job: testingmxjob.MakeMXJob("mxjob", "ns").
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
Obj(),
wantJob: testingmxjob.MakeMXJob("mxjob", "ns").
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
Obj(),
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("mxjob", "ns").
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
PodSets(
*utiltesting.MakePodSet("scheduler", 1).Obj(),
*utiltesting.MakePodSet("server", 1).Obj(),
*utiltesting.MakePodSet("worker", 1).Obj(),
).
Obj(),
},
},
"workload isn't created due to manageJobsWithoutQueueName=false": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ var (
}
workloadCmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta"),
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "Labels", "ResourceVersion", "OwnerReferences", "Finalizers"),
cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.PodSet{}, "Template"),
Expand Down Expand Up @@ -266,6 +267,26 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"workload is created with an annotation": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(true),
},
job: testingpaddlejob.MakePaddleJob("paddlejob", "ns").
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
Obj(),
wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns").
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
Obj(),
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("paddlejob", "ns").
SetAnnotations(map[string]string{"test-annotation": "test-val"}).
PodSets(
*utiltesting.MakePodSet("master", 1).Obj(),
*utiltesting.MakePodSet("worker", 1).Obj(),
).
Obj(),
},
},
"workload isn't created due to manageJobsWithoutQueueName=false": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(false),
Expand Down
Loading

0 comments on commit 9072875

Please sign in to comment.