Skip to content

Commit

Permalink
RayCluster updates status frequently (#1211)
Browse files Browse the repository at this point in the history
RayCluster updates status frequently
  • Loading branch information
kevin85421 authored Jul 5, 2023
1 parent 2c97ac3 commit 15daa54
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 31 deletions.
108 changes: 79 additions & 29 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -198,6 +199,9 @@ func (r *RayClusterReconciler) eventReconcile(ctx context.Context, request ctrl.
}

func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1alpha1.RayCluster) (ctrl.Result, error) {
// Please do NOT modify `originalRayClusterInstance` in the following code.
originalRayClusterInstance := instance.DeepCopy()

_ = r.Log.WithValues("raycluster", request.NamespacedName)
r.Log.Info("reconciling RayCluster", "cluster name", request.Name)

Expand Down Expand Up @@ -247,19 +251,26 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1alpha1.PodReconciliationError), err.Error())
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
// update the status if needed
if err := r.updateStatus(ctx, instance); err != nil {
if errors.IsNotFound(err) {
r.Log.Info("Update status not found error", "cluster name", request.Name)
} else {
r.Log.Error(err, "Update status error", "cluster name", request.Name)

// Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
newInstance, err := r.calculateStatus(ctx, instance)
if err != nil {
r.Log.Info("Got error when calculating new status", "cluster name", request.Name, "error", err)
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}

// Check if need to update the status.
if r.inconsistentRayClusterStatus(originalRayClusterInstance.Status, newInstance.Status) {
r.Log.Info("rayClusterReconcile", "Update CR status", request.Name, "status", newInstance.Status)
if err := r.Status().Update(ctx, newInstance); err != nil {
r.Log.Info("Got error when updating status", "cluster name", request.Name, "error", err, "RayCluster", newInstance)
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
}

// Unconditionally requeue after the number of seconds specified in the
// environment variable RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV. If the
// environment variable is not set, requeue after the default value.
var requeueAfterSeconds int
requeueAfterSeconds, err := strconv.Atoi(os.Getenv(common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV))
if err != nil {
r.Log.Info(fmt.Sprintf("Environment variable %s is not set, using default value of %d seconds", common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV, common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS), "cluster name", request.Name)
Expand All @@ -269,6 +280,37 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
return ctrl.Result{RequeueAfter: time.Duration(requeueAfterSeconds) * time.Second}, nil
}

// inconsistentRayClusterStatus reports whether newStatus differs from oldStatus in any of the
// compared fields: `State`, `Reason`, the four worker replica counters, `Endpoints`, and `Head`.
// `LastUpdateTime` and `ObservedGeneration` are not compared, so a difference limited to those
// two fields yields false and does not trigger a status update. The first group of fields found
// to differ is logged before returning true.
//
// TODO (kevin85421): The field `ObservedGeneration` is not being well-maintained at the moment. In the future,
// this field should be used to determine whether to update this CR or not.
func (r *RayClusterReconciler) inconsistentRayClusterStatus(oldStatus rayv1alpha1.RayClusterStatus, newStatus rayv1alpha1.RayClusterStatus) bool {
	// Human-readable description of the first mismatch that was detected.
	var detail string

	switch {
	case oldStatus.State != newStatus.State || oldStatus.Reason != newStatus.Reason:
		detail = fmt.Sprintf(
			"old State: %s, new State: %s, old Reason: %s, new Reason: %s",
			oldStatus.State, newStatus.State, oldStatus.Reason, newStatus.Reason)

	case oldStatus.AvailableWorkerReplicas != newStatus.AvailableWorkerReplicas ||
		oldStatus.DesiredWorkerReplicas != newStatus.DesiredWorkerReplicas ||
		oldStatus.MinWorkerReplicas != newStatus.MinWorkerReplicas ||
		oldStatus.MaxWorkerReplicas != newStatus.MaxWorkerReplicas:
		detail = fmt.Sprintf(
			"old AvailableWorkerReplicas: %d, new AvailableWorkerReplicas: %d, old DesiredWorkerReplicas: %d, new DesiredWorkerReplicas: %d, "+
				"old MinWorkerReplicas: %d, new MinWorkerReplicas: %d, old MaxWorkerReplicas: %d, new MaxWorkerReplicas: %d",
			oldStatus.AvailableWorkerReplicas, newStatus.AvailableWorkerReplicas, oldStatus.DesiredWorkerReplicas, newStatus.DesiredWorkerReplicas,
			oldStatus.MinWorkerReplicas, newStatus.MinWorkerReplicas, oldStatus.MaxWorkerReplicas, newStatus.MaxWorkerReplicas)

	// Endpoints is a map and Head is a struct, so both are compared structurally.
	case !(reflect.DeepEqual(oldStatus.Endpoints, newStatus.Endpoints) && reflect.DeepEqual(oldStatus.Head, newStatus.Head)):
		detail = fmt.Sprintf(
			"old Endpoints: %v, new Endpoints: %v, old Head: %v, new Head: %v",
			oldStatus.Endpoints, newStatus.Endpoints, oldStatus.Head, newStatus.Head)

	default:
		// Every compared field matches.
		return false
	}

	r.Log.Info("inconsistentRayClusterStatus", "detect inconsistency", detail)
	return true
}

func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
if instance.Spec.HeadGroupSpec.EnableIngress == nil || !*instance.Spec.HeadGroupSpec.EnableIngress {
return nil
Expand Down Expand Up @@ -841,50 +883,50 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
Complete(r)
}

func (r *RayClusterReconciler) updateStatus(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1alpha1.RayCluster) (*rayv1alpha1.RayCluster, error) {
// Deep copy the instance, so we don't mutate the original object.
newInstance := instance.DeepCopy()

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
instance.Status.ObservedGeneration = instance.ObjectMeta.Generation
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation

runtimePods := corev1.PodList{}
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
if err := r.List(ctx, &runtimePods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return err
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: newInstance.Name}
if err := r.List(ctx, &runtimePods, client.InNamespace(newInstance.Namespace), filterLabels); err != nil {
return nil, err
}

instance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
instance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(instance)
instance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(instance)
instance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(instance)
newInstance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
newInstance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(newInstance)
newInstance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(newInstance)
newInstance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(newInstance)

// validation for the RayStartParam for the state.
isValid, err := common.ValidateHeadRayStartParams(instance.Spec.HeadGroupSpec)
isValid, err := common.ValidateHeadRayStartParams(newInstance.Spec.HeadGroupSpec)
if err != nil {
r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1alpha1.RayConfigError), err.Error())
r.Recorder.Event(newInstance, corev1.EventTypeWarning, string(rayv1alpha1.RayConfigError), err.Error())
}
// only in invalid status that we update the status to unhealthy.
if !isValid {
instance.Status.State = rayv1alpha1.Unhealthy
newInstance.Status.State = rayv1alpha1.Unhealthy
} else {
if utils.CheckAllPodsRunning(runtimePods) {
instance.Status.State = rayv1alpha1.Ready
newInstance.Status.State = rayv1alpha1.Ready
}
}

if err := r.updateEndpoints(ctx, instance); err != nil {
return err
if err := r.updateEndpoints(ctx, newInstance); err != nil {
return nil, err
}

if err := r.updateHeadInfo(ctx, instance); err != nil {
return err
if err := r.updateHeadInfo(ctx, newInstance); err != nil {
return nil, err
}

timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(ctx, instance); err != nil {
return err
}
newInstance.Status.LastUpdateTime = &timeNow

return nil
return newInstance, nil
}

// Best effort to obtain the ip of the head node.
Expand Down Expand Up @@ -1113,11 +1155,19 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
}

// updateClusterState sets `Status.State` on instance to clusterState and writes the status
// through the status client. When instance already carries clusterState, nothing is written
// and nil is returned. Note that instance is mutated in place before the update is attempted.
func (r *RayClusterReconciler) updateClusterState(ctx context.Context, instance *rayv1alpha1.RayCluster, clusterState rayv1alpha1.ClusterState) error {
	if unchanged := instance.Status.State == clusterState; unchanged {
		// Skip the status write when there is nothing new to record.
		return nil
	}

	instance.Status.State = clusterState
	r.Log.Info("updateClusterState", "Update CR Status.State", clusterState)

	updateErr := r.Status().Update(ctx, instance)
	return updateErr
}

// updateClusterReason sets `Status.Reason` on instance to clusterReason and writes the status
// through the status client. When instance already carries clusterReason, nothing is written
// and nil is returned. Note that instance is mutated in place before the update is attempted.
func (r *RayClusterReconciler) updateClusterReason(ctx context.Context, instance *rayv1alpha1.RayCluster, clusterReason string) error {
	if unchanged := instance.Status.Reason == clusterReason; unchanged {
		// Skip the status write when there is nothing new to record.
		return nil
	}

	instance.Status.Reason = clusterReason
	r.Log.Info("updateClusterReason", "Update CR Status.Reason", clusterReason)

	updateErr := r.Status().Update(ctx, instance)
	return updateErr
}
176 changes: 174 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
Expand Down Expand Up @@ -1349,9 +1350,180 @@ func TestUpdateStatusObservedGeneration(t *testing.T) {
}

// Compare the values of `Generation` and `ObservedGeneration` to check if they match.
err = testRayClusterReconciler.updateStatus(ctx, testRayCluster)
newInstance, err := testRayClusterReconciler.calculateStatus(ctx, testRayCluster)
assert.Nil(t, err)
err = fakeClient.Get(ctx, namespacedName, &cluster)
assert.Nil(t, err)
assert.Equal(t, cluster.ObjectMeta.Generation, cluster.Status.ObservedGeneration)
assert.Equal(t, cluster.ObjectMeta.Generation, newInstance.Status.ObservedGeneration)
}

// TestReconcile_UpdateClusterState checks that updateClusterState stores a new `Status.State`
// for a RayCluster: the cluster starts with an empty state in the fake client, and after the
// call the object read back from the fake client carries the requested state.
func TestReconcile_UpdateClusterState(t *testing.T) {
	setupTest(t)
	defer tearDown(t)
	newScheme := runtime.NewScheme()
	_ = rayv1alpha1.AddToScheme(newScheme)

	fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(testRayCluster).Build()
	ctx := context.Background()

	namespacedName := types.NamespacedName{
		Name:      instanceName,
		Namespace: namespaceStr,
	}

	// Precondition: the stored RayCluster has no state yet.
	cluster := rayv1alpha1.RayCluster{}
	err := fakeClient.Get(ctx, namespacedName, &cluster)
	assert.Nil(t, err, "Fail to get RayCluster")
	assert.Empty(t, cluster.Status.State, "Cluster state should be empty")

	testRayClusterReconciler := &RayClusterReconciler{
		Client:   fakeClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
	}

	state := rayv1alpha1.Ready
	err = testRayClusterReconciler.updateClusterState(ctx, testRayCluster, state)
	assert.Nil(t, err, "Fail to update cluster state")

	// Read the object back to confirm the state was written through the client.
	err = fakeClient.Get(ctx, namespacedName, &cluster)
	assert.Nil(t, err, "Fail to get RayCluster after updating state")
	// assert.Equal takes (expected, actual); this order keeps the failure output labeled correctly.
	assert.Equal(t, state, cluster.Status.State, "Cluster state should be updated")
}

// TestInconsistentRayClusterStatus exercises inconsistentRayClusterStatus one field at a time.
// Each case deep-copies a baseline status, changes exactly one field, and checks whether the
// function reports an inconsistency. Only changes to `LastUpdateTime` and `ObservedGeneration`
// are expected to be ignored.
func TestInconsistentRayClusterStatus(t *testing.T) {
	newScheme := runtime.NewScheme()
	_ = rayv1alpha1.AddToScheme(newScheme)
	fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects().Build()
	r := &RayClusterReconciler{
		Client:   fakeClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
	}

	// Baseline status that every case starts from.
	timeNow := metav1.Now()
	oldStatus := rayv1alpha1.RayClusterStatus{
		State:                   rayv1alpha1.Ready,
		AvailableWorkerReplicas: 1,
		DesiredWorkerReplicas:   1,
		MinWorkerReplicas:       1,
		MaxWorkerReplicas:       10,
		LastUpdateTime:          &timeNow,
		Endpoints: map[string]string{
			"client":    "10001",
			"dashboard": "8265",
			"gcs":       "6379",
			"metrics":   "8080",
		},
		Head: rayv1alpha1.HeadInfo{
			PodIP:     "10.244.0.6",
			ServiceIP: "10.96.140.249",
		},
		ObservedGeneration: 1,
		Reason:             "test reason",
	}

	testCases := []struct {
		name string
		// mutate receives a deep copy of oldStatus and changes a single field.
		mutate func(s *rayv1alpha1.RayClusterStatus)
		// inconsistent is the expected return value of inconsistentRayClusterStatus.
		inconsistent bool
	}{
		{
			name:         "Case 1: `State` is different => return true",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.State = rayv1alpha1.Failed },
			inconsistent: true,
		},
		{
			name:         "Case 2: `Reason` is different => return true",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.Reason = "new reason" },
			inconsistent: true,
		},
		{
			name:         "Case 3: `AvailableWorkerReplicas` is different => return true",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.AvailableWorkerReplicas++ },
			inconsistent: true,
		},
		{
			name:         "Case 4: `DesiredWorkerReplicas` is different => return true",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.DesiredWorkerReplicas++ },
			inconsistent: true,
		},
		{
			name:         "Case 5: `MinWorkerReplicas` is different => return true",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.MinWorkerReplicas++ },
			inconsistent: true,
		},
		{
			name:         "Case 6: `MaxWorkerReplicas` is different => return true",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.MaxWorkerReplicas++ },
			inconsistent: true,
		},
		{
			name:         "Case 7: `Endpoints` is different => return true",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.Endpoints["fakeEndpoint"] = "10009" },
			inconsistent: true,
		},
		{
			name:         "Case 8: `Head` is different => return true",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.Head.PodIP = "test head pod ip" },
			inconsistent: true,
		},
		{
			name: "Case 9: `LastUpdateTime` is different => return false",
			mutate: func(s *rayv1alpha1.RayClusterStatus) {
				s.LastUpdateTime = &metav1.Time{Time: timeNow.Add(time.Hour)}
			},
			inconsistent: false,
		},
		{
			name:         "Case 10: `ObservedGeneration` is different => return false",
			mutate:       func(s *rayv1alpha1.RayClusterStatus) { s.ObservedGeneration++ },
			inconsistent: false,
		},
	}

	for _, tc := range testCases {
		// Start from a fresh deep copy so that cases cannot influence each other.
		newStatus := oldStatus.DeepCopy()
		tc.mutate(newStatus)
		assert.Equal(t, tc.inconsistent, r.inconsistentRayClusterStatus(oldStatus, *newStatus), tc.name)
	}
}

