Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RayCluster updates status frequently #1211

Merged
merged 6 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 79 additions & 29 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -198,6 +199,9 @@ func (r *RayClusterReconciler) eventReconcile(ctx context.Context, request ctrl.
}

func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1alpha1.RayCluster) (ctrl.Result, error) {
// Please do NOT modify `originalRayClusterInstance` in the following code.
originalRayClusterInstance := instance.DeepCopy()

_ = r.Log.WithValues("raycluster", request.NamespacedName)
r.Log.Info("reconciling RayCluster", "cluster name", request.Name)

Expand Down Expand Up @@ -247,19 +251,26 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1alpha1.PodReconciliationError), err.Error())
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
// update the status if needed
if err := r.updateStatus(ctx, instance); err != nil {
if errors.IsNotFound(err) {
r.Log.Info("Update status not found error", "cluster name", request.Name)
} else {
r.Log.Error(err, "Update status error", "cluster name", request.Name)

// Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
newInstance, err := r.calculateStatus(ctx, instance)
if err != nil {
r.Log.Info("Got error when calculating new status", "cluster name", request.Name, "error", err)
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}

// Check if need to update the status.
if r.inconsistentRayClusterStatus(originalRayClusterInstance.Status, newInstance.Status) {
r.Log.Info("rayClusterReconcile", "Update CR status", request.Name, "status", newInstance.Status)
if err := r.Status().Update(ctx, newInstance); err != nil {
r.Log.Info("Got error when updating status", "cluster name", request.Name, "error", err, "RayCluster", newInstance)
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
}

// Unconditionally requeue after the number of seconds specified in the
// environment variable RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV. If the
// environment variable is not set, requeue after the default value.
var requeueAfterSeconds int
requeueAfterSeconds, err := strconv.Atoi(os.Getenv(common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV))
if err != nil {
r.Log.Info(fmt.Sprintf("Environment variable %s is not set, using default value of %d seconds", common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV, common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS), "cluster name", request.Name)
Expand All @@ -269,6 +280,37 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
return ctrl.Result{RequeueAfter: time.Duration(requeueAfterSeconds) * time.Second}, nil
}

// Checks whether the old and new RayClusterStatus are inconsistent by comparing different fields. If the only
// differences between the old and new status are the `LastUpdateTime` and `ObservedGeneration` fields, the
// status update will not be triggered.
//
// TODO (kevin85421): The field `ObservedGeneration` is not being well-maintained at the moment. In the future,
// this field should be used to determine whether to update this CR or not.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have GH issue for this ObservedGeneration fix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I will open an issue for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open an issue #1217.

func (r *RayClusterReconciler) inconsistentRayClusterStatus(oldStatus rayv1alpha1.RayClusterStatus, newStatus rayv1alpha1.RayClusterStatus) bool {
if oldStatus.State != newStatus.State || oldStatus.Reason != newStatus.Reason {
r.Log.Info("inconsistentRayClusterStatus", "detect inconsistency", fmt.Sprintf(
"old State: %s, new State: %s, old Reason: %s, new Reason: %s",
oldStatus.State, newStatus.State, oldStatus.Reason, newStatus.Reason))
return true
}
if oldStatus.AvailableWorkerReplicas != newStatus.AvailableWorkerReplicas || oldStatus.DesiredWorkerReplicas != newStatus.DesiredWorkerReplicas ||
oldStatus.MinWorkerReplicas != newStatus.MinWorkerReplicas || oldStatus.MaxWorkerReplicas != newStatus.MaxWorkerReplicas {
r.Log.Info("inconsistentRayClusterStatus", "detect inconsistency", fmt.Sprintf(
"old AvailableWorkerReplicas: %d, new AvailableWorkerReplicas: %d, old DesiredWorkerReplicas: %d, new DesiredWorkerReplicas: %d, "+
"old MinWorkerReplicas: %d, new MinWorkerReplicas: %d, old MaxWorkerReplicas: %d, new MaxWorkerReplicas: %d",
oldStatus.AvailableWorkerReplicas, newStatus.AvailableWorkerReplicas, oldStatus.DesiredWorkerReplicas, newStatus.DesiredWorkerReplicas,
oldStatus.MinWorkerReplicas, newStatus.MinWorkerReplicas, oldStatus.MaxWorkerReplicas, newStatus.MaxWorkerReplicas))
return true
}
if !reflect.DeepEqual(oldStatus.Endpoints, newStatus.Endpoints) || !reflect.DeepEqual(oldStatus.Head, newStatus.Head) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any specific reason to not include this as part of if statement in line 296?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge L290, L296, and L305 into a single if statement; however, the resulting log message will be very long and difficult for users to read.

r.Log.Info("inconsistentRayClusterStatus", "detect inconsistency", fmt.Sprintf(
"old Endpoints: %v, new Endpoints: %v, old Head: %v, new Head: %v",
oldStatus.Endpoints, newStatus.Endpoints, oldStatus.Head, newStatus.Head))
return true
}
return false
}

func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
if instance.Spec.HeadGroupSpec.EnableIngress == nil || !*instance.Spec.HeadGroupSpec.EnableIngress {
return nil
Expand Down Expand Up @@ -841,50 +883,50 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
Complete(r)
}

func (r *RayClusterReconciler) updateStatus(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1alpha1.RayCluster) (*rayv1alpha1.RayCluster, error) {
// Deep copy the instance, so we don't mutate the original object.
newInstance := instance.DeepCopy()

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
instance.Status.ObservedGeneration = instance.ObjectMeta.Generation
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation

runtimePods := corev1.PodList{}
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
if err := r.List(ctx, &runtimePods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return err
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: newInstance.Name}
if err := r.List(ctx, &runtimePods, client.InNamespace(newInstance.Namespace), filterLabels); err != nil {
return nil, err
}

instance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
instance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(instance)
instance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(instance)
instance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(instance)
newInstance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
newInstance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(newInstance)
newInstance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(newInstance)
newInstance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(newInstance)

// validation for the RayStartParam for the state.
isValid, err := common.ValidateHeadRayStartParams(instance.Spec.HeadGroupSpec)
isValid, err := common.ValidateHeadRayStartParams(newInstance.Spec.HeadGroupSpec)
if err != nil {
r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1alpha1.RayConfigError), err.Error())
r.Recorder.Event(newInstance, corev1.EventTypeWarning, string(rayv1alpha1.RayConfigError), err.Error())
}
// only in invalid status that we update the status to unhealthy.
if !isValid {
instance.Status.State = rayv1alpha1.Unhealthy
newInstance.Status.State = rayv1alpha1.Unhealthy
} else {
if utils.CheckAllPodsRunning(runtimePods) {
instance.Status.State = rayv1alpha1.Ready
newInstance.Status.State = rayv1alpha1.Ready
}
}

if err := r.updateEndpoints(ctx, instance); err != nil {
return err
if err := r.updateEndpoints(ctx, newInstance); err != nil {
return nil, err
}

if err := r.updateHeadInfo(ctx, instance); err != nil {
return err
if err := r.updateHeadInfo(ctx, newInstance); err != nil {
return nil, err
}

timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(ctx, instance); err != nil {
return err
}
newInstance.Status.LastUpdateTime = &timeNow

return nil
return newInstance, nil
}

// Best effort to obtain the ip of the head node.
Expand Down Expand Up @@ -1113,11 +1155,19 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
}

// updateClusterState persists Status.State on the CR, skipping the write entirely
// when the state is already up to date to avoid redundant API-server traffic.
func (r *RayClusterReconciler) updateClusterState(ctx context.Context, instance *rayv1alpha1.RayCluster, clusterState rayv1alpha1.ClusterState) error {
	// No-op when nothing changed.
	if clusterState == instance.Status.State {
		return nil
	}
	r.Log.Info("updateClusterState", "Update CR Status.State", clusterState)
	instance.Status.State = clusterState
	return r.Status().Update(ctx, instance)
}

// updateClusterReason persists Status.Reason on the CR, skipping the write entirely
// when the reason is already up to date to avoid redundant API-server traffic.
func (r *RayClusterReconciler) updateClusterReason(ctx context.Context, instance *rayv1alpha1.RayCluster, clusterReason string) error {
	// No-op when nothing changed.
	if clusterReason == instance.Status.Reason {
		return nil
	}
	r.Log.Info("updateClusterReason", "Update CR Status.Reason", clusterReason)
	instance.Status.Reason = clusterReason
	return r.Status().Update(ctx, instance)
}
176 changes: 174 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
Expand Down Expand Up @@ -1349,9 +1350,180 @@ func TestUpdateStatusObservedGeneration(t *testing.T) {
}

// Compare the values of `Generation` and `ObservedGeneration` to check if they match.
err = testRayClusterReconciler.updateStatus(ctx, testRayCluster)
newInstance, err := testRayClusterReconciler.calculateStatus(ctx, testRayCluster)
assert.Nil(t, err)
err = fakeClient.Get(ctx, namespacedName, &cluster)
assert.Nil(t, err)
assert.Equal(t, cluster.ObjectMeta.Generation, cluster.Status.ObservedGeneration)
assert.Equal(t, cluster.ObjectMeta.Generation, newInstance.Status.ObservedGeneration)
}

