Skip to content

Commit

Permalink
[raycluster controller] Always honor maxReplicas (#662)
Browse files Browse the repository at this point in the history
* Honor max replicas

Signed-off-by: Dmitri Gekhtman <[email protected]>

* Tweak message.

Signed-off-by: Dmitri Gekhtman <[email protected]>

* Add to test logic.

Signed-off-by: Dmitri Gekhtman <[email protected]>

Signed-off-by: Dmitri Gekhtman <[email protected]>
  • Loading branch information
DmitriGekhtman authored Oct 28, 2022
1 parent 850fd48 commit 5e862be
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
21 changes: 19 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,23 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)

// Reconcile worker pods now
for _, worker := range instance.Spec.WorkerGroupSpecs {
// workerReplicas will store the target number of pods for this worker group.
var workerReplicas int32
// Always honor MaxReplicas if it is set:
// If MaxReplicas is set and Replicas > MaxReplicas, use MaxReplicas as the
// effective target replica count and log the discrepancy.
// See https://github.com/ray-project/kuberay/issues/560.
if worker.MaxReplicas != nil && *worker.MaxReplicas < *worker.Replicas {
workerReplicas = *worker.MaxReplicas
r.Log.Info(
fmt.Sprintf(
"Replicas for worker group %s (%d) is greater than maxReplicas (%d). Using maxReplicas (%d) as the target replica count.",
worker.GroupName, *worker.Replicas, *worker.MaxReplicas, *worker.MaxReplicas,
),
)
} else {
workerReplicas = *worker.Replicas
}
workerPods := corev1.PodList{}
filterLabels = client.MatchingLabels{common.RayClusterLabelKey: instance.Name, common.RayNodeGroupLabelKey: worker.GroupName}
if err := r.List(context.TODO(), &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
Expand Down Expand Up @@ -458,7 +475,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
}
r.updateLocalWorkersToDelete(&worker, runningPods.Items)
diff := *worker.Replicas - int32(len(runningPods.Items))
diff := workerReplicas - int32(len(runningPods.Items))

if PrioritizeWorkersToDelete {
// Always remove the specified WorkersToDelete - regardless of the value of Replicas.
Expand Down Expand Up @@ -518,7 +535,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
} else {
// diff < 0 and not the same absolute value as int32(len(worker.ScaleStrategy.WorkersToDelete)
// we need to scale down
workersToRemove := int32(len(runningPods.Items)) - *worker.Replicas
workersToRemove := int32(len(runningPods.Items)) - workerReplicas
randomlyRemovedWorkers := workersToRemove - int32(len(worker.ScaleStrategy.WorkersToDelete))
// we only need to scale down the workers in the ScaleStrategy
r.Log.Info("reconcilePods", "removing all the pods in the scaleStrategy of", worker.GroupName)
Expand Down
36 changes: 34 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ var _ = Context("Inside the default namespace", func() {
{
Replicas: pointer.Int32Ptr(3),
MinReplicas: pointer.Int32Ptr(0),
MaxReplicas: pointer.Int32Ptr(10000),
MaxReplicas: pointer.Int32Ptr(4),
GroupName: "small-group",
RayStartParams: map[string]string{
"port": "6379",
Expand Down Expand Up @@ -150,7 +150,7 @@ var _ = Context("Inside the default namespace", func() {
Expect(svc.Spec.Selector[common.RayIDLabelKey]).Should(Equal(utils.GenerateIdentifier(myRayCluster.Name, rayiov1alpha1.HeadNode)))
})

It("should create more than 1 worker", func() {
It("should create 3 workers", func() {
Eventually(
listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items))
Expand Down Expand Up @@ -255,6 +255,38 @@ var _ = Context("Inside the default namespace", func() {
listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(1), fmt.Sprintf("workerGroup %v", workerPods.Items))
})

It("should increase replicas past maxReplicas", func() {
// increasing replicas to 5, which is greater than maxReplicas (4)
err := retryOnOldRevision(DefaultAttempts, DefaultSleepDurationInSeconds, func() error {
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: myRayCluster.Name, Namespace: "default"}, myRayCluster),
time.Second*9, time.Millisecond*500).Should(BeNil(), "My raycluster = %v", myRayCluster)
rep := new(int32)
*rep = 5
myRayCluster.Spec.WorkerGroupSpecs[0].Replicas = rep

// Operator may update revision after we get cluster earlier. Update may result in 409 conflict error.
// We need to handle conflict error and retry the update.
return k8sClient.Update(ctx, myRayCluster)
})

Expect(err).NotTo(HaveOccurred(), "failed to update test RayCluster resource")
})

It("should scale to maxReplicas (4) workers", func() {
// retry listing pods, given that last update may not immediately happen.
Eventually(
listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(4), fmt.Sprintf("workerGroup %v", workerPods.Items))
})

It("should countinue to have only maxReplicas (4) workers", func() {
// check that pod count stays at 4 for two seconds.
Consistently(
listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*2, time.Millisecond*200).Should(Equal(4), fmt.Sprintf("workerGroup %v", workerPods.Items))
})
})
})

Expand Down

0 comments on commit 5e862be

Please sign in to comment.