// TestCalculateStatus checks that calculateStatus fills in the head information of the returned
// RayCluster: with a head Pod and a head Service present in the fake client, the resulting
// `Status.Head` must carry the Pod's IP and the Service's cluster IP.
func TestCalculateStatus(t *testing.T) {
	setupTest(t)
	defer tearDown(t)

	// Scheme that knows both the RayCluster CRD types and the core types (Pod, Service).
	testScheme := runtime.NewScheme()
	_ = rayv1alpha1.AddToScheme(testScheme)
	_ = corev1.AddToScheme(testScheme)

	// Fixture: head Service with a recognizable cluster IP.
	headServiceIP := "aaa.bbb.ccc.ddd"
	headService, err := common.BuildServiceForHeadPod(*testRayCluster, nil, nil)
	assert.Nil(t, err, "Failed to build head service.")
	headService.Spec.ClusterIP = headServiceIP

	// Fixture: head Pod labeled as the head node of the test cluster.
	headPodLabels := map[string]string{
		common.RayClusterLabelKey:  instanceName,
		common.RayNodeTypeLabelKey: string(rayv1alpha1.HeadNode),
	}
	headPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "headNode",
			Namespace: namespaceStr,
			Labels:    headPodLabels,
		},
		Status: corev1.PodStatus{
			PodIP: headNodeIP,
		},
	}

	// Fake client preloaded with the two fixtures above.
	fakeClient := clientFake.NewClientBuilder().
		WithScheme(testScheme).
		WithRuntimeObjects([]runtime.Object{headPod, headService}...).
		Build()
	ctx := context.Background()

	reconciler := &RayClusterReconciler{
		Client:   fakeClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
	}

	// The calculated status must expose the head Pod IP and the head Service IP.
	calculated, err := reconciler.calculateStatus(ctx, testRayCluster)
	assert.Nil(t, err)
	assert.Equal(t, headNodeIP, calculated.Status.Head.PodIP)
	assert.Equal(t, headServiceIP, calculated.Status.Head.ServiceIP)
}

0 comments on commit 15daa54

Please sign in to comment.