// TestReconcile_UpdateClusterState verifies that updateClusterState persists a new
// Status.State value through the (fake) API server.
func TestReconcile_UpdateClusterState(t *testing.T) {
	setupTest(t)
	defer tearDown(t)

	// A scheme that only registers the Ray CRDs is enough for this test.
	testScheme := runtime.NewScheme()
	_ = rayv1alpha1.AddToScheme(testScheme)

	fakeClient := clientFake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testRayCluster).Build()
	ctx := context.Background()

	key := types.NamespacedName{
		Name:      instanceName,
		Namespace: namespaceStr,
	}

	// Sanity check: the freshly created cluster has no state yet.
	cluster := rayv1alpha1.RayCluster{}
	err := fakeClient.Get(ctx, key, &cluster)
	assert.Nil(t, err, "Fail to get RayCluster")
	assert.Empty(t, cluster.Status.State, "Cluster state should be empty")

	reconciler := &RayClusterReconciler{
		Client:   fakeClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
	}

	// Update the state and confirm the change is visible through the client.
	state := rayv1alpha1.Ready
	err = reconciler.updateClusterState(ctx, testRayCluster, state)
	assert.Nil(t, err, "Fail to update cluster state")

	err = fakeClient.Get(ctx, key, &cluster)
	assert.Nil(t, err, "Fail to get RayCluster after updating state")
	assert.Equal(t, cluster.Status.State, state, "Cluster state should be updated")
}

