Skip to content

Commit

Permalink
RayService object's Status is being updated due to frequent reconcili…
Browse files Browse the repository at this point in the history
…ation (#1065)

RayService object's Status is being updated due to frequent reconciliation
  • Loading branch information
kevin85421 authored May 8, 2023
1 parent ba814ef commit 795db0d
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 3 deletions.
67 changes: 64 additions & 3 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
if rayServiceInstance, err = r.getRayServiceInstance(ctx, request); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
originalRayServiceInstance := rayServiceInstance.DeepCopy()
r.cleanUpServeConfigCache(rayServiceInstance)

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
Expand Down Expand Up @@ -212,14 +213,74 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
}

// Final status update for any CR modification.
if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
logger.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
if r.inconsistentRayServiceStatuses(originalRayServiceInstance.Status, rayServiceInstance.Status) {
if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
logger.Error(errStatus, "Failed to update RayService status", "rayServiceInstance", rayServiceInstance)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
}

return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
}

// Checks whether the old and new RayServiceStatus are inconsistent by comparing different fields.
// If the only differences between the old and new status are the LastUpdateTime and HealthLastUpdateTime fields,
// the status update will not be triggered.
// The RayClusterStatus field is only for observability in RayService CR, and changes to it will not trigger the status update.
func (r *RayServiceReconciler) inconsistentRayServiceStatus(oldStatus rayv1alpha1.RayServiceStatus, newStatus rayv1alpha1.RayServiceStatus) bool {
if oldStatus.RayClusterName != newStatus.RayClusterName {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService RayClusterName changed from %s to %s", oldStatus.RayClusterName, newStatus.RayClusterName))
return true
}

if oldStatus.DashboardStatus.IsHealthy != newStatus.DashboardStatus.IsHealthy {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService DashboardStatus changed from %v to %v", oldStatus.DashboardStatus, newStatus.DashboardStatus))
return true
}

if oldStatus.ApplicationStatus.Status != newStatus.ApplicationStatus.Status ||
oldStatus.ApplicationStatus.Message != newStatus.ApplicationStatus.Message {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ApplicationStatus changed from %v to %v", oldStatus.ApplicationStatus, newStatus.ApplicationStatus))
return true
}

if len(oldStatus.ServeStatuses) != len(newStatus.ServeStatuses) {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService number of ServeStatus changed from %v to %v", len(oldStatus.ServeStatuses), len(newStatus.ServeStatuses)))
return true
}

for i := 0; i < len(oldStatus.ServeStatuses); i++ {
if oldStatus.ServeStatuses[i].Name != newStatus.ServeStatuses[i].Name ||
oldStatus.ServeStatuses[i].Status != newStatus.ServeStatuses[i].Status ||
oldStatus.ServeStatuses[i].Message != newStatus.ServeStatuses[i].Message {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ServeDeploymentStatus changed from %v to %v", oldStatus.ServeStatuses[i], newStatus.ServeStatuses[i]))
return true
}
}

return false
}

// Determine whether to update the status of the RayService instance.
func (r *RayServiceReconciler) inconsistentRayServiceStatuses(oldStatus rayv1alpha1.RayServiceStatuses, newStatus rayv1alpha1.RayServiceStatuses) bool {
if oldStatus.ServiceStatus != newStatus.ServiceStatus {
r.Log.Info(fmt.Sprintf("inconsistentRayServiceStatus RayService ServiceStatus changed from %s to %s", oldStatus.ServiceStatus, newStatus.ServiceStatus))
return true
}

if r.inconsistentRayServiceStatus(oldStatus.ActiveServiceStatus, newStatus.ActiveServiceStatus) {
r.Log.Info("inconsistentRayServiceStatus RayService ActiveServiceStatus changed")
return true
}

if r.inconsistentRayServiceStatus(oldStatus.PendingServiceStatus, newStatus.PendingServiceStatus) {
r.Log.Info("inconsistentRayServiceStatus RayService PendingServiceStatus changed")
return true
}

return false
}

