Skip to content

Commit

Permalink
Fix a potential reconcile issue for RayService and allow config unhea…
Browse files Browse the repository at this point in the history
…lth time threshold in CR (#384)

* Improve

* update

* Improve unhealthy to read spec

* fix go mod

* fix lint

* fix lint
  • Loading branch information
brucez-anyscale authored Jul 16, 2022
1 parent 9de2fb0 commit 781eed1
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 28 deletions.
6 changes: 4 additions & 2 deletions ray-operator/apis/ray/v1alpha1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ const (
// RayServiceSpec defines the desired state of RayService
type RayServiceSpec struct {
// Important: Run "make" to regenerate code after modifying this file
ServeDeploymentGraphSpec ServeDeploymentGraphSpec `json:"serveDeploymentGraphConfig,omitempty"`
RayClusterSpec RayClusterSpec `json:"rayClusterConfig,omitempty"`
ServeDeploymentGraphSpec ServeDeploymentGraphSpec `json:"serveDeploymentGraphConfig,omitempty"`
RayClusterSpec RayClusterSpec `json:"rayClusterConfig,omitempty"`
ServiceUnhealthySecondThreshold *int32 `json:"serviceUnhealthySecondThreshold,omitempty"`
DeploymentUnhealthySecondThreshold *int32 `json:"deploymentUnhealthySecondThreshold,omitempty"`
}

type ServeDeploymentGraphSpec struct {
Expand Down
10 changes: 10 additions & 0 deletions ray-operator/apis/ray/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ spec:
spec:
description: RayServiceSpec defines the desired state of RayService
properties:
deploymentUnhealthySecondThreshold:
format: int32
type: integer
rayClusterConfig:
description: 'EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
NOTE: json tags are required.'
Expand Down Expand Up @@ -11538,6 +11541,9 @@ spec:
required:
- importPath
type: object
serviceUnhealthySecondThreshold:
format: int32
type: integer
type: object
status:
description: RayServiceStatuses defines the observed state of RayService
Expand Down
52 changes: 32 additions & 20 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (

var (
// This variable is mutable for unit testing purpose.
ServeDeploymentUnhealthySecondThreshold = 60.0
ServiceUnhealthySecondThreshold = 60.0 // Serve deployment related health check.
)

const (
ServiceDefaultRequeueDuration = 2 * time.Second
ServiceRestartRequeueDuration = 10 * time.Second
DashboardUnhealthySecondThreshold = 60.0
servicePortName = "dashboard-agent"
ServiceDefaultRequeueDuration = 2 * time.Second
ServiceRestartRequeueDuration = 10 * time.Second
DeploymentUnhealthySecondThreshold = 60.0 // Dashboard agent related health check.
servicePortName = "dashboard-agent"
)

// RayServiceReconciler reconciles a RayService object
Expand Down Expand Up @@ -109,7 +109,7 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
var pendingRayClusterInstance *rayv1alpha1.RayCluster
if activeRayClusterInstance, pendingRayClusterInstance, err = r.reconcileRayCluster(ctx, rayServiceInstance); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToGetOrCreateRayCluster, err)
return ctrl.Result{}, client.IgnoreNotFound(err)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, client.IgnoreNotFound(err)
}

// Check if we need to create pending RayCluster.
Expand Down Expand Up @@ -511,7 +511,12 @@ func (r *RayServiceReconciler) updateServeDeployment(rayServiceInstance *rayv1al
}

// getAndCheckServeStatus get app and serve deployments statuses, update the health timestamp and check if RayCluster is overall healthy.
func (r *RayServiceReconciler) getAndCheckServeStatus(dashboardClient utils.RayDashboardClientInterface, rayServiceServeStatus *rayv1alpha1.RayServiceStatus) (bool, error) {
func (r *RayServiceReconciler) getAndCheckServeStatus(dashboardClient utils.RayDashboardClientInterface, rayServiceServeStatus *rayv1alpha1.RayServiceStatus, unhealthySecondThreshold *int32) (bool, error) {
serviceUnhealthySecondThreshold := ServiceUnhealthySecondThreshold
if unhealthySecondThreshold != nil {
serviceUnhealthySecondThreshold = float64(*unhealthySecondThreshold)
}

var serveStatuses *utils.ServeDeploymentStatuses
var err error
if serveStatuses, err = dashboardClient.GetDeploymentsStatus(); err != nil {
Expand All @@ -536,7 +541,7 @@ func (r *RayServiceReconciler) getAndCheckServeStatus(dashboardClient utils.RayD
if prevStatus.Status != "HEALTHY" {
serveStatuses.DeploymentStatuses[i].HealthLastUpdateTime = prevStatus.HealthLastUpdateTime

if prevStatus.HealthLastUpdateTime != nil && time.Since(prevStatus.HealthLastUpdateTime.Time).Seconds() > ServeDeploymentUnhealthySecondThreshold {
if prevStatus.HealthLastUpdateTime != nil && time.Since(prevStatus.HealthLastUpdateTime.Time).Seconds() > serviceUnhealthySecondThreshold {
isHealthy = false
}
}
Expand All @@ -552,7 +557,7 @@ func (r *RayServiceReconciler) getAndCheckServeStatus(dashboardClient utils.RayD
if rayServiceServeStatus.ApplicationStatus.Status != "HEALTHY" {
serveStatuses.ApplicationStatus.HealthLastUpdateTime = rayServiceServeStatus.ApplicationStatus.HealthLastUpdateTime

if rayServiceServeStatus.ApplicationStatus.HealthLastUpdateTime != nil && time.Since(rayServiceServeStatus.ApplicationStatus.HealthLastUpdateTime.Time).Seconds() > ServeDeploymentUnhealthySecondThreshold {
if rayServiceServeStatus.ApplicationStatus.HealthLastUpdateTime != nil && time.Since(rayServiceServeStatus.ApplicationStatus.HealthLastUpdateTime.Time).Seconds() > serviceUnhealthySecondThreshold {
isHealthy = false
}
}
Expand Down Expand Up @@ -592,15 +597,19 @@ func (r *RayServiceReconciler) generateConfigKeyPrefix(rayServiceInstance *rayv1
}

// Return true if healthy, otherwise false.
func (r *RayServiceReconciler) updateAndCheckDashboardStatus(rayServiceClusterStatus *rayv1alpha1.RayServiceStatus, isHealthy bool) bool {
func (r *RayServiceReconciler) updateAndCheckDashboardStatus(rayServiceClusterStatus *rayv1alpha1.RayServiceStatus, isHealthy bool, unhealthyThreshold *int32) bool {
timeNow := metav1.Now()
rayServiceClusterStatus.DashboardStatus.LastUpdateTime = &timeNow
rayServiceClusterStatus.DashboardStatus.IsHealthy = isHealthy
if rayServiceClusterStatus.DashboardStatus.HealthLastUpdateTime.IsZero() || isHealthy {
rayServiceClusterStatus.DashboardStatus.HealthLastUpdateTime = &timeNow
}

return time.Since(rayServiceClusterStatus.DashboardStatus.HealthLastUpdateTime.Time).Seconds() <= DashboardUnhealthySecondThreshold
deploymentUnhealthySecondThreshold := DeploymentUnhealthySecondThreshold
if unhealthyThreshold != nil {
deploymentUnhealthySecondThreshold = float64(*unhealthyThreshold)
}
return time.Since(rayServiceClusterStatus.DashboardStatus.HealthLastUpdateTime.Time).Seconds() <= deploymentUnhealthySecondThreshold
}

func (r *RayServiceReconciler) markRestart(rayServiceInstance *rayv1alpha1.RayService) {
Expand Down Expand Up @@ -722,20 +731,20 @@ func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context,
rayServiceStatus := &rayServiceInstance.Status.ActiveServiceStatus

if clientURL, err = r.fetchDashboardAgentURL(ctx, rayClusterInstance); err != nil || clientURL == "" {
r.updateAndCheckDashboardStatus(rayServiceStatus, false)
r.updateAndCheckDashboardStatus(rayServiceStatus, false, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold)
return err
}

rayDashboardClient := utils.GetRayDashboardClientFunc()
rayDashboardClient.InitClient(clientURL)

var isHealthy bool
if isHealthy, err = r.getAndCheckServeStatus(rayDashboardClient, rayServiceStatus); err != nil {
r.updateAndCheckDashboardStatus(rayServiceStatus, false)
if isHealthy, err = r.getAndCheckServeStatus(rayDashboardClient, rayServiceStatus, rayServiceInstance.Spec.ServiceUnhealthySecondThreshold); err != nil {
r.updateAndCheckDashboardStatus(rayServiceStatus, false, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold)
return err
}

r.updateAndCheckDashboardStatus(rayServiceStatus, true)
r.updateAndCheckDashboardStatus(rayServiceStatus, true, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold)

logger.Info("Check serve health", "isHealthy", isHealthy)

Expand All @@ -756,7 +765,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
}

if clientURL, err = r.fetchDashboardAgentURL(ctx, rayClusterInstance); err != nil || clientURL == "" {
if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) {
if !r.updateAndCheckDashboardStatus(rayServiceStatus, false, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold) {
logger.Info("Dashboard is unhealthy, restart the cluster.")
r.markRestart(rayServiceInstance)
}
Expand All @@ -771,7 +780,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns

if shouldUpdate {
if err = r.updateServeDeployment(rayServiceInstance, rayDashboardClient, rayClusterInstance.Name); err != nil {
if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) {
if !r.updateAndCheckDashboardStatus(rayServiceStatus, false, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold) {
logger.Info("Dashboard is unhealthy, restart the cluster.")
r.markRestart(rayServiceInstance)
}
Expand All @@ -784,16 +793,16 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
}

var isHealthy bool
if isHealthy, err = r.getAndCheckServeStatus(rayDashboardClient, rayServiceStatus); err != nil {
if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) {
if isHealthy, err = r.getAndCheckServeStatus(rayDashboardClient, rayServiceStatus, nil); err != nil {
if !r.updateAndCheckDashboardStatus(rayServiceStatus, false, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold) {
logger.Info("Dashboard is unhealthy, restart the cluster.")
r.markRestart(rayServiceInstance)
}
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToGetServeDeploymentStatus, err)
return ctrl.Result{}, false, err
}

r.updateAndCheckDashboardStatus(rayServiceStatus, true)
r.updateAndCheckDashboardStatus(rayServiceStatus, true, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold)

logger.Info("Check serve health", "isHealthy", isHealthy, "isActive", isActive)

Expand Down Expand Up @@ -832,6 +841,9 @@ func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClu
httpProxyClient.InitClient()
for _, pod := range allPods.Items {
httpProxyClient.SetHostIp(pod.Status.PodIP)
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
if httpProxyClient.CheckHealth() == nil {
pod.Labels[common.RayClusterServingServiceLabelKey] = common.EnableRayClusterServingServiceTrue
} else {
Expand Down
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/rayservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,15 +316,15 @@ var _ = Context("Inside the default namespace", func() {

It("Should detect unhealthy status and try to switch to new RayCluster.", func() {
// Set a wrong serve status with unhealthy.
orignialServeDeploymentUnhealthySecondThreshold := ServeDeploymentUnhealthySecondThreshold
ServeDeploymentUnhealthySecondThreshold = 5
orignialServeDeploymentUnhealthySecondThreshold := ServiceUnhealthySecondThreshold
ServiceUnhealthySecondThreshold = 5
fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(time.Now().Add(time.Duration(-5)*time.Minute)), "UNHEALTHY"))

Eventually(
getPreparingRayClusterNameFunc(ctx, myRayService),
time.Second*60, time.Millisecond*500).Should(Not(BeEmpty()), "My new RayCluster name = %v", myRayService.Status.PendingServiceStatus.RayClusterName)

ServeDeploymentUnhealthySecondThreshold = orignialServeDeploymentUnhealthySecondThreshold
ServiceUnhealthySecondThreshold = orignialServeDeploymentUnhealthySecondThreshold
pendingRayClusterName := myRayService.Status.PendingServiceStatus.RayClusterName
fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.Now(), "HEALTHY"))

Expand Down
1 change: 0 additions & 1 deletion ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.17

require (
github.com/go-logr/logr v1.2.0
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.17.0
github.com/orcaman/concurrent-map v1.0.0
Expand Down
2 changes: 0 additions & 2 deletions ray-operator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down

0 comments on commit 781eed1

Please sign in to comment.