// TestInconsistentRayClusterStatus exercises inconsistentRayClusterStatus with one mutated
// status field per case. Only changes to `LastUpdateTime` and `ObservedGeneration` must NOT
// be reported as inconsistent (i.e. must not trigger a status update).
//
// Rewritten as an idiomatic table-driven test with subtests: the ten copy-pasted
// case blocks collapse into one mutation table, and each case gets its own name
// in `go test -v` / failure output.
func TestInconsistentRayClusterStatus(t *testing.T) {
	newScheme := runtime.NewScheme()
	_ = rayv1alpha1.AddToScheme(newScheme)
	fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects().Build()
	r := &RayClusterReconciler{
		Client:   fakeClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
	}

	// Baseline status; every case below deep copies it and mutates exactly one field.
	timeNow := metav1.Now()
	oldStatus := rayv1alpha1.RayClusterStatus{
		State:                   rayv1alpha1.Ready,
		AvailableWorkerReplicas: 1,
		DesiredWorkerReplicas:   1,
		MinWorkerReplicas:       1,
		MaxWorkerReplicas:       10,
		LastUpdateTime:          &timeNow,
		Endpoints: map[string]string{
			"client":    "10001",
			"dashboard": "8265",
			"gcs":       "6379",
			"metrics":   "8080",
		},
		Head: rayv1alpha1.HeadInfo{
			PodIP:     "10.244.0.6",
			ServiceIP: "10.96.140.249",
		},
		ObservedGeneration: 1,
		Reason:             "test reason",
	}

	// `mutate` changes a single field; `want` is the expected result of
	// inconsistentRayClusterStatus for that change.
	tests := []struct {
		name   string
		mutate func(status *rayv1alpha1.RayClusterStatus)
		want   bool
	}{
		{"State is different", func(s *rayv1alpha1.RayClusterStatus) { s.State = rayv1alpha1.Failed }, true},
		{"Reason is different", func(s *rayv1alpha1.RayClusterStatus) { s.Reason = "new reason" }, true},
		{"AvailableWorkerReplicas is different", func(s *rayv1alpha1.RayClusterStatus) { s.AvailableWorkerReplicas++ }, true},
		{"DesiredWorkerReplicas is different", func(s *rayv1alpha1.RayClusterStatus) { s.DesiredWorkerReplicas++ }, true},
		{"MinWorkerReplicas is different", func(s *rayv1alpha1.RayClusterStatus) { s.MinWorkerReplicas++ }, true},
		{"MaxWorkerReplicas is different", func(s *rayv1alpha1.RayClusterStatus) { s.MaxWorkerReplicas++ }, true},
		{"Endpoints is different", func(s *rayv1alpha1.RayClusterStatus) { s.Endpoints["fakeEndpoint"] = "10009" }, true},
		{"Head is different", func(s *rayv1alpha1.RayClusterStatus) { s.Head.PodIP = "test head pod ip" }, true},
		// Only `LastUpdateTime` differs => no update should be triggered.
		{"LastUpdateTime is different", func(s *rayv1alpha1.RayClusterStatus) {
			s.LastUpdateTime = &metav1.Time{Time: timeNow.Add(time.Hour)}
		}, false},
		// Only `ObservedGeneration` differs => no update should be triggered.
		{"ObservedGeneration is different", func(s *rayv1alpha1.RayClusterStatus) { s.ObservedGeneration++ }, false},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			newStatus := oldStatus.DeepCopy()
			tc.mutate(newStatus)
			assert.Equal(t, tc.want, r.inconsistentRayClusterStatus(oldStatus, *newStatus))
		})
	}
}

