Skip to content

Commit

Permalink
[Feature][GCS FT] Best-effort redis cleanup job for 5 minutes (#1766)
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian authored Dec 24, 2023
1 parent 3066e42 commit cfa1203
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
36 changes: 18 additions & 18 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,29 +267,27 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
redisCleanupJob := redisCleanupJobs.Items[0]
r.Log.Info("Redis cleanup Job status", "Job name", redisCleanupJob.Name,
"Active", redisCleanupJob.Status.Active, "Succeeded", redisCleanupJob.Status.Succeeded, "Failed", redisCleanupJob.Status.Failed)
if redisCleanupJob.Status.Succeeded > 0 {
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has been completed. "+
"The storage namespace %s in Redis has been fully deleted.",
redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey]))
// Remove the finalizer from the RayCluster CR.
if condition, finished := utils.IsJobFinished(&redisCleanupJob); finished {
controllerutil.RemoveFinalizer(instance, utils.GCSFaultToleranceRedisCleanupFinalizer)
if err := r.Update(ctx, instance); err != nil {
r.Log.Error(err, "Failed to remove finalizer for RayCluster")
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
r.Log.Info(fmt.Sprintf(
"The Redis cleanup finalizer has been successfully removed from the RayCluster CR %s. "+
"We do not need to requeue the RayCluster CR anymore.", instance.Name))
switch condition {
case batchv1.JobComplete:
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has been completed. "+
"The storage namespace %s in Redis has been fully deleted.",
redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey]))
case batchv1.JobFailed:
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has failed, requeue the RayCluster CR after 5 minute. "+
"You should manually delete the storage namespace %s in Redis and remove the RayCluster's finalizer. "+
"Please check https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html for more details.",
redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey]))
}
return ctrl.Result{}, nil
}
if redisCleanupJob.Status.Failed > 0 {
r.Log.Info(fmt.Sprintf(
"The Redis cleanup Job %s has failed, requeue the RayCluster CR after 5 minute. "+
"You should manually delete the storage namespace %s in Redis and remove the RayCluster's finalizer. "+
"Please check https://docs.ray.io/en/master/cluster/kubernetes/user-guides/kuberay-gcs-ft.html for more details.",
redisCleanupJob.Name, redisCleanupJob.Annotations[utils.RayExternalStorageNSAnnotationKey]))
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
} else { // the redisCleanupJob is still running
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, nil
}
} else {
redisCleanupJob := r.buildRedisCleanupJob(*instance)
Expand Down Expand Up @@ -1169,6 +1167,8 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(instance rayv1.RayCluster) b
ObjectMeta: pod.ObjectMeta,
Spec: pod.Spec,
},
// make this job be best-effort only for 5 minutes.
ActiveDeadlineSeconds: pointer.Int64(300),
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2231,10 +2231,12 @@ func Test_RedisCleanup(t *testing.T) {
assert.Nil(t, err, "Fail to get RayCluster list")
assert.Equal(t, 1, len(rayClusterList.Items))
assert.True(t, controllerutil.ContainsFinalizer(&rayClusterList.Items[0], utils.GCSFaultToleranceRedisCleanupFinalizer))
assert.Equal(t, int64(300), *jobList.Items[0].Spec.ActiveDeadlineSeconds)

// Simulate the Job succeeded.
job := jobList.Items[0]
job.Status.Succeeded = 1
job.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}}
err = fakeClient.Status().Update(ctx, &job)
assert.Nil(t, err, "Fail to update Job status")

Expand Down
13 changes: 13 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"
"unicode"

batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/util/json"

"k8s.io/apimachinery/pkg/util/rand"
Expand Down Expand Up @@ -510,3 +511,15 @@ func FindContainerPort(container *corev1.Container, portName string, defaultPort
}
return defaultPort
}

// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
// src: https://github.com/kubernetes/kubernetes/blob/a8a1abc25cad87333840cd7d54be2efaf31a3177/pkg/controller/job/utils.go#L26
func IsJobFinished(j *batchv1.Job) (batchv1.JobConditionType, bool) {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
return c.Type, true
}
}
return "", false
}

0 comments on commit cfa1203

Please sign in to comment.