// SetupWithManager sets up the controller with the Manager.
func (r *RayServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
88 changes: 88 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,94 @@ var _ = Context("Inside the default namespace", func() {
time.Second*15, time.Millisecond*500).Should(Equal(initialPendingClusterName), "New active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName)
})

It("Status should be updated if the differences are not only LastUpdateTime and HealthLastUpdateTime fields.", func() {
// Make sure (1) Dashboard client is healthy (2) All the three Ray Serve deployments in the active RayCluster are HEALTHY.
initialClusterName, _ := getRayClusterNameFunc(ctx, myRayService)()
Eventually(
checkServiceHealth(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)

// ServiceUnhealthySecondThreshold is a global variable in rayservice_controller.go.
// If the time elapsed since the last update of the service HEALTHY status exceeds ServiceUnhealthySecondThreshold seconds,
// the RayService controller will consider the active RayCluster as unhealthy and prepare a new RayCluster.
orignalServeDeploymentUnhealthySecondThreshold := ServiceUnhealthySecondThreshold
ServiceUnhealthySecondThreshold = 500

// Only update the LastUpdateTime and HealthLastUpdateTime fields in the active RayCluster.
oldTime := myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime.DeepCopy()
newTime := oldTime.Add(time.Duration(5) * time.Minute) // 300 seconds
fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "UNHEALTHY"))

// Confirm not switch to a new RayCluster because ServiceUnhealthySecondThreshold is 500 seconds.
Consistently(
getRayClusterNameFunc(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName)

// Check if all the ServeStatuses[i].Status are UNHEALTHY.
checkAllServeStatusesUnhealthy := func(ctx context.Context, rayService *rayiov1alpha1.RayService) bool {
if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: rayService.Namespace}, rayService); err != nil {
return false
}
for _, serveStatus := range rayService.Status.ActiveServiceStatus.ServeStatuses {
if serveStatus.Status != "UNHEALTHY" {
return false
}
}
return true
}

// The status update not only includes the LastUpdateTime and HealthLastUpdateTime fields, but also the ServeStatuses[i].Status field.
// Hence, all the ServeStatuses[i].Status should be updated to UNHEALTHY.
//
// Note: LastUpdateTime/HealthLastUpdateTime will be overwritten via metav1.Now() in rayservice_controller.go.
// Hence, we cannot use `newTime`` to check whether the status is updated or not.
Eventually(
checkAllServeStatusesUnhealthy(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)

fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "HEALTHY"))

// Confirm not switch to a new RayCluster because ServiceUnhealthySecondThreshold is 500 seconds.
Consistently(
getRayClusterNameFunc(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName)

// The status update not only includes the LastUpdateTime and HealthLastUpdateTime fields, but also the ServeStatuses[i].Status field.
// Hence, the status should be updated.
Eventually(
checkServiceHealth(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)
ServiceUnhealthySecondThreshold = orignalServeDeploymentUnhealthySecondThreshold
})

