Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for raycluster_controller reconcilePods function #219

Merged
26 changes: 22 additions & 4 deletions ray-operator/controllers/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
}
// Reconcile worker pods now
for index, worker := range instance.Spec.WorkerGroupSpecs {
for _, worker := range instance.Spec.WorkerGroupSpecs {
workerPods := corev1.PodList{}
filterLabels = client.MatchingLabels{common.RayClusterLabelKey: instance.Name, common.RayNodeGroupLabelKey: worker.GroupName}
if err := r.List(context.TODO(), &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
Expand All @@ -248,6 +248,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
runningPods.Items = append(runningPods.Items, aPod)
}
}
r.updateLocalWorkersToDelete(&worker, runningPods.Items)
diff := *worker.Replicas - int32(len(runningPods.Items))

if PrioritizeWorkersToDelete {
Expand All @@ -265,7 +266,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
log.Info("reconcilePods", "unable to delete worker ", pod.Name)
} else {
diff--
diff++
r.Recorder.Eventf(instance, v1.EventTypeNormal, "Deleted", "Deleted pod %s", pod.Name)
}
}
Expand Down Expand Up @@ -304,7 +305,6 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
r.Recorder.Eventf(instance, v1.EventTypeNormal, "Deleted", "Deleted pod %s", pod.Name)
}
instance.Spec.WorkerGroupSpecs[index].ScaleStrategy.WorkersToDelete = []string{}
continue
} else {
// diff < 0 and not the same absolute value as int32(len(worker.ScaleStrategy.WorkersToDelete)
Expand All @@ -326,7 +326,6 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
r.Recorder.Eventf(instance, v1.EventTypeNormal, "Deleted", "Deleted pod %s", pod.Name)
}
instance.Spec.WorkerGroupSpecs[index].ScaleStrategy.WorkersToDelete = []string{}

// remove the remaining pods not part of the scaleStrategy
i := 0
Expand Down Expand Up @@ -361,6 +360,25 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
return nil
}

func (r *RayClusterReconciler) updateLocalWorkersToDelete(worker *rayiov1alpha1.WorkerGroupSpec, runningItems []v1.Pod) {
var actualWorkersToDelete []string
itemMap := make(map[string]int)

// Create a map for quick lookup.
for _, item := range runningItems {
itemMap[item.Name] = 1
}

// Build actualWorkersToDelete to only include running items.
for _, workerToDelete := range worker.ScaleStrategy.WorkersToDelete {
if _, ok := itemMap[workerToDelete]; ok {
actualWorkersToDelete = append(actualWorkersToDelete, workerToDelete)
}
}

worker.ScaleStrategy.WorkersToDelete = actualWorkersToDelete
}

func (r *RayClusterReconciler) createHeadIngress(ingress *networkingv1.Ingress, instance *rayiov1alpha1.RayCluster) error {
// making sure the name is valid
ingress.Name = utils.CheckName(ingress.Name)
Expand Down
Loading