// TestCalculateStatus checks that calculateStatus fills Status.Head with the head
// Pod's IP and the head Service's ClusterIP.
func TestCalculateStatus(t *testing.T) {
	setupTest(t)
	defer tearDown(t)

	// Register the CRD plus the core (Pod/Service) types on a fresh scheme.
	testScheme := runtime.NewScheme()
	_ = rayv1alpha1.AddToScheme(testScheme)
	_ = corev1.AddToScheme(testScheme)

	// Fake head Service with a known ClusterIP.
	headServiceIP := "aaa.bbb.ccc.ddd"
	headService, err := common.BuildServiceForHeadPod(*testRayCluster, nil, nil)
	assert.Nil(t, err, "Failed to build head service.")
	headService.Spec.ClusterIP = headServiceIP

	// Fake head Pod carrying the labels the reconciler filters on.
	headPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "headNode",
			Namespace: namespaceStr,
			Labels: map[string]string{
				common.RayClusterLabelKey:  instanceName,
				common.RayNodeTypeLabelKey: string(rayv1alpha1.HeadNode),
			},
		},
		Status: corev1.PodStatus{
			PodIP: headNodeIP,
		},
	}

	// Initialize a fake client seeded with the mock objects.
	fakeClient := clientFake.NewClientBuilder().
		WithScheme(testScheme).
		WithRuntimeObjects(headPod, headService).
		Build()
	ctx := context.Background()

	r := &RayClusterReconciler{
		Client:   fakeClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
	}

	// The computed status must expose both head IPs.
	newInstance, err := r.calculateStatus(ctx, testRayCluster)
	assert.Nil(t, err)
	assert.Equal(t, headNodeIP, newInstance.Status.Head.PodIP)
	assert.Equal(t, headServiceIP, newInstance.Status.Head.ServiceIP)
}