Skip to content

Commit

Permalink
Add unit tests for raycluster_controller reconcilePods function (ray-…
Browse files Browse the repository at this point in the history
…project#219)

* Write raycluster_controller reconcilePods unit tests

* Improve unit tests code

* Fix lint issue and Improve unit tests code

* Run goimports

* Fix a bug of workersToDelete update and update unit tests

* Fix a bug of workersToDelete update and update unit tests

* Update unit tests log

* Update workersToDelete local var to avoid unuseful kube api server delete call

* Remove code of updating instance spec workersToDelete

Co-authored-by: Taikun Liu <[email protected]>
  • Loading branch information
Waynegates and Taikun Liu authored Apr 5, 2022
1 parent 355ca24 commit 7c267f1
Show file tree
Hide file tree
Showing 2 changed files with 579 additions and 4 deletions.
26 changes: 22 additions & 4 deletions ray-operator/controllers/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
}
// Reconcile worker pods now
for index, worker := range instance.Spec.WorkerGroupSpecs {
for _, worker := range instance.Spec.WorkerGroupSpecs {
workerPods := corev1.PodList{}
filterLabels = client.MatchingLabels{common.RayClusterLabelKey: instance.Name, common.RayNodeGroupLabelKey: worker.GroupName}
if err := r.List(context.TODO(), &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
Expand All @@ -248,6 +248,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
runningPods.Items = append(runningPods.Items, aPod)
}
}
r.updateLocalWorkersToDelete(&worker, runningPods.Items)
diff := *worker.Replicas - int32(len(runningPods.Items))

if PrioritizeWorkersToDelete {
Expand All @@ -265,7 +266,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
log.Info("reconcilePods", "unable to delete worker ", pod.Name)
} else {
diff--
diff++
r.Recorder.Eventf(instance, v1.EventTypeNormal, "Deleted", "Deleted pod %s", pod.Name)
}
}
Expand Down Expand Up @@ -304,7 +305,6 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
r.Recorder.Eventf(instance, v1.EventTypeNormal, "Deleted", "Deleted pod %s", pod.Name)
}
instance.Spec.WorkerGroupSpecs[index].ScaleStrategy.WorkersToDelete = []string{}
continue
} else {
// diff < 0 and not the same absolute value as int32(len(worker.ScaleStrategy.WorkersToDelete)
Expand All @@ -326,7 +326,6 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
r.Recorder.Eventf(instance, v1.EventTypeNormal, "Deleted", "Deleted pod %s", pod.Name)
}
instance.Spec.WorkerGroupSpecs[index].ScaleStrategy.WorkersToDelete = []string{}

// remove the remaining pods not part of the scaleStrategy
i := 0
Expand Down Expand Up @@ -361,6 +360,25 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
return nil
}

func (r *RayClusterReconciler) updateLocalWorkersToDelete(worker *rayiov1alpha1.WorkerGroupSpec, runningItems []v1.Pod) {
var actualWorkersToDelete []string
itemMap := make(map[string]int)

// Create a map for quick lookup.
for _, item := range runningItems {
itemMap[item.Name] = 1
}

// Build actualWorkersToDelete to only include running items.
for _, workerToDelete := range worker.ScaleStrategy.WorkersToDelete {
if _, ok := itemMap[workerToDelete]; ok {
actualWorkersToDelete = append(actualWorkersToDelete, workerToDelete)
}
}

worker.ScaleStrategy.WorkersToDelete = actualWorkersToDelete
}

func (r *RayClusterReconciler) createHeadIngress(ingress *networkingv1.Ingress, instance *rayiov1alpha1.RayCluster) error {
// making sure the name is valid
ingress.Name = utils.CheckName(ingress.Name)
Expand Down
Loading

0 comments on commit 7c267f1

Please sign in to comment.