Skip to content

Commit

Permalink
Update wl/job handling for the case, when the job is finished (kubern…
Browse files Browse the repository at this point in the history
…etes-sigs#1383)

* Update wl/job handling for the case, when the job is finished

* Workload is not being recreated, when the job is
  finished.

* If related workload is not found, the job will be stopped
  only if it's not finished.

* Add job finalization if wl has a finished condition
  • Loading branch information
achernevskii authored and kannon92 committed Nov 19, 2024
1 parent a6b5d9d commit fa140c4
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 12 deletions.
27 changes: 16 additions & 11 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}

if wl != nil && apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
// Finalize the job if it's finished
if _, finished := job.Finished(); finished {
if err := r.finalizeJob(ctx, job); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, r.removeFinalizer(ctx, wl)
}

Expand All @@ -258,17 +265,19 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}

// 2. handle job is finished.
if condition, finished := job.Finished(); finished && wl != nil {
if condition, finished := job.Finished(); finished {
if wl != nil && !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
}

// Execute job finalization logic
if err := r.finalizeJob(ctx, job); err != nil {
return ctrl.Result{}, err
}

err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil {
log.Error(err, "Updating workload status")
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -460,11 +469,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
w = toDelete[0]
}

if _, finished := job.Finished(); finished {
if err := r.finalizeJob(ctx, job); err != nil {
return nil, fmt.Errorf("finalizing job with no matching workload: %w", err)
}
} else {
if _, finished := job.Finished(); !finished {
if err := r.stopJob(ctx, job, w, StopReasonNoMatchingWorkload, "No matching Workload"); err != nil {
return nil, fmt.Errorf("stopping job with no matching workload: %w", err)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,16 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"the workload shouldn't be recreated for the completed job": {
job: *baseJobWrapper.Clone().
Condition(batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}).
Obj(),
workloads: []kueue.Workload{},
wantJob: *baseJobWrapper.Clone().
Condition(batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}).
Obj(),
wantWorkloads: []kueue.Workload{},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,18 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"workload shouldn't be recreated for the completed mx job": {
job: testingmxjob.MakeMXJob("mxjob", "ns").
Queue("foo").
StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: v1.ConditionTrue}).
Obj(),
workloads: []kueue.Workload{},
wantJob: testingmxjob.MakeMXJob("mxjob", "ns").
Queue("foo").
StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: v1.ConditionTrue}).
Obj(),
wantWorkloads: []kueue.Workload{},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
97 changes: 96 additions & 1 deletion pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,101 @@ func TestReconciler(t *testing.T) {
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"if pod group is finished and wl is deleted, new workload shouldn't be created": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
workloads: []kueue.Workload{},
wantWorkloads: []kueue.Workload{},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"if pod in group is scheduling gated and wl is deleted, workload should be recreated": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
workloads: []kueue.Workload{},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
*utiltesting.MakePodSet("b990493b", 2).
SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Queue("user-queue").
Priority(0).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
}

for name, tc := range testCases {
Expand Down Expand Up @@ -1649,7 +1744,7 @@ func TestReconciler_ErrorFinalizingPod(t *testing.T) {
}

// Workload should be finished after the second reconcile
wantWl := *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
wantWl := *utiltesting.MakeWorkload("unit-test", "ns").
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(true).
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/mxjob/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,9 @@ func (j *MXJobWrapper) Active(rType kftraining.ReplicaType, c int32) *MXJobWrapp
}
return j
}

// StatusConditions updates status conditions of the MXJob.
func (j *MXJobWrapper) StatusConditions(conditions ...kftraining.JobCondition) *MXJobWrapper {
j.Status.Conditions = conditions
return j
}

0 comments on commit fa140c4

Please sign in to comment.