Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RayService object's Status is being updated due to frequent reconciliation #1065

Merged
merged 8 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 64 additions & 3 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
if rayServiceInstance, err = r.getRayServiceInstance(ctx, request); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
originalRayServiceInstance := rayServiceInstance.DeepCopy()
r.cleanUpServeConfigCache(rayServiceInstance)

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
Expand Down Expand Up @@ -212,14 +213,74 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
}

// Final status update for any CR modification.
if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
logger.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
if r.inconsistentRayServiceStatuses(originalRayServiceInstance.Status, rayServiceInstance.Status) {
if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
logger.Error(errStatus, "Failed to update RayService status", "rayServiceInstance", rayServiceInstance)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
}

return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
}

// Checks whether the old and new RayServiceStatus are inconsistent by comparing different fields.
// If the only differences between the old and new status are the LastUpdateTime and HealthLastUpdateTime fields,
// the status update will not be triggered.
// The RayClusterStatus field is only for observability in RayService CR, and changes to it will not trigger the status update.
func (r *RayServiceReconciler) inconsistentRayServiceStatus(oldStatus rayv1alpha1.RayServiceStatus, newStatus rayv1alpha1.RayServiceStatus) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add func comment to list what is decided to be inconsistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. 36f1050

shrekris-anyscale marked this conversation as resolved.
Show resolved Hide resolved
if oldStatus.RayClusterName != newStatus.RayClusterName {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService RayClusterName changed from %s to %s", oldStatus.RayClusterName, newStatus.RayClusterName))
return true
}

if oldStatus.DashboardStatus.IsHealthy != newStatus.DashboardStatus.IsHealthy {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService DashboardStatus changed from %v to %v", oldStatus.DashboardStatus, newStatus.DashboardStatus))
return true
}

if oldStatus.ApplicationStatus.Status != newStatus.ApplicationStatus.Status ||
oldStatus.ApplicationStatus.Message != newStatus.ApplicationStatus.Message {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ApplicationStatus changed from %v to %v", oldStatus.ApplicationStatus, newStatus.ApplicationStatus))
return true
}

if len(oldStatus.ServeStatuses) != len(newStatus.ServeStatuses) {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService number of ServeStatus changed from %v to %v", len(oldStatus.ServeStatuses), len(newStatus.ServeStatuses)))
return true
}

for i := 0; i < len(oldStatus.ServeStatuses); i++ {
if oldStatus.ServeStatuses[i].Name != newStatus.ServeStatuses[i].Name ||
oldStatus.ServeStatuses[i].Status != newStatus.ServeStatuses[i].Status ||
oldStatus.ServeStatuses[i].Message != newStatus.ServeStatuses[i].Message {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ServeDeploymentStatus changed from %v to %v", oldStatus.ServeStatuses[i], newStatus.ServeStatuses[i]))
return true
}
}

return false
}

// Determine whether to update the status of the RayService instance.
func (r *RayServiceReconciler) inconsistentRayServiceStatuses(oldStatus rayv1alpha1.RayServiceStatuses, newStatus rayv1alpha1.RayServiceStatuses) bool {
shrekris-anyscale marked this conversation as resolved.
Show resolved Hide resolved
if oldStatus.ServiceStatus != newStatus.ServiceStatus {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ServiceStatus changed from %s to %s", oldStatus.ServiceStatus, newStatus.ServiceStatus))
return true
}

if r.inconsistentRayServiceStatus(oldStatus.ActiveServiceStatus, newStatus.ActiveServiceStatus) {
r.Log.Info("inconsistentRayServiceStatus RayService ActiveServiceStatus changed")
return true
}

if r.inconsistentRayServiceStatus(oldStatus.PendingServiceStatus, newStatus.PendingServiceStatus) {
r.Log.Info("inconsistentRayServiceStatus RayService PendingServiceStatus changed")
return true
}

return false
}

// SetupWithManager sets up the controller with the Manager.
func (r *RayServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
88 changes: 88 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,94 @@ var _ = Context("Inside the default namespace", func() {
time.Second*15, time.Millisecond*500).Should(Equal(initialPendingClusterName), "New active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName)
})

It("Status should be updated if the differences are not only LastUpdateTime and HealthLastUpdateTime fields.", func() {
// Make sure (1) Dashboard client is healthy (2) All the three Ray Serve deployments in the active RayCluster are HEALTHY.
initialClusterName, _ := getRayClusterNameFunc(ctx, myRayService)()
Eventually(
checkServiceHealth(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)

// ServiceUnhealthySecondThreshold is a global variable in rayservice_controller.go.
// If the time elapsed since the last update of the service HEALTHY status exceeds ServiceUnhealthySecondThreshold seconds,
// the RayService controller will consider the active RayCluster as unhealthy and prepare a new RayCluster.
orignalServeDeploymentUnhealthySecondThreshold := ServiceUnhealthySecondThreshold
ServiceUnhealthySecondThreshold = 500

// Only update the LastUpdateTime and HealthLastUpdateTime fields in the active RayCluster.
oldTime := myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime.DeepCopy()
newTime := oldTime.Add(time.Duration(5) * time.Minute) // 300 seconds
fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "UNHEALTHY"))

// Confirm not switch to a new RayCluster because ServiceUnhealthySecondThreshold is 500 seconds.
Consistently(
getRayClusterNameFunc(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName)

// Check if all the ServeStatuses[i].Status are UNHEALTHY.
checkAllServeStatusesUnhealthy := func(ctx context.Context, rayService *rayiov1alpha1.RayService) bool {
if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: rayService.Namespace}, rayService); err != nil {
return false
}
for _, serveStatus := range rayService.Status.ActiveServiceStatus.ServeStatuses {
if serveStatus.Status != "UNHEALTHY" {
return false
}
}
return true
}

// The status update not only includes the LastUpdateTime and HealthLastUpdateTime fields, but also the ServeStatuses[i].Status field.
// Hence, all the ServeStatuses[i].Status should be updated to UNHEALTHY.
//
// Note: LastUpdateTime/HealthLastUpdateTime will be overwritten via metav1.Now() in rayservice_controller.go.
// Hence, we cannot use `newTime`` to check whether the status is updated or not.
Eventually(
checkAllServeStatusesUnhealthy(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)

fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "HEALTHY"))

