-
Notifications
You must be signed in to change notification settings - Fork 432
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
[RayService] Allow updating WorkerGroupSpecs without rolling out new cluster #1734
Changes from 17 commits
534357d
3637689
3b3ca3f
19db35a
1f14e45
79fb7c7
c189a68
1ec5b4d
33622cd
47a4e4c
59bb53b
ff7c56b
0520aa4
e77a72d
b91e451
6576d73
d76c904
ce83a1f
16aa1ec
0517a10
e62ac34
65774d7
cb394c4
8136580
8b0056b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"fmt" | ||
"os" | ||
"reflect" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
|
@@ -364,19 +365,32 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi | |
return nil, nil, err | ||
} | ||
|
||
if r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster) { | ||
clusterAction := r.shouldPrepareNewRayCluster(rayServiceInstance, activeRayCluster) | ||
if clusterAction == RolloutNew { | ||
// For LLM serving, some users might not have sufficient GPU resources to run two RayClusters simultaneously. | ||
// Therefore, KubeRay offers ENABLE_ZERO_DOWNTIME as a feature flag for zero-downtime upgrades. | ||
enableZeroDowntime := true | ||
if s := os.Getenv(ENABLE_ZERO_DOWNTIME); strings.ToLower(s) == "false" { | ||
enableZeroDowntime = false | ||
} | ||
if enableZeroDowntime || !enableZeroDowntime && activeRayCluster == nil { | ||
r.markRestart(rayServiceInstance) | ||
// Add a pending cluster name. In the next reconcile loop, shouldPrepareNewRayCluster will return DoNothing and we will | ||
// actually create the pending RayCluster instance. | ||
r.markRestartAndAddPendingClusterName(rayServiceInstance) | ||
} else { | ||
r.Log.Info("Zero-downtime upgrade is disabled (ENABLE_ZERO_DOWNTIME: false). Skip preparing a new RayCluster.") | ||
} | ||
return activeRayCluster, nil, nil | ||
} else if clusterAction == Update { | ||
// Update the active cluster. | ||
r.Log.Info("Updating the active RayCluster instance.") | ||
if activeRayCluster, err = r.constructRayClusterForRayService(rayServiceInstance, activeRayCluster.Name); err != nil { | ||
return nil, nil, err | ||
} | ||
if err := r.updateRayClusterInstance(ctx, activeRayCluster); err != nil { | ||
return nil, nil, err | ||
} | ||
return activeRayCluster, nil, nil | ||
} | ||
|
||
if pendingRayCluster, err = r.createRayClusterInstanceIfNeeded(ctx, rayServiceInstance, pendingRayCluster); err != nil { | ||
|
@@ -468,67 +482,160 @@ func (r *RayServiceReconciler) cleanUpServeConfigCache(rayServiceInstance *rayv1 | |
} | ||
} | ||
|
||
type ClusterAction int | ||
|
||
const ( | ||
DoNothing ClusterAction = iota // value 0 | ||
Update // value 1 | ||
RolloutNew // value 2 | ||
) | ||
|
||
// shouldPrepareNewRayCluster checks if we need to generate a new pending cluster. | ||
func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *rayv1.RayService, activeRayCluster *rayv1.RayCluster) bool { | ||
func (r *RayServiceReconciler) shouldPrepareNewRayCluster(rayServiceInstance *rayv1.RayService, activeRayCluster *rayv1.RayCluster) ClusterAction { | ||
// Prepare new RayCluster if: | ||
// 1. No active cluster and no pending cluster | ||
// 2. No pending cluster, and the active RayCluster has changed. | ||
if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" { | ||
if activeRayCluster == nil { | ||
r.Log.Info("No active Ray cluster. RayService operator should prepare a new Ray cluster.") | ||
return true | ||
return RolloutNew | ||
} | ||
activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.RayServiceClusterHashKey] | ||
goalClusterHash, err := generateRayClusterJsonHash(rayServiceInstance.Spec.RayClusterSpec) | ||
|
||
// Case 1: If everything is identical except for the Replicas and WorkersToDelete of | ||
// each WorkerGroup, then do nothing. | ||
activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey] | ||
goalClusterHash, err := generateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec) | ||
errContextFailedToSerialize := "Failed to serialize new RayCluster config. " + | ||
"Manual config updates will NOT be tracked accurately. " + | ||
"Please manually tear down the cluster and apply a new config." | ||
if err != nil { | ||
errContext := "Failed to serialize new RayCluster config. " + | ||
"Manual config updates will NOT be tracked accurately. " + | ||
"Please manually tear down the cluster and apply a new config." | ||
r.Log.Error(err, errContext) | ||
return true | ||
r.Log.Error(err, errContextFailedToSerialize) | ||
return DoNothing | ||
} | ||
|
||
if activeClusterHash != goalClusterHash { | ||
r.Log.Info("Active RayCluster config doesn't match goal config. " + | ||
"RayService operator should prepare a new Ray cluster.\n" + | ||
"* Active RayCluster config hash: " + activeClusterHash + "\n" + | ||
"* Goal RayCluster config hash: " + goalClusterHash) | ||
} else { | ||
r.Log.Info("Active Ray cluster config matches goal config.") | ||
if activeClusterHash == goalClusterHash { | ||
r.Log.Info("Active Ray cluster config matches goal config. No need to update RayCluster.") | ||
return DoNothing | ||
} | ||
|
||
// Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of | ||
// the existing workergroups, and one or more new workergroups are added at the end, then update the cluster. | ||
|
||
activeClusterHash = activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey] | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
activeClusterNumWorkerGroups, err := strconv.Atoi(activeRayCluster.ObjectMeta.Annotations[utils.NumWorkerGroupsKey]) | ||
if err != nil { | ||
r.Log.Error(err, errContextFailedToSerialize) | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return DoNothing | ||
} | ||
goalNumWorkerGroups := len(rayServiceInstance.Spec.RayClusterSpec.WorkerGroupSpecs) | ||
|
||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if goalNumWorkerGroups > activeClusterNumWorkerGroups { | ||
|
||
return activeClusterHash != goalClusterHash | ||
// Remove the new workergroup(s) from the end before calculating the hash. | ||
goalClusterSpec := rayServiceInstance.Spec.RayClusterSpec.DeepCopy() | ||
goalClusterSpec.WorkerGroupSpecs = goalClusterSpec.WorkerGroupSpecs[:activeClusterNumWorkerGroups] | ||
|
||
// Generate the hash of the old worker group specs. | ||
goalClusterHash, err = generateHashWithoutReplicasAndWorkersToDelete(*goalClusterSpec) | ||
if err != nil { | ||
r.Log.Error(err, errContextFailedToSerialize) | ||
return DoNothing | ||
} | ||
|
||
if activeClusterHash == goalClusterHash { | ||
r.Log.Info("Active Ray cluster config matches goal config, except that one or more entries were appended to WorkerGroupSpecs. Updating RayCluster.") | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return Update | ||
} | ||
} | ||
|
||
// Case 3: Otherwise, rollout a new cluster. | ||
r.Log.Info("Active RayCluster config doesn't match goal config. " + | ||
"RayService operator should prepare a new Ray cluster.\n" + | ||
"* Active RayCluster config hash: " + activeClusterHash + "\n" + | ||
"* Goal RayCluster config hash: " + goalClusterHash) | ||
return RolloutNew | ||
} | ||
|
||
return false | ||
return DoNothing | ||
} | ||
|
||
// createRayClusterInstanceIfNeeded checks if we need to create a new RayCluster instance. If so, create one. | ||
func (r *RayServiceReconciler) createRayClusterInstanceIfNeeded(ctx context.Context, rayServiceInstance *rayv1.RayService, pendingRayCluster *rayv1.RayCluster) (*rayv1.RayCluster, error) { | ||
// Early return if no pending RayCluster needs to be created. | ||
if rayServiceInstance.Status.PendingServiceStatus.RayClusterName == "" { | ||
// No exist pending RayCluster and no need to create one. | ||
return nil, nil | ||
} | ||
|
||
// Create a new RayCluster if: | ||
// 1. No RayCluster pending. | ||
// 2. Config update for the pending cluster. | ||
equal, err := compareRayClusterJsonHash(pendingRayCluster.Spec, rayServiceInstance.Spec.RayClusterSpec) | ||
if err != nil { | ||
r.Log.Error(err, "Fail to generate hash for RayClusterSpec") | ||
return nil, err | ||
var clusterAction ClusterAction | ||
var err error | ||
|
||
if pendingRayCluster == nil { | ||
clusterAction = RolloutNew | ||
} else { | ||
clusterAction, err = getClusterAction(pendingRayCluster.Spec, rayServiceInstance.Spec.RayClusterSpec) | ||
if err != nil { | ||
r.Log.Error(err, "Fail to generate hash for RayClusterSpec") | ||
return nil, err | ||
} | ||
} | ||
|
||
if pendingRayCluster == nil || !equal { | ||
switch clusterAction { | ||
case RolloutNew: | ||
r.Log.Info("Creating a new pending RayCluster instance.") | ||
pendingRayCluster, err = r.createRayClusterInstance(ctx, rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName) | ||
if err != nil { | ||
case Update: | ||
r.Log.Info("Updating the pending RayCluster instance.") | ||
if pendingRayCluster, err = r.constructRayClusterForRayService(rayServiceInstance, pendingRayCluster.Name); err != nil { | ||
return nil, err | ||
} | ||
err = r.updateRayClusterInstance(ctx, pendingRayCluster) | ||
} | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return pendingRayCluster, nil | ||
} | ||
|
||
// updateRayClusterInstance updates the RayCluster instance. | ||
func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, rayClusterInstance *rayv1.RayCluster) error { | ||
r.Log.V(1).Info("updateRayClusterInstance", "Name", rayClusterInstance.Name, "Namespace", rayClusterInstance.Namespace) | ||
// Printing the whole RayCluster is too noisy. Only print the spec. | ||
r.Log.V(1).Info("updateRayClusterInstance", "rayClusterInstance.Spec", rayClusterInstance.Spec) | ||
|
||
// Fetch the current state of the RayCluster | ||
currentRayCluster, err := r.getRayClusterByNamespacedName(ctx, client.ObjectKey{ | ||
Namespace: rayClusterInstance.Namespace, | ||
Name: rayClusterInstance.Name, | ||
}) | ||
if err != nil { | ||
r.Log.Error(err, "Failed to get the current state of RayCluster", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name) | ||
return err | ||
} | ||
|
||
if currentRayCluster == nil { | ||
r.Log.Info("RayCluster not found, possibly deleted", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name) | ||
return nil | ||
} | ||
|
||
// Update the fetched RayCluster with new changes | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

We almost replace anything in … [inline code reference lost in extraction]

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

My first approach was to not get …
After some research it seemed that a common approach was to first get the current object, then apply changes to the object, and then call … Do you know what the best practice is? Is it better to use …

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Got it. This makes sense.
In my understanding, …
||
currentRayCluster.Spec = rayClusterInstance.Spec | ||
|
||
// Update the labels and annotations | ||
currentRayCluster.Labels = rayClusterInstance.Labels | ||
currentRayCluster.Annotations = rayClusterInstance.Annotations | ||
|
||
// Update the RayCluster | ||
if err = r.Update(ctx, currentRayCluster); err != nil { | ||
r.Log.Error(err, "Fail to update RayCluster "+currentRayCluster.Name) | ||
return err | ||
} | ||
|
||
r.Log.V(1).Info("updated RayCluster", "rayClusterInstance", currentRayCluster) | ||
return nil | ||
} | ||
|
||
// createRayClusterInstance deletes the old RayCluster instance if exists. Only when no existing RayCluster, create a new RayCluster instance. | ||
// One important part is that if this method deletes the old RayCluster, it will return instantly. It depends on the controller to call it again to generate the new RayCluster instance. | ||
func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstanceName string) (*rayv1.RayCluster, error) { | ||
|
@@ -591,14 +698,15 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv | |
rayClusterAnnotations[k] = v | ||
} | ||
rayClusterAnnotations[utils.EnableServeServiceKey] = utils.EnableServeServiceTrue | ||
rayClusterAnnotations[utils.RayServiceClusterHashKey], err = generateRayClusterJsonHash(rayService.Spec.RayClusterSpec) | ||
errContext := "Failed to serialize RayCluster config. " + | ||
"Manual config updates will NOT be tracked accurately. " + | ||
"Please tear down the cluster and apply a new config." | ||
rayClusterAnnotations[utils.HashWithoutReplicasAndWorkersToDeleteKey], err = generateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec) | ||
if err != nil { | ||
errContext := "Failed to serialize RayCluster config. " + | ||
"Manual config updates will NOT be tracked accurately. " + | ||
"Please tear down the cluster and apply a new config." | ||
r.Log.Error(err, errContext) | ||
return nil, err | ||
} | ||
rayClusterAnnotations[utils.NumWorkerGroupsKey] = strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs)) | ||
|
||
rayCluster := &rayv1.RayCluster{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
|
@@ -862,7 +970,7 @@ func updateDashboardStatus(rayServiceClusterStatus *rayv1.RayServiceStatus, isHe | |
} | ||
} | ||
|
||
func (r *RayServiceReconciler) markRestart(rayServiceInstance *rayv1.RayService) { | ||
func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(rayServiceInstance *rayv1.RayService) { | ||
// Generate RayCluster name for pending cluster. | ||
r.Log.V(1).Info("Current cluster is unhealthy, prepare to restart.", "Status", rayServiceInstance.Status) | ||
rayServiceInstance.Status.ServiceStatus = rayv1.Restarting | ||
|
@@ -1137,8 +1245,41 @@ func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClu | |
return nil | ||
} | ||
|
||
func generateRayClusterJsonHash(rayClusterSpec rayv1.RayClusterSpec) (string, error) { | ||
// Mute all fields that will not trigger new RayCluster preparation. For example, | ||
func getClusterAction(old_spec rayv1.RayClusterSpec, new_spec rayv1.RayClusterSpec) (ClusterAction, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This function's coding style (…) … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good call, fixed in e62ac34. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The commit still has some snake_case style. Ex: … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks, fixed in 8b0056b. I wonder if there's a linter that can check for this.
||
// Return the appropriate action based on the difference in the old and new RayCluster specs. | ||
|
||
// Case 1: If everything is identical except for the Replicas and WorkersToDelete of | ||
// each WorkerGroup, then do nothing. | ||
same_hash, err := compareRayClusterJsonHash(old_spec, new_spec, generateHashWithoutReplicasAndWorkersToDelete) | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return DoNothing, err | ||
} | ||
if same_hash { | ||
return DoNothing, nil | ||
} | ||
|
||
// Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of | ||
// the existing workergroups, and one or more new workergroups are added at the end, then update the cluster. | ||
new_spec_without_new_worker_groups := new_spec.DeepCopy() | ||
if len(new_spec.WorkerGroupSpecs) > len(old_spec.WorkerGroupSpecs) { | ||
// Remove the new worker groups from the new spec. | ||
new_spec_without_new_worker_groups.WorkerGroupSpecs = new_spec_without_new_worker_groups.WorkerGroupSpecs[:len(old_spec.WorkerGroupSpecs)] | ||
|
||
same_hash, err = compareRayClusterJsonHash(old_spec, *new_spec_without_new_worker_groups, generateHashWithoutReplicasAndWorkersToDelete) | ||
if err != nil { | ||
return DoNothing, err | ||
} | ||
if same_hash { | ||
return Update, nil | ||
} | ||
} | ||
|
||
// Case 3: Otherwise, rollout a new cluster. | ||
return RolloutNew, nil | ||
} | ||
|
||
func generateHashWithoutReplicasAndWorkersToDelete(rayClusterSpec rayv1.RayClusterSpec) (string, error) { | ||
// Mute certain fields that will not trigger new RayCluster preparation. For example, | ||
// Autoscaler will update `Replicas` and `WorkersToDelete` when scaling up/down. | ||
updatedRayClusterSpec := rayClusterSpec.DeepCopy() | ||
for i := 0; i < len(updatedRayClusterSpec.WorkerGroupSpecs); i++ { | ||
|
@@ -1150,13 +1291,13 @@ func generateRayClusterJsonHash(rayClusterSpec rayv1.RayClusterSpec) (string, er | |
return utils.GenerateJsonHash(updatedRayClusterSpec) | ||
} | ||
|
||
func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec) (bool, error) { | ||
hash1, err1 := generateRayClusterJsonHash(spec1) | ||
func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayClusterSpec, hashFunc func(rayv1.RayClusterSpec) (string, error)) (bool, error) { | ||
hash1, err1 := hashFunc(spec1) | ||
if err1 != nil { | ||
return false, err1 | ||
} | ||
|
||
hash2, err2 := generateRayClusterJsonHash(spec2) | ||
hash2, err2 := hashFunc(spec2) | ||
if err2 != nil { | ||
return false, err2 | ||
} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

What will happen if there is inconsistency between the RayCluster and RayService's RayClusterSpec? For example, what if a worker group's replica count is set to 0 in RayClusterSpec, but the Ray Autoscaler has already scaled it up to 10? Updating the RayCluster may cause a lot of running Pods to be killed.

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

We don't do any special handling for this case. So that means in this case, the user-specified replica count will take precedence over the Ray Autoscaler's case.

One alternative would be to never update the `replicas` and `workersToDelete` when updating the RayCluster. The downside is that the user can never override `replicas`.

My guess is that the current approach (the first approach) is better, because the user should always have a way to set `replicas`, and this inconsistent case is an edge case, not the common case. But what are your thoughts?

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

From users' perspectives, it is much better to disregard the users' settings regarding `replicas` than to delete a Pod that has any running Ray task or actor.

On second thought, if Ray Autoscaling is enabled, Ray Autoscaler is the only decision maker to delete Ray Pods after #1253. Hence, users can increase the number of Pods, but can't delete Pods by updating `replicas`.

Users can directly update RayCluster. In addition, setting `replicas` for existing worker groups is not common. Most RayService users use Ray Autoscaler as well. If users still need to manually update `replicas` with Ray Autoscaling, Ray Autoscaler needs to be improved.