[Feature] Support suspend in RayJob (#926)
Native Kubernetes Jobs have a suspend flag that allows temporarily suspending a Job's execution and resuming it later, or starting Jobs in a suspended state and letting a custom controller, such as Kueue, decide later when to start them.
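
For reference, a minimal sketch of that upstream flag on a plain Kubernetes Job (the Job name, image, and command below are illustrative placeholders):

apiVersion: batch/v1
kind: Job
metadata:
  name: suspended-job            # illustrative name
spec:
  suspend: true                  # the native flag this change mirrors for RayJob
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: main
          image: busybox         # placeholder image
          command: ["echo", "hello"]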

This change adds the same flag to the RayJob spec for consistency. Moreover, frameworks such as Kubeflow are adding it as well, so it is becoming standard functionality. An example implementation for MPIJob: kubeflow/mpi-operator#511

Implementation details
If a RayJob is created with spec.suspend == true, then the RayCluster instance (with its corresponding Kubernetes resources) is not created and the Ray job is not submitted to the cluster. The JobDeploymentStatus is set to Suspended and the corresponding event is emitted. The RayJob remains in this state until someone unsuspends the job.
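
A minimal sketch of that case, following the sample manifest updated in this diff (the rayClusterSpec body is omitted for brevity; a real manifest would include it or a clusterSelector):

apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
  name: rayjob-sample
spec:
  suspend: true   # created suspended: no RayCluster is created and nothing is submitted
  entrypoint: python /home/ray/samples/sample_code.py
  # rayClusterSpec with head and worker group specs omitted for brevity

Setting suspend back to false (for example via kubectl edit or kubectl patch) triggers the resume path described next.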

If suspend flips from true to false, then the RayJob controller immediately creates a RayCluster instance and submits the job.

If suspend flips from false to true while the job is running, the RayJob controller tries to gracefully stop the job and deletes the RayCluster instance (with its underlying Kubernetes resources). The JobDeploymentStatus is set to Suspended, the JobStatus is set to STOPPED, and the corresponding event is emitted.
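
After a successful suspension, the RayJob status should look roughly like the sketch below; the camelCase key names are an assumption based on the RayJobStatus fields cleared in this change:

status:
  jobDeploymentStatus: Suspended
  jobStatus: STOPPED
  rayClusterName: ""   # cleared together with dashboardURL, jobId and message once the cluster is deleted
  dashboardURL: ""
  jobId: ""
  message: ""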

Edge case: the suspend flag is ignored if a RayJob is submitted against an existing RayCluster instance (matched via ClusterSelector), since we can't delete a RayCluster created by somebody else.
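
A sketch of that edge case; the clusterSelector label key and value below are hypothetical and would have to match the labels of the pre-existing RayCluster:

apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
  name: rayjob-existing-cluster   # illustrative name
spec:
  suspend: true   # has no effect here: the cluster is not owned by this RayJob
  entrypoint: python /home/ray/samples/sample_code.py
  clusterSelector:
    ray.io/cluster: existing-raycluster   # hypothetical selector label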

No Kueue-specific code leaks into the KubeRay implementation.

Contributors from Kueue/Kubernetes cc'ed:

@alculquicondor
@mwielgus
oginskis authored May 16, 2023
1 parent 28d07c9 commit 9bc5d85
Showing 10 changed files with 466 additions and 68 deletions.
4 changes: 4 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
@@ -12060,6 +12060,10 @@ spec:
description: ShutdownAfterJobFinishes will determine whether to delete
the ray cluster once rayJob succeed or fai
type: boolean
suspend:
description: suspend specifies whether the RayJob controller should
create a RayCluster instance If a job is appl
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
format: int32
7 changes: 7 additions & 0 deletions ray-operator/apis/ray/v1alpha1/rayjob_types.go
@@ -39,6 +39,7 @@ const (
JobDeploymentStatusRunning JobDeploymentStatus = "Running"
JobDeploymentStatusFailedToGetJobStatus JobDeploymentStatus = "FailedToGetJobStatus"
JobDeploymentStatusComplete JobDeploymentStatus = "Complete"
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
)

// RayJobSpec defines the desired state of RayJob
@@ -61,6 +62,12 @@ type RayJobSpec struct {
RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`
// clusterSelector is used to select running rayclusters by labels
ClusterSelector map[string]string `json:"clusterSelector,omitempty"`
// suspend specifies whether the RayJob controller should create a RayCluster instance.
// If a job is applied with the suspend field set to true,
// the RayCluster will not be created and the job will wait for the transition to false.
// If the RayCluster is already created, it will be deleted.
// On transition to false, a new RayCluster will be created.
Suspend bool `json:"suspend,omitempty"`
}

// RayJobStatus defines the observed state of RayJob
4 changes: 4 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml
@@ -12060,6 +12060,10 @@ spec:
description: ShutdownAfterJobFinishes will determine whether to delete
the ray cluster once rayJob succeed or fai
type: boolean
suspend:
description: suspend specifies whether the RayJob controller should
create a RayCluster instance If a job is appl
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up RayCluster.
format: int32
1 change: 1 addition & 0 deletions ray-operator/config/samples/ray_v1alpha1_rayjob.yaml
@@ -3,6 +3,7 @@ kind: RayJob
metadata:
name: rayjob-sample
spec:
suspend: false
entrypoint: python /home/ray/samples/sample_code.py
# runtimeEnv decoded to '{
# "pip": [
128 changes: 97 additions & 31 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
@@ -92,7 +93,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if isJobPendingOrRunning(rayJobInstance.Status.JobStatus) {
rayDashboardClient := utils.GetRayDashboardClientFunc()
rayDashboardClient.InitClient(rayJobInstance.Status.DashboardURL)
err := rayDashboardClient.StopJob(rayJobInstance.Status.JobId, &r.Log)
err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log)
if err != nil {
r.Log.Info("Failed to stop job", "error", err)
}
@@ -150,6 +151,20 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedToGetOrCreateRayCluster, err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
// If there is no cluster instance and no error, suspend the job deployment
if rayClusterInstance == nil {
// Already suspended?
if rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusSuspended {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusSuspended, err)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("rayJob suspended", "RayJob", rayJobInstance.Name)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Suspended", "Suspended RayJob %s", rayJobInstance.Name)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

// Always update RayClusterStatus along with jobStatus and jobDeploymentStatus updates.
rayJobInstance.Status.RayClusterStatus = rayClusterInstance.Status
@@ -178,7 +193,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}

// Check the current status of ray jobs before submitting.
jobInfo, err := rayDashboardClient.GetJobInfo(rayJobInstance.Status.JobId)
jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
err = r.updateState(ctx, rayJobInstance, jobInfo, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus, err)
// Dashboard service in head pod takes time to start, it's possible we get connection refused error.
@@ -189,7 +204,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
r.Log.V(1).Info("RayJob information", "RayJob", rayJobInstance.Name, "jobInfo", jobInfo, "rayJobInstance", rayJobInstance.Status.JobStatus)
if jobInfo == nil {
// Submit the job if no id set
jobId, err := rayDashboardClient.SubmitJob(rayJobInstance, &r.Log)
jobId, err := rayDashboardClient.SubmitJob(ctx, rayJobInstance, &r.Log)
if err != nil {
r.Log.Error(err, "failed to submit job")
err = r.updateState(ctx, rayJobInstance, jobInfo, rayJobInstance.Status.JobStatus, rayv1alpha1.JobDeploymentStatusFailedJobDeploy, err)
@@ -213,9 +228,48 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{}, err
}

// Job may takes long time to start and finish, let's just periodically requeue the job and check status.
if isJobPendingOrRunning(jobInfo.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
if rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusRunning {
// If suspend flag is set AND
// the RayJob is submitted against the RayCluster created by THIS job, then
// try to gracefully stop the Ray job and delete (suspend) the cluster
if rayJobInstance.Spec.Suspend && len(rayJobInstance.Spec.ClusterSelector) == 0 {
info, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
if !rayv1alpha1.IsJobTerminal(info.JobStatus) {
err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId, &r.Log)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
if info.JobStatus != rayv1alpha1.JobStatusStopped {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

_, err = r.deleteCluster(ctx, rayJobInstance)
if err != nil && !errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
// Since the RayCluster instance is gone, also remove its status
// from the RayJob resource
rayJobInstance.Status.RayClusterStatus = rayv1alpha1.RayClusterStatus{}
rayJobInstance.Status.RayClusterName = ""
rayJobInstance.Status.DashboardURL = ""
rayJobInstance.Status.JobId = ""
rayJobInstance.Status.Message = ""
err = r.updateState(ctx, rayJobInstance, jobInfo, rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusSuspended, nil)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("rayJob suspended", "RayJob", rayJobInstance.Name)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Suspended", "Suspended RayJob %s", rayJobInstance.Name)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
// The job may take a long time to start and finish; just periodically requeue the job and check its status.
}
if isJobPendingOrRunning(jobInfo.JobStatus) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
}

// Let's use rayJobInstance.Status.JobStatus to make sure we only delete cluster after the CR is updated.
@@ -231,34 +285,38 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}
}

r.Log.Info("shutdownAfterJobFinishes set to true, we will delete cluster",
"RayJob", rayJobInstance.Name, "clusterName", fmt.Sprintf("%s/%s", rayJobInstance.Namespace, rayJobInstance.Status.RayClusterName))
clusterIdentifier := types.NamespacedName{
Name: rayJobInstance.Status.RayClusterName,
Namespace: rayJobInstance.Namespace,
}
cluster := rayv1alpha1.RayCluster{}
if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster has been already deleted and it can not be found", "RayCluster", clusterIdentifier)
} else {
if cluster.DeletionTimestamp != nil {
r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
} else {
if err := r.Delete(ctx, &cluster); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
return ctrl.Result{Requeue: true}, nil
}
}
return r.deleteCluster(ctx, rayJobInstance)
}
}
return ctrl.Result{}, nil
}

func (r *RayJobReconciler) deleteCluster(ctx context.Context, rayJobInstance *rayv1alpha1.RayJob) (reconcile.Result, error) {
clusterIdentifier := types.NamespacedName{
Name: rayJobInstance.Status.RayClusterName,
Namespace: rayJobInstance.Namespace,
}
cluster := rayv1alpha1.RayCluster{}
if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster has been already deleted and it can not be found", "RayCluster", clusterIdentifier)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
} else {
if cluster.DeletionTimestamp != nil {
r.Log.Info("The cluster deletion is ongoing.", "rayjob", rayJobInstance.Name, "raycluster", cluster.Name)
} else {
if err := r.Delete(ctx, &cluster); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
r.Log.Info("The associated cluster is deleted", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, "Deleted", "Deleted cluster %s", rayJobInstance.Status.RayClusterName)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
}
return ctrl.Result{}, nil
}

@@ -343,7 +401,11 @@ func (r *RayJobReconciler) updateState(ctx context.Context, rayJob *rayv1alpha1.
if jobInfo != nil {
rayJob.Status.Message = jobInfo.Message
rayJob.Status.StartTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.StartTime)
rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
if jobInfo.StartTime >= jobInfo.EndTime {
rayJob.Status.EndTime = nil
} else {
rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
}
}

// TODO (kevin85421): ObservedGeneration should be used to determine whether update this CR or not.
@@ -391,11 +453,15 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
return nil, err
}

// one special case is the job is complete status and cluster has been recycled.
// special case: the job is in Complete status and the cluster has been recycled.
if isJobSucceedOrFailed(rayJobInstance.Status.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1alpha1.JobDeploymentStatusComplete {
r.Log.Info("The cluster has been recycled for the job, skip duplicate creation", "rayjob", rayJobInstance.Name)
return nil, err
}
// special case: don't create a cluster instance and don't return an error if the suspend flag of the job is true
if rayJobInstance.Spec.Suspend {
return nil, nil
}

r.Log.Info("RayCluster not found, creating rayCluster!", "raycluster", rayClusterNamespacedName)
rayClusterInstance, err = r.constructRayClusterForRayJob(rayJobInstance, rayClusterInstanceName)