[RayService] Allow updating WorkerGroupSpecs without rolling out new cluster #1734

Merged
25 commits merged on Dec 26, 2023

Changes shown are from 17 commits.

Commits (25)
534357d  Add ClusterAction for active and pending cluster (Dec 11, 2023)
3637689  Merge branch 'master' of https://github.com/ray-project/kuberay into … (Dec 11, 2023)
3b3ca3f  Fix unit tests (Dec 11, 2023)
19db35a  Fix unit test (Dec 11, 2023)
1f14e45  Update spec before calling k8s client update (Dec 12, 2023)
79fb7c7  Fix TestGetClusterAction (Dec 12, 2023)
c189a68  Pull RayCluster before updating (Dec 12, 2023)
1ec5b4d  Update ray-operator/controllers/ray/rayservice_controller.go (architkulkarni, Dec 12, 2023)
33622cd  Lint (Dec 12, 2023)
47a4e4c  Merge branch 'worker-group' of https://github.com/ray-project/kuberay… (Dec 12, 2023)
59bb53b  Add integration test for active RayCluster in `rayservice_controller_…` (Dec 12, 2023)
ff7c56b  Add test for updating pending cluster in `rayservice_controller_test.go` (Dec 12, 2023)
0520aa4  Don't print the whole RayCluster (Dec 14, 2023)
e77a72d  Use `getRayClusterByNamespacedName` (Dec 14, 2023)
b91e451  Return DoNothing if failed to serialize (Dec 14, 2023)
6576d73  Only update if worker groups are the same and new ones appended (Dec 19, 2023)
d76c904  Fix unit tests (Dec 19, 2023)
ce83a1f  Remove redundant hash calculation (Dec 20, 2023)
16aa1ec  Add log for number of worker groups (Dec 20, 2023)
0517a10  Fix Ray Cluster -> RayCluster (Dec 20, 2023)
e62ac34  Change snake_case to CamelCase in hash function (Dec 20, 2023)
65774d7  Delete unnecessary and broken test for updating minReplicas (Dec 26, 2023)
cb394c4  Use oldNumWorkerGroupSpecs + 1 instead of hardcoding 2 in unit test (Dec 26, 2023)
8136580  Use oldNumWorkerGroupSpecs + 1 in remaining unit test (Dec 26, 2023)
8b0056b  Fix snake case (architkulkarni, Dec 26, 2023)
221 changes: 181 additions & 40 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"reflect"
"strconv"
"strings"
"time"

@@ -364,19 +365,32 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi
return nil, nil, err
}

if r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster) {
clusterAction := r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster)
if clusterAction == RolloutNew {
// For LLM serving, some users might not have sufficient GPU resources to run two RayClusters simultaneously.
// Therefore, KubeRay offers ENABLE_ZERO_DOWNTIME as a feature flag for zero-downtime upgrades.
enableZeroDowntime := true
if s := os.Getenv(ENABLE_ZERO_DOWNTIME); strings.ToLower(s) == "false" {
enableZeroDowntime = false
}
if enableZeroDowntime || !enableZeroDowntime && activeRayCluster == nil {
r.markRestart(rayServiceInstance)
// Add a pending cluster name. In the next reconcile loop, shouldPrepareNewRayCluster will return DoNothing and we will
// actually create the pending RayCluster instance.
r.markRestartAndAddPendingClusterName(rayServiceInstance)
} else {
r.Log.Info("Zero-downtime upgrade is disabled (ENABLE_ZERO_DOWNTIME: false). Skip preparing a new RayCluster.")
}
return activeRayCluster, nil, nil
} else if clusterAction == Update {
// Update the active cluster.
r.Log.Info("Updating the active RayCluster instance.")
if activeRayCluster, err = r.constructRayClusterForRayService(rayServiceInstance, activeRayCluster.Name); err != nil {
Member:
What will happen if there is inconsistency between the RayCluster and the RayService's RayClusterSpec? For example, what if a worker group's replica count is set to 0 in RayClusterSpec, but the Ray Autoscaler has already scaled it up to 10? Updating the RayCluster may cause a lot of running Pods to be killed.

Contributor (author):
We don't do any special handling for this case, which means the user-specified replica count will take precedence over the Ray Autoscaler's value.

One alternative would be to never update Replicas and WorkersToDelete when updating the RayCluster. The downside is that the user can never override Replicas.

My guess is that the current approach (the first approach) is better, because the user should always have a way to set Replicas, and this inconsistent case is an edge case, not the common case. But what are your thoughts?

Member:
> My guess is that the current approach (the first approach) is better

From the users' perspective, it is much better to disregard their Replicas settings than to delete a Pod that has a running Ray task or actor.

On second thought, if Ray autoscaling is enabled, the Ray Autoscaler is the only decision maker for deleting Ray Pods after #1253. Hence, users can increase the number of Pods, but they can't delete Pods by updating Replicas.

> the user should always have a way to set replicas

Users can directly update the RayCluster. In addition, setting Replicas for existing worker groups is not common; most RayService users also use the Ray Autoscaler. If users still need to manually update Replicas with Ray autoscaling enabled, the Ray Autoscaler needs to be improved.

return nil, nil, err
}
if err := r.updateRayClusterInstance(ctx, activeRayCluster); err != nil {
return nil, nil, err
}
return activeRayCluster, nil, nil
}

if pendingRayCluster, err = r.createRayClusterInstanceIfNeeded(ctx, rayServiceInstance, pendingRayCluster); err != nil {
@@ -468,67 +482,160 @@ func (r *RayServiceReconciler) cleanUpServeConfigCache(rayServiceInstance *rayv1
}
}

type ClusterAction int

const (
DoNothing ClusterAction = iota // value 0
Update // value 1
RolloutNew // value 2
)

// shouldPrepareNewRayCluster checks if we need to generate a new pending cluster.
func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *rayv1.RayService, activeRayCluster *rayv1.RayCluster) bool {
func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *rayv1.RayService, activeRayCluster *rayv1.RayCluster) ClusterAction {
// Prepare new RayCluster if:
// 1. No active cluster and no pending cluster
// 2. No pending cluster, and the active RayCluster has changed.
if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" {
if activeRayCluster == nil {
r.Log.Info("No active Ray cluster. RayService operator should prepare a new Ray cluster.")
return true
return RolloutNew
}
activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.RayServiceClusterHashKey]
goalClusterHash, err := generateRayClusterJsonHash(rayServiceInstance.Spec.RayClusterSpec)

// Case 1: If everything is identical except for the Replicas and WorkersToDelete of
// each WorkerGroup, then do nothing.
activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
goalClusterHash, err := generateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
errContextFailedToSerialize := "Failed to serialize new RayCluster config. " +
"Manual config updates will NOT be tracked accurately. " +
"Please manually tear down the cluster and apply a new config."
if err != nil {
errContext := "Failed to serialize new RayCluster config. " +
"Manual config updates will NOT be tracked accurately. " +
"Please manually tear down the cluster and apply a new config."
r.Log.Error(err, errContext)
return true
r.Log.Error(err, errContextFailedToSerialize)
return DoNothing
}

if activeClusterHash != goalClusterHash {
r.Log.Info("Active RayCluster config doesn't match goal config. " +
"RayService operator should prepare a new Ray cluster.\n" +
"* Active RayCluster config hash: " + activeClusterHash + "\n" +
"* Goal RayCluster config hash: " + goalClusterHash)
} else {
r.Log.Info("Active Ray cluster config matches goal config.")
if activeClusterHash == goalClusterHash {
r.Log.Info("Active Ray cluster config matches goal config. No need to update RayCluster.")
return DoNothing
}

// Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
// the existing workergroups, and one or more new workergroups are added at the end, then update the cluster.

activeClusterHash = activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
activeClusterNumWorkerGroups, err := strconv.Atoi(activeRayCluster.ObjectMeta.Annotations[utils.NumWorkerGroupsKey])
if err != nil {
r.Log.Error(err, errContextFailedToSerialize)
return DoNothing
}
goalNumWorkerGroups := len(rayServiceInstance.Spec.RayClusterSpec.WorkerGroupSpecs)

if goalNumWorkerGroups > activeClusterNumWorkerGroups {

return activeClusterHash != goalClusterHash
// Remove the new workergroup(s) from the end before calculating the hash.
goalClusterSpec := rayServiceInstance.Spec.RayClusterSpec.DeepCopy()
goalClusterSpec.WorkerGroupSpecs = goalClusterSpec.WorkerGroupSpecs[:activeClusterNumWorkerGroups]

// Generate the hash of the old worker group specs.
goalClusterHash, err = generateHashWithoutReplicasAndWorkersToDelete(*goalClusterSpec)
if err != nil {
r.Log.Error(err, errContextFailedToSerialize)
return DoNothing
}

if activeClusterHash == goalClusterHash {
r.Log.Info("Active Ray cluster config matches goal config, except that one or more entries were appended to WorkerGroupSpecs. Updating RayCluster.")
kevin85421 marked this conversation as resolved.
Show resolved Hide resolved
return Update
}
}

// Case 3: Otherwise, rollout a new cluster.
r.Log.Info("Active RayCluster config doesn't match goal config. " +
"RayService operator should prepare a new Ray cluster.\n" +
"* Active RayCluster config hash: " + activeClusterHash + "\n" +
"* Goal RayCluster config hash: " + goalClusterHash)
return RolloutNew
}

return false
return DoNothing
}

// createRayClusterInstanceIfNeeded checks if we need to create a new RayCluster instance. If so, create one.
func (r *RayServiceReconciler) createRayClusterInstanceIfNeeded(ctx context.Context, rayServiceInstance *rayv1.RayService, pendingRayCluster *rayv1.RayCluster) (*rayv1.RayCluster, error) {
// Early return if no pending RayCluster needs to be created.
if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" {
// No exist pending RayCluster and no need to create one.
return nil, nil
}

// Create a new RayCluster if:
// 1. No RayCluster pending.
// 2. Config update for the pending cluster.
equal, err := compareRayClusterJsonHash(pendingRayCluster.Spec, rayServiceInstance.Spec.RayClusterSpec)
if err != nil {
r.Log.Error(err, "Fail to generate hash for RayClusterSpec")
return nil, err
var clusterAction ClusterAction
var err error

if pendingRayCluster == nil {
clusterAction = RolloutNew
} else {
clusterAction, err = getClusterAction(pendingRayCluster.Spec, rayServiceInstance.Spec.RayClusterSpec)
if err != nil {
r.Log.Error(err, "Fail to generate hash for RayClusterSpec")
return nil, err
}
}

if pendingRayCluster == nil || !equal {
switch clusterAction {
case RolloutNew:
r.Log.Info("Creating a new pending RayCluster instance.")
pendingRayCluster, err = r.createRayClusterInstance(ctx, rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName)
if err != nil {
case Update:
r.Log.Info("Updating the pending RayCluster instance.")
if pendingRayCluster, err = r.constructRayClusterForRayService(rayServiceInstance, pendingRayCluster.Name); err != nil {
return nil, err
}
err = r.updateRayClusterInstance(ctx, pendingRayCluster)
}

if err != nil {
return nil, err
}

return pendingRayCluster, nil
}

// updateRayClusterInstance updates the RayCluster instance.
func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, rayClusterInstance *rayv1.RayCluster) error {
r.Log.V(1).Info("updateRayClusterInstance", "Name", rayClusterInstance.Name, "Namespace", rayClusterInstance.Namespace)
// Printing the whole RayCluster is too noisy. Only print the spec.
r.Log.V(1).Info("updateRayClusterInstance", "rayClusterInstance.Spec", rayClusterInstance.Spec)

// Fetch the current state of the RayCluster
currentRayCluster, err := r.getRayClusterByNamespacedName(ctx, client.ObjectKey{
Namespace: rayClusterInstance.Namespace,
Name: rayClusterInstance.Name,
})
if err != nil {
r.Log.Error(err, "Failed to get the current state of RayCluster", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name)
return err
}

if currentRayCluster == nil {
r.Log.Info("RayCluster not found, possibly deleted", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name)
return nil
}

// Update the fetched RayCluster with new changes
Member:
We replace almost everything in currentRayCluster. Why do we need to get currentRayCluster? Do we need any information from it?

Contributor (author):
My first approach was to not get currentRayCluster, but then I got this error when calling r.Update():

rayclusters.ray.io "rayservice-sample-raycluster-qb8z4" is invalid: metadata.resourceVersion: Invalid value: 0x0: must be specified for an update

After some research, it seemed that a common approach is to first get the current object, then apply changes to that object, and then call Update().

Do you know what the best practice is? Is it better to use Patch() here, or is there some third approach?

Member:
> rayclusters.ray.io "rayservice-sample-raycluster-qb8z4" is invalid: metadata.resourceVersion: Invalid value: 0x0: must be specified for an update

Got it. This makes sense.

> Do you know what the best practice is? Is it better to use Patch() here, or is there some third approach?

In my understanding, Patch isn't protected by the Kubernetes optimistic concurrency model. I don't understand the use case for Patch, so we should avoid using it until we understand it better.

currentRayCluster.Spec = rayClusterInstance.Spec

// Update the labels and annotations
currentRayCluster.Labels = rayClusterInstance.Labels
currentRayCluster.Annotations = rayClusterInstance.Annotations

// Update the RayCluster
if err = r.Update(ctx, currentRayCluster); err != nil {
r.Log.Error(err, "Fail to update RayCluster "+currentRayCluster.Name)
return err
}

r.Log.V(1).Info("updated RayCluster", "rayClusterInstance", currentRayCluster)
return nil
}
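
The thread above is the reason the live object is fetched before Update: the API server rejects an update whose metadata.resourceVersion is unset. The following is a minimal standalone sketch of that get-then-update pattern using a controller-runtime client; the helper name applyDesiredSpec and the package layout are illustrative assumptions, not code from this PR.

package main

import (
    "context"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// applyDesiredSpec copies the desired spec, labels, and annotations onto the live
// RayCluster and updates it. The desired object may have an empty resourceVersion,
// so it cannot be passed to Update directly; the freshly fetched live object
// carries a valid resourceVersion and satisfies the optimistic-concurrency check.
func applyDesiredSpec(ctx context.Context, c client.Client, desired *rayv1.RayCluster) error {
    var live rayv1.RayCluster
    if err := c.Get(ctx, client.ObjectKey{Namespace: desired.Namespace, Name: desired.Name}, &live); err != nil {
        // Includes NotFound; the controller handles the deleted-cluster case separately.
        return err
    }

    // Mutate the live object rather than the desired one.
    live.Spec = desired.Spec
    live.Labels = desired.Labels
    live.Annotations = desired.Annotations

    // Update returns a Conflict error if the object changed since Get;
    // the reconcile loop simply retries on the next pass.
    return c.Update(ctx, &live)
}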

// createRayClusterInstance deletes the old RayCluster instance if exists. Only when no existing RayCluster, create a new RayCluster instance.
// One important part is that if this method deletes the old RayCluster, it will return instantly. It depends on the controller to call it again to generate the new RayCluster instance.
func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstanceName string) (*rayv1.RayCluster, error) {
@@ -591,14 +698,15 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv
rayClusterAnnotations[k] = v
}
rayClusterAnnotations[utils.EnableServeServiceKey] = utils.EnableServeServiceTrue
rayClusterAnnotations[utils.RayServiceClusterHashKey], err = generateRayClusterJsonHash(rayService.Spec.RayClusterSpec)
errContext := "Failed to serialize RayCluster config. " +
"Manual config updates will NOT be tracked accurately. " +
"Please tear down the cluster and apply a new config."
rayClusterAnnotations[utils.HashWithoutReplicasAndWorkersToDeleteKey], err = generateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec)
if err != nil {
errContext := "Failed to serialize RayCluster config. " +
"Manual config updates will NOT be tracked accurately. " +
"Please tear down the cluster and apply a new config."
r.Log.Error(err, errContext)
return nil, err
}
rayClusterAnnotations[utils.NumWorkerGroupsKey] = strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs))

rayCluster := &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
@@ -862,7 +970,7 @@ func updateDashboardStatus(rayServiceClusterStatus *rayv1.RayServiceStatus, isHe
}
}

func (r *RayServiceReconciler) markRestart(rayServiceInstance *rayv1.RayService) {
func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(rayServiceInstance *rayv1.RayService) {
// Generate RayCluster name for pending cluster.
r.Log.V(1).Info("Current cluster is unhealthy, prepare to restart.", "Status", rayServiceInstance.Status)
rayServiceInstance.Status.ServiceStatus = rayv1.Restarting
@@ -1137,8 +1245,41 @@ func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClu
return nil
}

func generateRayClusterJsonHash(rayClusterSpec rayv1.RayClusterSpec) (string, error) {
// Mute all fields that will not trigger new RayCluster preparation. For example,
func getClusterAction(old_spec rayv1.RayClusterSpec, new_spec rayv1.RayClusterSpec) (ClusterAction, error) {
Member:
This function's coding style (old_spec -> oldSpec) is inconsistent.

Contributor (author):
Good call, fixed in e62ac34.

Member:
The commit still has some snake_case style, e.g. newSpec_without_new_worker_groups.

Contributor (author):
Thanks, fixed in 8b0056b.

I wonder if there's a linter that can check for this.

// Return the appropriate action based on the difference in the old and new RayCluster specs.

// Case 1: If everything is identical except for the Replicas and WorkersToDelete of
// each WorkerGroup, then do nothing.
same_hash, err := compareRayClusterJsonHash(old_spec, new_spec, generateHashWithoutReplicasAndWorkersToDelete)
if err != nil {
return DoNothing, err
}
if same_hash {
return DoNothing, nil
}

// Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
// the existing workergroups, and one or more new workergroups are added at the end, then update the cluster.
new_spec_without_new_worker_groups := new_spec.DeepCopy()
if len(new_spec.WorkerGroupSpecs) > len(old_spec.WorkerGroupSpecs) {
// Remove the new worker groups from the new spec.
new_spec_without_new_worker_groups.WorkerGroupSpecs = new_spec_without_new_worker_groups.WorkerGroupSpecs[:len(old_spec.WorkerGroupSpecs)]

same_hash, err = compareRayClusterJsonHash(old_spec, *new_spec_without_new_worker_groups, generateHashWithoutReplicasAndWorkersToDelete)
if err != nil {
return DoNothing, err
}
if same_hash {
return Update, nil
}
}

// Case 3: Otherwise, rollout a new cluster.
return RolloutNew, nil
}
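
To make the three cases above concrete, here is a small test-style sketch. It assumes it sits in the same package as the controller (so it can call the unexported getClusterAction) and uses an illustrative int32Ptr helper; it is an editor's sketch rather than code from this diff.

package ray

import (
    "fmt"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func int32Ptr(i int32) *int32 { return &i }

// exampleGetClusterAction walks one old spec through the three cases handled by getClusterAction.
func exampleGetClusterAction() error {
    oldSpec := rayv1.RayClusterSpec{
        WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
            {GroupName: "small-group", Replicas: int32Ptr(1)},
        },
    }

    // Case 1: only Replicas differs; the hash mutes Replicas/WorkersToDelete, so expect DoNothing.
    onlyReplicasChanged := *oldSpec.DeepCopy()
    onlyReplicasChanged.WorkerGroupSpecs[0].Replicas = int32Ptr(10)

    // Case 2: existing group unchanged, one new group appended at the end, so expect Update.
    groupAppended := *oldSpec.DeepCopy()
    groupAppended.WorkerGroupSpecs = append(groupAppended.WorkerGroupSpecs,
        rayv1.WorkerGroupSpec{GroupName: "gpu-group", Replicas: int32Ptr(2)})

    // Case 3: an existing group is modified (renamed here), so expect RolloutNew.
    groupRenamed := *oldSpec.DeepCopy()
    groupRenamed.WorkerGroupSpecs[0].GroupName = "renamed-group"

    for _, newSpec := range []rayv1.RayClusterSpec{onlyReplicasChanged, groupAppended, groupRenamed} {
        action, err := getClusterAction(oldSpec, newSpec)
        if err != nil {
            return err
        }
        fmt.Println(action) // prints 0 (DoNothing), 1 (Update), 2 (RolloutNew) in order
    }
    return nil
}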

func generateHashWithoutReplicasAndWorkersToDelete(rayClusterSpec rayv1.RayClusterSpec) (string, error) {
// Mute certain fields that will not trigger new RayCluster preparation. For example,
// Autoscaler will update `Replicas` and `WorkersToDelete` when scaling up/down.
updatedRayClusterSpec := rayClusterSpec.DeepCopy()
for i := 0; i < len(updatedRayClusterSpec.WorkerGroupSpecs); i++ {
@@ -1150,13 +1291,13 @@ func generateRayClusterJsonHash(rayClusterSpec rayv1.RayClusterSpec) (string, er
return utils.GenerateJsonHash(updatedRayClusterSpec)
}

func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec) (bool, error) {
hash1, err1 := generateRayClusterJsonHash(spec1)
func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec, hashFunc func(rayv1.RayClusterSpec) (string, error)) (bool, error) {
hash1, err1 := hashFunc(spec1)
if err1 != nil {
return false, err1
}

hash2, err2 := generateRayClusterJsonHash(spec2)
hash2, err2 := hashFunc(spec2)
if err2 != nil {
return false, err2
}