RayCluster updates status frequently #1211
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "os"
+    "reflect"
     "strconv"
     "strings"
     "time"
@@ -198,6 +199,9 @@ func (r *RayClusterReconciler) eventReconcile(ctx context.Context, request ctrl.
 }

 func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1alpha1.RayCluster) (ctrl.Result, error) {
+    // Please do NOT modify `originalRayClusterInstance` in the following code.
+    originalRayClusterInstance := instance.DeepCopy()
+
     _ = r.Log.WithValues("raycluster", request.NamespacedName)
     r.Log.Info("reconciling RayCluster", "cluster name", request.Name)
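For context on the new snapshot: the generated `DeepCopy()` clones reference-typed fields as well, so `originalRayClusterInstance` preserves the status exactly as it was read from the API server while the rest of the reconcile mutates `instance`. A minimal sketch of why a shallow copy would not be enough; the struct types below are illustrative stand-ins, not the real CRD types:

package main

import "fmt"

// Illustrative stand-ins for the generated CRD types.
type status struct {
    State     string
    Endpoints map[string]string
}

type cluster struct{ Status status }

// DeepCopy mirrors what the code-generated DeepCopy() does for real CRDs:
// reference-typed fields (here, the Endpoints map) are cloned, not aliased.
func (c *cluster) DeepCopy() *cluster {
    out := *c // copies value fields, but would alias the map...
    out.Status.Endpoints = make(map[string]string, len(c.Status.Endpoints))
    for k, v := range c.Status.Endpoints { // ...so clone it explicitly
        out.Status.Endpoints[k] = v
    }
    return &out
}

func main() {
    instance := &cluster{Status: status{State: "ready", Endpoints: map[string]string{"client": "10001"}}}
    original := instance.DeepCopy()

    // Reconciliation mutates `instance`; the snapshot keeps the as-read state,
    // which is what makes the status comparison later in this PR meaningful.
    instance.Status.State = "unhealthy"
    instance.Status.Endpoints["dashboard"] = "8265"

    fmt.Println(original.Status.State, len(original.Status.Endpoints)) // ready 1
    fmt.Println(instance.Status.State, len(instance.Status.Endpoints)) // unhealthy 2
}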
@@ -247,19 +251,26 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
         r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1alpha1.PodReconciliationError), err.Error())
         return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
     }
-    // update the status if needed
-    if err := r.updateStatus(ctx, instance); err != nil {
-        if errors.IsNotFound(err) {
-            r.Log.Info("Update status not found error", "cluster name", request.Name)
-        } else {
-            r.Log.Error(err, "Update status error", "cluster name", request.Name)
-        }
-    }
+
+    // Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
+    newInstance, err := r.calculateStatus(ctx, instance)
+    if err != nil {
+        r.Log.Info("Got error when calculating new status", "cluster name", request.Name, "error", err)
+        return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
+    }
+
+    // Check if we need to update the status.
+    if r.inconsistentRayClusterStatus(originalRayClusterInstance.Status, newInstance.Status) {
+        r.Log.Info("rayClusterReconcile", "Update CR status", request.Name, "status", newInstance.Status)
+        if err := r.Status().Update(ctx, newInstance); err != nil {
+            r.Log.Info("Got error when updating status", "cluster name", request.Name, "error", err, "RayCluster", newInstance)
+            return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
+        }
+    }

     // Unconditionally requeue after the number of seconds specified in the
     // environment variable RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV. If the
     // environment variable is not set, requeue after the default value.
     var requeueAfterSeconds int
-    requeueAfterSeconds, err := strconv.Atoi(os.Getenv(common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV))
+    requeueAfterSeconds, err = strconv.Atoi(os.Getenv(common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV))
     if err != nil {
         r.Log.Info(fmt.Sprintf("Environment variable %s is not set, using default value of %d seconds", common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV, common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS), "cluster name", request.Name)
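Two things happen in this hunk: the status write becomes conditional on `inconsistentRayClusterStatus` (defined in the next hunk), and the requeue interval is parsed from the environment with a fallback. On the latter, note that `strconv.Atoi` also fails on the empty string, so an unset variable falls back to the default exactly like a malformed value does. A runnable sketch of that parsing pattern; the variable name and the 300-second default are stand-ins for the constants in the `common` package, not verified values:

package main

import (
    "fmt"
    "os"
    "strconv"
    "time"
)

// Stand-ins for common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV and
// common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS (assumed, not copied from kuberay).
const (
    requeueSecondsEnv     = "RAYCLUSTER_DEFAULT_REQUEUE_SECONDS"
    defaultRequeueSeconds = 300
)

// requeueAfter mirrors the parsing above: any Atoi error, including the one
// produced by an unset (empty) variable, selects the default.
func requeueAfter() time.Duration {
    seconds, err := strconv.Atoi(os.Getenv(requeueSecondsEnv))
    if err != nil {
        seconds = defaultRequeueSeconds
    }
    return time.Duration(seconds) * time.Second
}

func main() {
    fmt.Println(requeueAfter()) // 5m0s while the variable is unset

    os.Setenv(requeueSecondsEnv, "30")
    fmt.Println(requeueAfter()) // 30s
}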
@@ -269,6 +280,37 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
     return ctrl.Result{RequeueAfter: time.Duration(requeueAfterSeconds) * time.Second}, nil
 }

+// inconsistentRayClusterStatus checks whether the old and new RayClusterStatus are inconsistent by comparing
+// different fields. If the only differences between the old and new status are the `LastUpdateTime` and
+// `ObservedGeneration` fields, the status update will not be triggered.
+//
+// TODO (kevin85421): The field `ObservedGeneration` is not being well-maintained at the moment. In the future,
+// this field should be used to determine whether to update this CR or not.
+func (r *RayClusterReconciler) inconsistentRayClusterStatus(oldStatus rayv1alpha1.RayClusterStatus, newStatus rayv1alpha1.RayClusterStatus) bool {
+    if oldStatus.State != newStatus.State || oldStatus.Reason != newStatus.Reason {
+        r.Log.Info("inconsistentRayClusterStatus", "detect inconsistency", fmt.Sprintf(
+            "old State: %s, new State: %s, old Reason: %s, new Reason: %s",
+            oldStatus.State, newStatus.State, oldStatus.Reason, newStatus.Reason))
+        return true
+    }
+    if oldStatus.AvailableWorkerReplicas != newStatus.AvailableWorkerReplicas || oldStatus.DesiredWorkerReplicas != newStatus.DesiredWorkerReplicas ||
+        oldStatus.MinWorkerReplicas != newStatus.MinWorkerReplicas || oldStatus.MaxWorkerReplicas != newStatus.MaxWorkerReplicas {
+        r.Log.Info("inconsistentRayClusterStatus", "detect inconsistency", fmt.Sprintf(
+            "old AvailableWorkerReplicas: %d, new AvailableWorkerReplicas: %d, old DesiredWorkerReplicas: %d, new DesiredWorkerReplicas: %d, "+
+                "old MinWorkerReplicas: %d, new MinWorkerReplicas: %d, old MaxWorkerReplicas: %d, new MaxWorkerReplicas: %d",
+            oldStatus.AvailableWorkerReplicas, newStatus.AvailableWorkerReplicas, oldStatus.DesiredWorkerReplicas, newStatus.DesiredWorkerReplicas,
+            oldStatus.MinWorkerReplicas, newStatus.MinWorkerReplicas, oldStatus.MaxWorkerReplicas, newStatus.MaxWorkerReplicas))
+        return true
+    }
+    if !reflect.DeepEqual(oldStatus.Endpoints, newStatus.Endpoints) || !reflect.DeepEqual(oldStatus.Head, newStatus.Head) {
+        r.Log.Info("inconsistentRayClusterStatus", "detect inconsistency", fmt.Sprintf(
+            "old Endpoints: %v, new Endpoints: %v, old Head: %v, new Head: %v",
+            oldStatus.Endpoints, newStatus.Endpoints, oldStatus.Head, newStatus.Head))
+        return true
+    }
+    return false
+}
+
 func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
     if instance.Spec.HeadGroupSpec.EnableIngress == nil || !*instance.Spec.HeadGroupSpec.EnableIngress {
         return nil

Review comment (on the reflect.DeepEqual check): any specific reason to not include this as part of the if statement in line …?

Reply: We can merge L290, L296, and L305 into a single `if` statement …
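The thread above asks about the three separate `if` blocks; keeping them separate is what lets each branch log exactly which fields diverged. A sketch of the kind of regression test that could pin down the intended behavior; the package name, import paths, and the `Log` initialization are assumptions about the repository layout, not code from this PR:

package ray

import (
    "testing"

    "github.com/go-logr/logr"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
)

func TestInconsistentRayClusterStatus(t *testing.T) {
    r := &RayClusterReconciler{Log: logr.Discard()}

    oldStatus := rayv1alpha1.RayClusterStatus{
        State:     rayv1alpha1.Ready,
        Endpoints: map[string]string{"client": "10001"},
    }

    // Only LastUpdateTime differs: the status update should be skipped.
    newStatus := *oldStatus.DeepCopy()
    now := metav1.Now()
    newStatus.LastUpdateTime = &now
    if r.inconsistentRayClusterStatus(oldStatus, newStatus) {
        t.Error("a LastUpdateTime change alone should not be inconsistent")
    }

    // A State change must trigger an update.
    newStatus.State = rayv1alpha1.Unhealthy
    if !r.inconsistentRayClusterStatus(oldStatus, newStatus) {
        t.Error("a State change should be inconsistent")
    }

    // An Endpoints change must trigger an update.
    newStatus = *oldStatus.DeepCopy()
    newStatus.Endpoints["dashboard"] = "8265"
    if !r.inconsistentRayClusterStatus(oldStatus, newStatus) {
        t.Error("an Endpoints change should be inconsistent")
    }
}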
@@ -841,50 +883,50 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
         Complete(r)
 }

-func (r *RayClusterReconciler) updateStatus(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
+func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1alpha1.RayCluster) (*rayv1alpha1.RayCluster, error) {
+    // Deep copy the instance, so we don't mutate the original object.
+    newInstance := instance.DeepCopy()
+
     // TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
-    instance.Status.ObservedGeneration = instance.ObjectMeta.Generation
+    newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation

     runtimePods := corev1.PodList{}
-    filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
-    if err := r.List(ctx, &runtimePods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
-        return err
+    filterLabels := client.MatchingLabels{common.RayClusterLabelKey: newInstance.Name}
+    if err := r.List(ctx, &runtimePods, client.InNamespace(newInstance.Namespace), filterLabels); err != nil {
+        return nil, err
     }

-    instance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
-    instance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(instance)
-    instance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(instance)
-    instance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(instance)
+    newInstance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
+    newInstance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(newInstance)
+    newInstance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(newInstance)
+    newInstance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(newInstance)

     // validation for the RayStartParam for the state.
-    isValid, err := common.ValidateHeadRayStartParams(instance.Spec.HeadGroupSpec)
+    isValid, err := common.ValidateHeadRayStartParams(newInstance.Spec.HeadGroupSpec)
     if err != nil {
-        r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1alpha1.RayConfigError), err.Error())
+        r.Recorder.Event(newInstance, corev1.EventTypeWarning, string(rayv1alpha1.RayConfigError), err.Error())
     }
     // only in invalid status that we update the status to unhealthy.
     if !isValid {
-        instance.Status.State = rayv1alpha1.Unhealthy
+        newInstance.Status.State = rayv1alpha1.Unhealthy
     } else {
         if utils.CheckAllPodsRunning(runtimePods) {
-            instance.Status.State = rayv1alpha1.Ready
+            newInstance.Status.State = rayv1alpha1.Ready
         }
     }

-    if err := r.updateEndpoints(ctx, instance); err != nil {
-        return err
+    if err := r.updateEndpoints(ctx, newInstance); err != nil {
+        return nil, err
     }

-    if err := r.updateHeadInfo(ctx, instance); err != nil {
-        return err
+    if err := r.updateHeadInfo(ctx, newInstance); err != nil {
+        return nil, err
     }

     timeNow := metav1.Now()
-    instance.Status.LastUpdateTime = &timeNow
-    if err := r.Status().Update(ctx, instance); err != nil {
-        return err
-    }
+    newInstance.Status.LastUpdateTime = &timeNow

-    return nil
+    return newInstance, nil
 }

 // Best effort to obtain the ip of the head node.
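The net effect of the refactor: `calculateStatus` still stamps `LastUpdateTime` on every pass, but it now returns a fresh object instead of writing, and the caller writes only when `inconsistentRayClusterStatus` reports a real change. That is what breaks the loop behind this issue, where each status write produced a watch event and thus another reconcile. A toy model of the loop, with every name illustrative and no kuberay types:

package main

import "fmt"

// status is a stand-in for RayClusterStatus: one semantic field and one
// timestamp that changes on every reconcile.
type status struct {
    State          string
    LastUpdateTime int
}

// simulate runs reconciles until the work queue drains. Every status write
// enqueues another reconcile, mimicking the controller watching its own CR.
func simulate(skipNoopWrites bool) int {
    cur := status{State: "ready"}
    writes := 0
    queued := 1 // one pending reconcile to start with
    clock := 0

    for queued > 0 && writes < 10 { // cap keeps the old behavior finite
        queued--
        clock++

        // calculateStatus: same semantic state, fresh timestamp every pass.
        next := status{State: "ready", LastUpdateTime: clock}

        // Old behavior: always write. New behavior: write only when a field
        // other than LastUpdateTime differs.
        if !skipNoopWrites || next.State != cur.State {
            cur = next
            writes++
            queued++ // our own write triggers the next reconcile
        }
    }
    return writes
}

func main() {
    fmt.Println(simulate(false)) // 10: the write/reconcile loop never settles
    fmt.Println(simulate(true))  // 0: the controller goes quiet immediately
}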
@@ -1113,11 +1155,19 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
 }

 func (r *RayClusterReconciler) updateClusterState(ctx context.Context, instance *rayv1alpha1.RayCluster, clusterState rayv1alpha1.ClusterState) error {
+    if instance.Status.State == clusterState {
+        return nil
+    }
     instance.Status.State = clusterState
+    r.Log.Info("updateClusterState", "Update CR Status.State", clusterState)
     return r.Status().Update(ctx, instance)
 }

 func (r *RayClusterReconciler) updateClusterReason(ctx context.Context, instance *rayv1alpha1.RayCluster, clusterReason string) error {
+    if instance.Status.Reason == clusterReason {
+        return nil
+    }
     instance.Status.Reason = clusterReason
+    r.Log.Info("updateClusterReason", "Update CR Status.Reason", clusterReason)
     return r.Status().Update(ctx, instance)
 }
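The same guard idea is applied to these two helpers, which run outside the calculateStatus path: a status write and its log line now happen only on a real transition. A minimal self-contained sketch of the pattern; the types are stand-ins, not the kuberay API:

package main

import "fmt"

// clusterStatus stands in for the CR status subresource.
type clusterStatus struct{ State string }

// updateClusterState mirrors the guarded helper above: repeated reconciles of
// a settled cluster produce no API write and no log noise.
func updateClusterState(s *clusterStatus, state string) bool {
    if s.State == state {
        return false // no-op: skip the write and the log line
    }
    s.State = state
    fmt.Println("updateClusterState", "Update CR Status.State", state)
    return true // caller would now issue r.Status().Update(...)
}

func main() {
    s := &clusterStatus{}
    fmt.Println(updateClusterState(s, "failed")) // transition: logs, returns true
    fmt.Println(updateClusterState(s, "failed")) // settled: silent, returns false
}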
Review comment (on the ObservedGeneration TODO): do we have a GH issue for this `ObservedGeneration` fix?

Reply: Good question. I will open an issue for it.

Reply: Opened issue #1217.