It("Status should not be updated if the only differences are the LastUpdateTime and HealthLastUpdateTime fields.", func() {
// Make sure (1) Dashboard client is healthy (2) All the three Ray Serve deployments in the active RayCluster are HEALTHY.
initialClusterName, _ := getRayClusterNameFunc(ctx, myRayService)()
Eventually(
checkServiceHealth(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)

// Only update the LastUpdateTime and HealthLastUpdateTime fields in the active RayCluster.
oldTime := myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime.DeepCopy()
newTime := oldTime.Add(time.Duration(5) * time.Minute) // 300 seconds
fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(newTime), "HEALTHY"))

// Confirm not switch to a new RayCluster
Consistently(
getRayClusterNameFunc(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(Equal(initialClusterName), "Active RayCluster name = %v", myRayService.Status.ActiveServiceStatus.RayClusterName)

// The status is still the same as before.
Eventually(
checkServiceHealth(ctx, myRayService),
time.Second*3, time.Millisecond*500).Should(BeTrue(), "myRayService status = %v", myRayService.Status)

// Status should not be updated if the only differences are the LastUpdateTime and HealthLastUpdateTime fields.
// Unlike the test "Status should be updated if the differences are not only LastUpdateTime and HealthLastUpdateTime fields.",
// the status update will not be triggered, so we can check whether the LastUpdateTime/HealthLastUpdateTime fields are updated or not by `oldTime`.
Expect(myRayService.Status.ActiveServiceStatus.ServeStatuses[0].HealthLastUpdateTime).Should(Equal(oldTime), "myRayService status = %v", myRayService.Status)
})

It("Update workerGroup.replicas in RayService and should not switch to new Ray Cluster", func() {
// Certain field updates should not trigger new RayCluster preparation, such as updates
// to `Replicas` and `WorkersToDelete` triggered by the autoscaler during scaling up/down.
Expand Down
114 changes: 114 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
)

func TestGenerateRayClusterJsonHash(t *testing.T) {
Expand Down Expand Up @@ -60,3 +62,115 @@ func TestCompareRayClusterJsonHash(t *testing.T) {
assert.Nil(t, err)
assert.True(t, equal)
}

func TestInconsistentRayServiceStatuses(t *testing.T) {
r := &RayServiceReconciler{
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
}

timeNow := metav1.Now()
oldStatus := v1alpha1.RayServiceStatuses{
ActiveServiceStatus: v1alpha1.RayServiceStatus{
RayClusterName: "new-cluster",
DashboardStatus: v1alpha1.DashboardStatus{
IsHealthy: true,
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ApplicationStatus: v1alpha1.AppStatus{
Status: "running",
Message: "OK",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ServeStatuses: []v1alpha1.ServeDeploymentStatus{
{
Name: "serve-1",
Status: "unhealthy",
Message: "error",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
},
},
PendingServiceStatus: v1alpha1.RayServiceStatus{
RayClusterName: "old-cluster",
DashboardStatus: v1alpha1.DashboardStatus{
IsHealthy: true,
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ApplicationStatus: v1alpha1.AppStatus{
Status: "stopped",
Message: "stopped",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ServeStatuses: []v1alpha1.ServeDeploymentStatus{
{
Name: "serve-1",
Status: "healthy",
Message: "Serve is healthy",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
},
},
ServiceStatus: v1alpha1.WaitForDashboard,
}

// Test 1: Update ServiceStatus only.
newStatus := oldStatus.DeepCopy()
newStatus.ServiceStatus = v1alpha1.WaitForServeDeploymentReady
assert.True(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus))

// Test 2: Test RayServiceStatus
newStatus = oldStatus.DeepCopy()
newStatus.ActiveServiceStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)}
assert.False(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus))

newStatus.ActiveServiceStatus.DashboardStatus.IsHealthy = !oldStatus.ActiveServiceStatus.DashboardStatus.IsHealthy
assert.True(t, r.inconsistentRayServiceStatuses(oldStatus, *newStatus))
}

func TestInconsistentRayServiceStatus(t *testing.T) {
timeNow := metav1.Now()
oldStatus := v1alpha1.RayServiceStatus{
RayClusterName: "cluster-1",
DashboardStatus: v1alpha1.DashboardStatus{
IsHealthy: true,
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ApplicationStatus: v1alpha1.AppStatus{
Status: "running",
Message: "Application is running",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
ServeStatuses: []v1alpha1.ServeDeploymentStatus{
{
Name: "serve-1",
Status: "healthy",
Message: "Serve is healthy",
LastUpdateTime: &timeNow,
HealthLastUpdateTime: &timeNow,
},
},
}

r := &RayServiceReconciler{
Log: ctrl.Log.WithName("controllers").WithName("RayService"),
}

// Test 1: Only LastUpdateTime and HealthLastUpdateTime are updated.
newStatus := oldStatus.DeepCopy()
newStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)}
assert.False(t, r.inconsistentRayServiceStatus(oldStatus, *newStatus))

// Test 2: Not only LastUpdateTime and HealthLastUpdateTime are updated.
newStatus = oldStatus.DeepCopy()
newStatus.DashboardStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(1)}
newStatus.DashboardStatus.IsHealthy = !oldStatus.DashboardStatus.IsHealthy
assert.True(t, r.inconsistentRayServiceStatus(oldStatus, *newStatus))
}

0 comments on commit 795db0d

Please sign in to comment.