Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][GCS FT] Best-effort redis cleanup job #1766

Merged
merged 1 commit into from
Dec 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,29 +267,27 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
redisCleanupJob := redisCleanupJobs.Items[0]
r.Log.Info("Redis cleanup Job status", "Job name", redisCleanupJob.Name,
"Active", redisCleanupJob.Status.Active, "Succeeded", redisCleanupJob.Status.Succeeded, "Failed", redisCleanupJob.Status.Failed)
if redisCleanupJob.Status.Succeeded > 0 {
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has been completed. "+
"The storage namespace %s in Redis has been fully deleted.",
redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey]))
// Remove the finalizer from the RayCluster CR.
if condition, finished := utils.IsJobFinished(&redisCleanupJob); finished {
controllerutil.RemoveFinalizer(instance, utils.GCSFaultToleranceRedisCleanupFinalizer)
if err := r.Update(ctx, instance); err != nil {
r.Log.Error(err, "Failed to remove finalizer for RayCluster")
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
r.Log.Info(fmt.Sprintf(
"The Redis cleanup finalizer has been successfully removed from the RayCluster CR %s. "+
"We do not need to requeue the RayCluster CR anymore.", instance.Name))
switch condition {
case batchv1.JobComplete:
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has been completed. "+
"The storage namespace %s in Redis has been fully deleted.",
redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey]))
case batchv1.JobFailed:
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has failed, requeue the RayCluster CR after 5 minute. "+
"You should manually delete the storage namespace %s in Redis and remove the RayCluster's finalizer. "+
"Please check https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html for more details.",
redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey]))
}
return ctrl.Result{}, nil
}
if redisCleanupJob.Status.Failed > 0 {
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has failed, requeue the RayCluster CR after 5 minute. "+
"You should manually delete the storage namespace %s in Redis and remove the RayCluster's finalizer. "+
"Please check https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html for more details.",
redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey]))
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
} else { // the redisCleanupJob is still running
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, nil
}
} else {
redisCleanupJob := r.buildRedisCleanupJob(*instance)
Expand Down Expand Up @@ -1169,6 +1167,8 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) b
ObjectMeta: pod.ObjectMeta,
Spec: pod.Spec,
},
// make this job be best-effort only for 5 minutes.
ActiveDeadlineSeconds: pointer.Int64(300),
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2230,10 +2230,12 @@ func Test_RedisCleanup(t *testing.T) {
assert.Nil(t, err, "Fail to get RayCluster list")
assert.Equal(t, 1, len(rayClusterList.Items))
assert.True(t, controllerutil.ContainsFinalizer(&rayClusterList.Items[0], utils.GCSFaultToleranceRedisCleanupFinalizer))
assert.Equal(t, int64(300), *jobList.Items[0].Spec.ActiveDeadlineSeconds)

// Simulate the Job succeeded.
job := jobList.Items[0]
job.Status.Succeeded = 1
job.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}}
err = fakeClient.Status().Update(ctx, &job)
assert.Nil(t, err, "Fail to update Job status")

Expand Down
13 changes: 13 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"
"unicode"

batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/util/json"

"k8s.io/apimachinery/pkg/util/rand"
Expand Down Expand Up @@ -508,3 +509,15 @@ func FindContainerPort(container *corev1.Container, portName string, defaultPort
}
return defaultPort
}

// IsJobFinished checks whether the given Job has finished execution.
rueian marked this conversation as resolved.
Show resolved Hide resolved
// It does not discriminate between successful and failed terminations.
// src: https://github.com/kubernetes/kubernetes/blob/a8a1abc25cad87333840cd7d54be2efaf31a3177/pkg/controller/job/utils.go#L26
func IsJobFinished(j *batchv1.Job) (batchv1.JobConditionType, bool) {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
return c.Type, true
}
}
return "", false
}
Loading