// Confirm not switch to a new RayCluster because ServiceUnhealthySecondThreshold is 500 seconds.
Consistently(
getRayClusterNameFunc(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName)

// The status update not only includes the LastUpdateTime and HealthLastUpdateTime fields, but also the ServeStatuses[i].Status field.
// Hence, the status should be updated.
Eventually(
checkServiceHealth(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)
ServiceUnhealthySecondThreshold = orignalServeDeploymentUnhealthySecondThreshold
})

It("Status should not be updated if the only differences are the LastUpdateTime and HealthLastUpdateTime fields.", func() {
// Make sure (1) Dashboard client is healthy (2) All the three Ray Serve deployments in the active RayCluster are HEALTHY.
initialClusterName, _ := getRayClusterNameFunc(ctx, myRayService)()
Eventually(
checkServiceHealth(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)

// Only update the LastUpdateTime and HealthLastUpdateTime fields in the active RayCluster.
oldTime := myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime.DeepCopy()
newTime := oldTime.Add(time.Duration(5) * time.Minute) // 300 seconds
fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "HEALTHY"))

// Confirm not switch to a new RayCluster
Consistently(
getRayClusterNameFunc(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName)

// The status is still the same as before.
Eventually(
checkServiceHealth(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)

// Status should not be updated if the only differences are the LastUpdateTime and HealthLastUpdateTime fields.
// Unlike the test "Status should be updated if the differences are not only LastUpdateTime and HealthLastUpdateTime fields.",
// the status update will not be triggered, so we can check whether the LastUpdateTime/HealthLastUpdateTime fields are updated or not by `oldTime`.
Expect(myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime).Should(Equal(oldTime), "myRayService status = %v", myRayService.Status)
})

It("Update workerGroup.replicas in RayService and should not switch to new Ray Cluster", func() {
// Certain field updates should not trigger new RayCluster preparation, such as updates
// to `Replicas` and `WorkersToDelete` triggered by the autoscaler during scaling up/down.
Expand Down
114 changes: 114 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
)

func TestGenerateRayClusterJsonHash(t *testing.T) {
Expand Down Expand Up @@ -60,3 +62,115 @@ func TestCompareRayClusterJsonHash(t *testing.T) {
assert.Nil(t, err)
assert.True(t, equal)
}

func TestInconsistentRayServiceStatuses(t *testing.T) {
r := &RayServiceReconciler{
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
}

timeNow := metav1.Now()
oldStatus := v1alpha1.RayServiceStatuses{
ActiveServiceStatus: v1alpha1.RayServiceStatus{
RayClusterName: "new-cluster",
DashboardStatus: v1alpha1.DashboardStatus{
IsHealthy: true,
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ApplicationStatus: v1alpha1.AppStatus{
Status: "running",
Message: "OK",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ServeStatuses: []v1alpha1.ServeDeploymentStatus{
{
Name: "serve-1",
Status: "unhealthy",
Message: "error",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
},
},
PendingServiceStatus: v1alpha1.RayServiceStatus{
RayClusterName: "old-cluster",
DashboardStatus: v1alpha1.DashboardStatus{
IsHealthy: true,
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ApplicationStatus: v1alpha1.AppStatus{
Status: "stopped",
Message: "stopped",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ServeStatuses: []v1alpha1.ServeDeploymentStatus{
{
Name: "serve-1",
Status: "healthy",
Message: "Serve is healthy",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
},
},
ServiceStatus: v1alpha1.WaitForDashboard,
}

// Test 1: Update ServiceStatus only.
newStatus := oldStatus.DeepCopy()
newStatus.ServiceStatus = v1alpha1.WaitForServeDeploymentReady
assert.True(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus))

// Test 2: Test RayServiceStatus
newStatus = oldStatus.DeepCopy()
newStatus.ActiveServiceStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)}
assert.False(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus))

newStatus.ActiveServiceStatus.DashboardStatus.IsHealthy = !oldStatus.ActiveServiceStatus.DashboardStatus.IsHealthy
assert.True(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus))
}

func TestInconsistentRayServiceStatus(t *testing.T) {
timeNow := metav1.Now()
oldStatus := v1alpha1.RayServiceStatus{
RayClusterName: "cluster-1",
DashboardStatus: v1alpha1.DashboardStatus{
IsHealthy: true,
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ApplicationStatus: v1alpha1.AppStatus{
Status: "running",
Message: "Application is running",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ServeStatuses: []v1alpha1.ServeDeploymentStatus{
{
Name: "serve-1",
Status: "healthy",
Message: "Serve is healthy",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
},
}

r := &RayServiceReconciler{
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
}

// Test 1: Only LastUpdateTime and HealthLastUpdateTime are updated.
newStatus := oldStatus.DeepCopy()
newStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)}
assert.False(t, r.inconsistentRayServiceStatus(oldStatus, *newStatus))

// Test 2: Not only LastUpdateTime and HealthLastUpdateTime are updated.
newStatus = oldStatus.DeepCopy()
newStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)}
newStatus.DashboardStatus.IsHealthy = !oldStatus.DashboardStatus.IsHealthy
assert.True(t, r.inconsistentRayServiceStatus(oldStatus, *newStatus))
}