diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index a8fe122699..b544f53187 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -267,29 +267,27 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request redisCleanupJob := redisCleanupJobs.Items[0] r.Log.Info("Redis cleanup Job status", "Job name", redisCleanupJob.Name, "Active", redisCleanupJob.Status.Active, "Succeeded", redisCleanupJob.Status.Succeeded, "Failed", redisCleanupJob.Status.Failed) - if redisCleanupJob.Status.Succeeded > 0 { - r.Log.Info(fmt.Sprintf( - "The Redis cleanup Job %s has been completed. "+ - "The storage namespace %s in Redis has been fully deleted.", - redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey])) - // Remove the finalizer from the RayCluster CR. + if condition, finished := utils.IsJobFinished(&redisCleanupJob); finished { controllerutil.RemoveFinalizer(instance, utils.GCSFaultToleranceRedisCleanupFinalizer) if err := r.Update(ctx, instance); err != nil { - r.Log.Error(err, "Failed to remove finalizer for RayCluster") return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err } - r.Log.Info(fmt.Sprintf( - "The Redis cleanup finalizer has been successfully removed from the RayCluster CR %s. "+ - "We do not need to requeue the RayCluster CR anymore.", instance.Name)) + switch condition { + case batchv1.JobComplete: + r.Log.Info(fmt.Sprintf( + "The Redis cleanup Job %s has been completed. "+ + "The storage namespace %s in Redis has been fully deleted.", + redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey])) + case batchv1.JobFailed: + r.Log.Info(fmt.Sprintf( + "The Redis cleanup Job %s has failed, requeue the RayCluster CR after 5 minute. "+ + "You should manually delete the storage namespace %s in Redis and remove the RayCluster's finalizer. "+ + "Please check https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html for more details.", + redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey])) + } return ctrl.Result{}, nil - } - if redisCleanupJob.Status.Failed > 0 { - r.Log.Info(fmt.Sprintf( - "The Redis cleanup Job %s has failed, requeue the RayCluster CR after 5 minute. "+ - "You should manually delete the storage namespace %s in Redis and remove the RayCluster's finalizer. "+ - "Please check https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html for more details.", - redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey])) - return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil + } else { // the redisCleanupJob is still running + return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, nil } } else { redisCleanupJob := r.buildRedisCleanupJob(*instance) @@ -1169,6 +1167,8 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) b ObjectMeta: pod.ObjectMeta, Spec: pod.Spec, }, + // make this job be best-effort only for 5 minutes. + ActiveDeadlineSeconds: pointer.Int64(300), }, } diff --git a/ray-operator/controllers/ray/raycluster_controller_fake_test.go b/ray-operator/controllers/ray/raycluster_controller_fake_test.go index 511305f198..0e40fb9b0d 100644 --- a/ray-operator/controllers/ray/raycluster_controller_fake_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_fake_test.go @@ -2231,10 +2231,12 @@ func Test_RedisCleanup(t *testing.T) { assert.Nil(t, err, "Fail to get RayCluster list") assert.Equal(t, 1, len(rayClusterList.Items)) assert.True(t, controllerutil.ContainsFinalizer(&rayClusterList.Items[0], utils.GCSFaultToleranceRedisCleanupFinalizer)) + assert.Equal(t, int64(300), *jobList.Items[0].Spec.ActiveDeadlineSeconds) // Simulate the Job succeeded. job := jobList.Items[0] job.Status.Succeeded = 1 + job.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}} err = fakeClient.Status().Update(ctx, &job) assert.Nil(t, err, "Fail to update Job status") diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 3927197493..1f765c92ad 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -12,6 +12,7 @@ import ( "time" "unicode" + batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/rand" @@ -510,3 +511,15 @@ func FindContainerPort(container *corev1.Container, portName string, defaultPort } return defaultPort } + +// IsJobFinished checks whether the given Job has finished execution. +// It does not discriminate between successful and failed terminations. +// src: https://github.com/kubernetes/kubernetes/blob/a8a1abc25cad87333840cd7d54be2efaf31a3177/pkg/controller/job/utils.go#L26 +func IsJobFinished(j *batchv1.Job) (batchv1.JobConditionType, bool) { + for _, c := range j.Status.Conditions { + if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue { + return c.Type, true + } + } + return "", false +}