[RayJob] Fix RayJob status reconciliation (#1539)
astefanutti authored Oct 24, 2023
1 parent 4432b78 commit 528abc3
Showing 2 changed files with 91 additions and 47 deletions.
45 changes: 34 additions & 11 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -14,6 +14,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -154,8 +155,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}
}

// Set rayClusterName and rayJobId first, to avoid duplicate submission
err = r.setRayJobIdAndRayClusterNameIfNeed(ctx, rayJobInstance)
// Set rayClusterName and rayJobId first, to avoid duplicate submission.
// Initialize the job status to Pending and deployment status to Initializing.
err = r.initRayJobStatusIfNeed(ctx, rayJobInstance)
if err != nil {
r.Log.Error(err, "failed to set jobId or rayCluster name", "RayJob", request.NamespacedName)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
@@ -305,7 +307,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)

// Let's use rayJobInstance.Status.JobStatus to make sure we only delete cluster after the CR is updated.
if isJobSucceedOrFailed(rayJobInstance.Status.JobStatus) && rayJobInstance.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusRunning {
if rayJobInstance.Spec.ShutdownAfterJobFinishes {
if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
// the RayJob is submitted against the RayCluster created by THIS job, so we can tear that
// RayCluster down.
if rayJobInstance.Spec.TTLSecondsAfterFinished != nil {
r.Log.V(3).Info("TTLSecondsAfterSetting", "end_time", rayJobInstance.Status.EndTime.Time, "now", time.Now(), "ttl", *rayJobInstance.Spec.TTLSecondsAfterFinished)
ttlDuration := time.Duration(*rayJobInstance.Spec.TTLSecondsAfterFinished) * time.Second
@@ -324,7 +328,14 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}
}
}
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil

if isJobPendingOrRunning(rayJobInstance.Status.JobStatus) {
// Requeue the RayJob to poll its status from the running Ray job
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
// Otherwise only reconcile the RayJob upon new events for watched resources
// to avoid infinite reconciliation.
return ctrl.Result{}, nil
}

// getOrCreateK8sJob creates a Kubernetes Job for the Ray Job if it doesn't exist, otherwise returns the existing one. It returns the Job name and a boolean indicating whether the Job was created.
@@ -341,7 +352,7 @@ func (r *RayJobReconciler) getOrCreateK8sJob(ctx context.Context, rayJobInstance
r.Log.Error(err, "failed to get submitter template")
return "", false, err
}
return r.createNewK8sJob(ctx, rayJobInstance, submitterTemplate, rayClusterInstance)
return r.createNewK8sJob(ctx, rayJobInstance, submitterTemplate)
}

// Some other error occurred while trying to get the Job
@@ -393,19 +404,27 @@ func (r *RayJobReconciler) getSubmitterTemplate(rayJobInstance *rayv1.RayJob, ra
}

// createNewK8sJob creates a new Kubernetes Job. It returns the Job's name and a boolean indicating whether a new Job was created.
func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *rayv1.RayJob, submitterTemplate v1.PodTemplateSpec, rayClusterInstance *rayv1.RayCluster) (string, bool, error) {
func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *rayv1.RayJob, submitterTemplate v1.PodTemplateSpec) (string, bool, error) {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: rayJobInstance.Name,
Namespace: rayJobInstance.Namespace,
Labels: map[string]string{
common.KubernetesCreatedByLabelKey: common.ComponentName,
},
},
Spec: batchv1.JobSpec{
Template: submitterTemplate,
// Reduce the number of retries, which defaults to 6, so the ray job submission command
// is attempted 3 times at the maximum, but still mitigates the case of unrecoverable
// application-level errors, where the maximum number of retries is reached, and the job
// completion time increases with no benefits, but wasted resource cycles.
BackoffLimit: pointer.Int32(2),
Template: submitterTemplate,
},
}

// Set the ownership in order to do the garbage collection by k8s.
if err := ctrl.SetControllerReference(rayClusterInstance, job, r.Scheme); err != nil {
if err := ctrl.SetControllerReference(rayJobInstance, job, r.Scheme); err != nil {
r.Log.Error(err, "failed to set controller reference")
return "", false, err
}
@@ -463,10 +482,11 @@ func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&rayv1.RayJob{}).
Owns(&rayv1.RayCluster{}).
Owns(&corev1.Service{}).
Owns(&batchv1.Job{}).
Complete(r)
}

func (r *RayJobReconciler) setRayJobIdAndRayClusterNameIfNeed(ctx context.Context, rayJob *rayv1.RayJob) error {
func (r *RayJobReconciler) initRayJobStatusIfNeed(ctx context.Context, rayJob *rayv1.RayJob) error {
shouldUpdateStatus := false
if rayJob.Status.JobId == "" {
shouldUpdateStatus = true
@@ -488,13 +508,16 @@ func (r *RayJobReconciler) setRayJobIdAndRayClusterNameIfNeed(ctx context.Contex
return fmt.Errorf("failed to get cluster name in ClusterSelector map, the default key is %v", RayJobDefaultClusterSelectorKey)
}
rayJob.Status.RayClusterName = useValue
rayJob.Spec.ShutdownAfterJobFinishes = false
return nil
} else {
rayJob.Status.RayClusterName = utils.GenerateRayClusterName(rayJob.Name)
}
}

if rayJob.Status.JobStatus == "" {
shouldUpdateStatus = true
rayJob.Status.JobStatus = rayv1.JobStatusPending
}

if shouldUpdateStatus {
return r.updateState(ctx, rayJob, nil, rayJob.Status.JobStatus, rayv1.JobDeploymentStatusInitializing, nil)
}
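The requeue change in this file means the controller only keeps polling while the Ray job is still active: once the status check reports a terminal state, `Reconcile` returns an empty `ctrl.Result` and relies on watch events instead of requeuing indefinitely. Below is a minimal, self-contained sketch of the status predicates this logic depends on; the real `isJobPendingOrRunning` and `isJobSucceedOrFailed` helpers live elsewhere in the controller and may cover additional statuses (e.g. STOPPED), so treat the exact status sets here as assumptions for illustration only.

```go
// Sketch (not part of the commit) of the status predicates the Reconcile
// loop relies on: non-terminal statuses warrant another polling requeue,
// terminal ones do not.
package main

import "fmt"

type JobStatus string

const (
	JobStatusPending   JobStatus = "PENDING"
	JobStatusRunning   JobStatus = "RUNNING"
	JobStatusSucceeded JobStatus = "SUCCEEDED"
	JobStatusFailed    JobStatus = "FAILED"
)

// isJobPendingOrRunning mirrors the helper referenced in the diff: only an
// active Ray job needs to be polled again after RayJobDefaultRequeueDuration.
func isJobPendingOrRunning(status JobStatus) bool {
	return status == JobStatusPending || status == JobStatusRunning
}

// isJobSucceedOrFailed mirrors the terminal-state check that gates the
// optional RayCluster teardown.
func isJobSucceedOrFailed(status JobStatus) bool {
	return status == JobStatusSucceeded || status == JobStatusFailed
}

func main() {
	for _, s := range []JobStatus{JobStatusPending, JobStatusRunning, JobStatusSucceeded, JobStatusFailed} {
		fmt.Printf("%s: requeue=%v terminal=%v\n", s, isJobPendingOrRunning(s), isJobSucceedOrFailed(s))
	}
}
```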
93 changes: 57 additions & 36 deletions ray-operator/main.go
@@ -7,21 +7,25 @@ import (
"strings"

"github.com/go-logr/zapr"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"

routev1 "github.com/openshift/api/route/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"

routev1 "github.com/openshift/api/route/v1"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"gopkg.in/natefinch/lumberjack.v2"

batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
k8szap "sigs.k8s.io/controller-runtime/pkg/log/zap"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
@@ -39,7 +43,8 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(rayv1.AddToScheme(scheme))
utilruntime.Must(routev1.AddToScheme(scheme))
utilruntime.Must(routev1.Install(scheme))
utilruntime.Must(batchv1.AddToScheme(scheme))
batchscheduler.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme
}
@@ -116,7 +121,7 @@ func main() {
setupLog.Info("Feature flag enable-batch-scheduler is enabled.")
}

watchNamespaces := strings.Split(watchNamespace, ",")
// Manager options
options := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
@@ -126,50 +131,66 @@
LeaderElectionID: "ray-operator-leader",
}

if len(watchNamespaces) == 1 { // It is not possible for len(watchNamespaces) == 0 to be true. The length of `strings.Split("", ",")`` is still 1.
// Manager Cache
// Set the informers label selectors to narrow the scope of the resources being watched and cached.
// This improves the scalability of the system, both for KubeRay itself by reducing the size of the
// informers cache, and for the API server / etcd, by reducing the number of watch events.
// For example, KubeRay is only interested in the batch Jobs it creates when reconciling RayJobs,
// so the controller sets the app.kubernetes.io/created-by=kuberay-operator label on any Job it creates,
// and that label is provided to the manager cache as a selector for Job resources.
selectorsByObject, err := cacheSelectors()
exitOnError(err, "unable to create cache selectors")
if watchNamespaces := strings.Split(watchNamespace, ","); len(watchNamespaces) == 1 { // It is not possible for len(watchNamespaces) == 0 to be true. The length of `strings.Split("", ",")` is still 1.
options.Namespace = watchNamespaces[0]
if watchNamespaces[0] == "" {
setupLog.Info("Flag watchNamespace is not set. Watch custom resources in all namespaces.")
} else {
setupLog.Info(fmt.Sprintf("Only watch custom resources in the namespace: %s", watchNamespaces[0]))
}
options.NewCache = cache.BuilderWithOptions(cache.Options{SelectorsByObject: selectorsByObject})
} else {
options.NewCache = cache.MultiNamespacedCacheBuilder(watchNamespaces)
options.NewCache = func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.SelectorsByObject = selectorsByObject
return cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts)
}
setupLog.Info(fmt.Sprintf("Only watch custom resources in multiple namespaces: %v", watchNamespaces))
}

setupLog.Info("Setup manager")
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), options)
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
exitOnError(err, "unable to start manager")

exitOnError(ray.NewReconciler(mgr).SetupWithManager(mgr, reconcileConcurrency),
"unable to create controller", "controller", "RayCluster")
exitOnError(ray.NewRayServiceReconciler(mgr).SetupWithManager(mgr),
"unable to create controller", "controller", "RayService")
exitOnError(ray.NewRayJobReconciler(mgr).SetupWithManager(mgr),
"unable to create controller", "controller", "RayJob")

if err = ray.NewReconciler(mgr).SetupWithManager(mgr, reconcileConcurrency); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RayCluster")
os.Exit(1)
}
if err = ray.NewRayServiceReconciler(mgr).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RayService")
os.Exit(1)
}
if err = ray.NewRayJobReconciler(mgr).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RayJob")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
exitOnError(mgr.AddHealthzCheck("healthz", healthz.Ping), "unable to set up health check")
exitOnError(mgr.AddReadyzCheck("readyz", healthz.Ping), "unable to set up ready check")

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
exitOnError(mgr.Start(ctrl.SetupSignalHandler()), "problem running manager")
}

func cacheSelectors() (cache.SelectorsByObject, error) {
label, err := labels.NewRequirement(common.KubernetesCreatedByLabelKey, selection.Equals, []string{common.ComponentName})
if err != nil {
return nil, err
}
selector := labels.NewSelector().Add(*label)

return cache.SelectorsByObject{
&batchv1.Job{}: {Label: selector},
}, nil
}

func exitOnError(err error, msg string, keysAndValues ...interface{}) {
if err != nil {
setupLog.Error(err, msg, keysAndValues)
os.Exit(1)
}
}
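The new `cacheSelectors` helper above narrows the manager's informer cache so that only batch Jobs labeled `app.kubernetes.io/created-by=kuberay-operator` are watched and cached. The following standalone sketch shows the same filtering idea using only `k8s.io/apimachinery`; the label key and component value are hard-coded here for illustration (the operator takes them from the `common` package), so consider them assumptions rather than the operator's exact wiring.

```go
// Sketch (not part of the commit) of how the created-by label selector
// decides which Jobs the manager's cache would keep: Jobs created by the
// operator match, unrelated Jobs in the cluster are filtered out.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

func main() {
	req, err := labels.NewRequirement("app.kubernetes.io/created-by", selection.Equals, []string{"kuberay-operator"})
	if err != nil {
		panic(err)
	}
	selector := labels.NewSelector().Add(*req)

	// Labels the operator stamps on the submitter Job it creates.
	operatorJob := labels.Set{"app.kubernetes.io/created-by": "kuberay-operator"}
	// Labels on an arbitrary Job owned by some other controller.
	unrelatedJob := labels.Set{"app.kubernetes.io/created-by": "something-else"}

	fmt.Println(selector.Matches(operatorJob))  // true  -> watched and cached
	fmt.Println(selector.Matches(unrelatedJob)) // false -> never enters the cache
}
```

This is the scalability point made in the comment inside `main()`: the smaller the set of objects matching the selector, the smaller the informer cache and the fewer watch events the API server has to deliver.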
