[GCS FT] Consider the case of sidecar containers #1386

Merged · 5 commits · Sep 5, 2023
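In short, this PR replaces the phase-only health check in `reconcilePods` with a `shouldDeletePod` helper that also inspects the Ray container's own state. The motivating case: when the Pod carries a sidecar container, the Pod phase can stay `Running` even after the Ray container has terminated, so a check on `pod.Status.Phase` alone never notices the dead Ray container. A minimal sketch of that scenario (container names are hypothetical):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// A head Pod whose Ray container has exited while a sidecar keeps running:
	// the Pod phase stays Running, so a phase-only health check misses the failure.
	pod := corev1.Pod{
		Spec: corev1.PodSpec{
			RestartPolicy: corev1.RestartPolicyNever,
			Containers: []corev1.Container{
				{Name: "ray-head"},    // hypothetical Ray container name
				{Name: "log-sidecar"}, // hypothetical sidecar
			},
		},
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
			ContainerStatuses: []corev1.ContainerStatus{
				{Name: "log-sidecar", State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}},
				{Name: "ray-head", State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{ExitCode: 1}}},
			},
		},
	}
	fmt.Println("Pod phase:", pod.Status.Phase)                                                        // Running
	fmt.Println("Ray container terminated:", pod.Status.ContainerStatuses[1].State.Terminated != nil) // true
}
```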
150 changes: 106 additions & 44 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -431,33 +431,20 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
// Reconcile head Pod
if len(headPods.Items) == 1 {
headPod := headPods.Items[0]
r.Log.Info("reconcilePods", "Found 1 head Pod", headPod.Name)
// TODO (kevin85421): Consider deleting a head Pod if its Ray container restarts excessively, as this
// might suggest an unhealthy Kubernetes node. Deleting and then recreating the head Pod might allow
// it to be scheduled on a different node. However, it's aggressive to delete a head Pod that is not
// in a terminated state (i.e., `Failed` or `Succeeded`). We should only delete a head Pod when GCS
// fault tolerance is enabled, and drain the head Pod before deleting it.
if headPod.Status.Phase == corev1.PodRunning || headPod.Status.Phase == corev1.PodPending {
r.Log.Info("reconcilePods", "The head pod is Running or Pending... checking workers", headPod.Name)
} else {
if headPod.Spec.RestartPolicy == corev1.RestartPolicyAlways {
// Based on my observation, a Pod with `RestartPolicy: Always` will never be in the terminated states (i.e., `Failed` or `Succeeded`).
// However, I couldn't find any well-defined behavior in the Kubernetes documentation, so I can't guarantee that the status transition
// from `Running` to `Failed / Succeeded` and back to `Running` won't occur when we kill the main process (i.e., `ray start` in KubeRay)
// in the head Pod. Therefore, I've added this check as a safeguard.
message := fmt.Sprintf(
"The status of the head Pod %s is %s. However, KubeRay will not delete the Pod because its restartPolicy is set to 'Always' "+
"and it should be able to restart automatically.", headPod.Name, headPod.Status.Phase)
r.Log.Info(message)
return fmt.Errorf(message)
}
message := fmt.Sprintf("The status of the head Pod %s is %s which is a terminal state. It is not expected that the head pod ever be in a terminal state, so KubeRay will delete the Pod and recreate the head Pod in the next reconciliation.", headPod.Name, headPod.Status.Phase)
r.Log.Info(message)
r.Log.Info("reconcilePods", "Found 1 head Pod", headPod.Name, "Pod status", headPod.Status.Phase,
"Pod restart policy", headPod.Spec.RestartPolicy,
"Ray container terminated status", getRayContainerStateTerminated(headPod))

shouldDelete, reason := shouldDeletePod(headPod, rayv1alpha1.HeadNode)
r.Log.Info("reconcilePods", "head Pod", headPod.Name, "shouldDelete", shouldDelete, "reason", reason)
if shouldDelete {
if err := r.Delete(ctx, &headPod); err != nil {
return err
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", "Deleted head Pod %s; status: %s", headPod.Name, headPod.Status.Phase)
return fmt.Errorf(message)
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
"Deleted head Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v",
headPod.Name, headPod.Status.Phase, headPod.Spec.RestartPolicy, getRayContainerStateTerminated(headPod))
return fmt.Errorf(reason)
}
} else if len(headPods.Items) == 0 {
// Create head Pod if it does not exist.
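One detail worth noting in the hunk above: after deleting the head Pod, `reconcilePods` returns `fmt.Errorf(reason)`. A non-nil error propagated up to `Reconcile` makes controller-runtime requeue the request with backoff, which is what gives the controller a later pass in which it sees zero head Pods and recreates one ("recreate the head Pod in the next reconciliation", as the comment puts it). A minimal sketch of that pattern, with hypothetical function names standing in for KubeRay's actual methods:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	ctrl "sigs.k8s.io/controller-runtime"
)

// reconcilePodsSketch stands in for reconcilePods: it has just deleted an
// unhealthy head Pod and surfaces that as an error. (Hypothetical.)
func reconcilePodsSketch(ctx context.Context) error {
	return errors.New("head Pod deleted; requeue so the next pass recreates it")
}

// reconcileSketch mirrors the Reconcile entry point: a non-nil error makes
// controller-runtime requeue the request with backoff.
func reconcileSketch(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	if err := reconcilePodsSketch(ctx); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

func main() {
	res, err := reconcileSketch(context.Background(), ctrl.Request{})
	fmt.Println(res, err) // the error is what triggers the retry
}
```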
@@ -551,27 +538,22 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
}

// Delete unhealthy worker Pods
deletedWorkers := make(map[string]struct{})
deleted := struct{}{}
numDeletedUnhealthyWorkerPods := 0
for _, workerPod := range workerPods.Items {
// TODO (kevin85421): Consider deleting a worker Pod if its Ray container restarts excessively,
// as this could suggest an unhealthy Kubernetes node. Deleting and then recreating the worker Pod
// might allow it to be scheduled on a different node. Compared to deleting a head Pod, removing a
// worker Pod is less aggressive and aligns more closely with the behavior of the Ray Autoscaler.
// Nevertheless, we should still carefully drain the node before deleting the worker Pod. Enabling
// GCS fault tolerance might not be necessary when deleting worker Pods. Note that the Ray Autoscaler
// will not delete any worker Pods that have never been registered with the Ray cluster. Therefore,
// we may need to address the Ray Autoscaler's blind spots.

shouldDelete, reason := shouldDeletePod(workerPod, rayv1alpha1.WorkerNode)
r.Log.Info("reconcilePods", "worker Pod", workerPod.Name, "shouldDelete", shouldDelete, "reason", reason)
// TODO (kevin85421): We may need to allow users to configure how many `Failed` or `Succeeded` Pods should be kept for debugging purposes.
if workerPod.Spec.RestartPolicy != corev1.RestartPolicyAlways && !isPodRunningOrPendingAndNotDeleting(workerPod) {
// If the Pod's status is `Failed` or `Succeeded`, the Pod will not restart and we can safely delete it.
if shouldDelete {
numDeletedUnhealthyWorkerPods++
r.Log.Info(fmt.Sprintf("The worker Pod %s status is %s. KubeRay will delete the Pod because the status is not Running or Pending. ", workerPod.Name, workerPod.Status.Phase))
deletedWorkers[workerPod.Name] = deleted
if err := r.Delete(ctx, &workerPod); err != nil {
return err
} else {
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", "Deleted worker Pod %s; status: %s", workerPod.Name, workerPod.Status.Phase)
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
"Deleted worker Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v",
workerPod.Name, workerPod.Status.Phase, workerPod.Spec.RestartPolicy, getRayContainerStateTerminated(workerPod))
}
}
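The `deletedWorkers` map introduced above is the standard Go set idiom: `map[string]struct{}` with empty-struct values costs no per-entry storage, and membership is tested with the comma-ok form, exactly as the `runningPods` filter further down does. A standalone sketch:

```go
package main

import "fmt"

func main() {
	// Set-of-strings idiom used for deletedWorkers: struct{} values occupy no storage.
	deletedWorkers := make(map[string]struct{})
	deleted := struct{}{}
	deletedWorkers["worker-a"] = deleted

	// Comma-ok membership test, as in the runningPods filter further down.
	for _, name := range []string{"worker-a", "worker-b"} {
		if _, ok := deletedWorkers[name]; !ok {
			fmt.Println(name, "would be kept in runningPods")
		}
	}
}
```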

@@ -582,8 +564,6 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv

// Always remove the specified WorkersToDelete - regardless of the value of Replicas.
// Essentially WorkersToDelete has to be deleted to meet the expectations of the Autoscaler.
deletedWorkers := make(map[string]struct{})
deleted := struct{}{}
r.Log.Info("reconcilePods", "removing the pods in the scaleStrategy of", worker.GroupName)
for _, podsToDelete := range worker.ScaleStrategy.WorkersToDelete {
pod := corev1.Pod{}
@@ -605,8 +585,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv

runningPods := corev1.PodList{}
for _, pod := range workerPods.Items {
// TODO (kevin85421): We also need to have a clear story of all the Pod status phases, especially for PodFailed.
if _, ok := deletedWorkers[pod.Name]; !ok && isPodRunningOrPendingAndNotDeleting(pod) {
if _, ok := deletedWorkers[pod.Name]; !ok {
runningPods.Items = append(runningPods.Items, pod)
}
}
@@ -668,8 +647,91 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return nil
}

func isPodRunningOrPendingAndNotDeleting(pod corev1.Pod) bool {
return (pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodPending) && pod.ObjectMeta.DeletionTimestamp == nil
// shouldDeletePod returns whether the Pod should be deleted and the reason
//
// @param pod: The Pod to be checked.
// @param nodeType: The type of the node that the Pod belongs to (head or worker).
//
// @return: shouldDelete (bool), reason (string)
// (1) shouldDelete: Whether the Pod should be deleted.
// (2) reason: The reason why the Pod should or should not be deleted.
func shouldDeletePod(pod corev1.Pod, nodeType rayv1alpha1.RayNodeType) (bool, string) {
// If a Pod's restart policy is set to `Always`, KubeRay will not delete
// the Pod and rely on the Pod's restart policy to restart the Pod.
isRestartPolicyAlways := pod.Spec.RestartPolicy == corev1.RestartPolicyAlways

// If the Pod's status is `Failed` or `Succeeded`, the Pod will not restart and we can safely delete it.
if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded {
if isRestartPolicyAlways {
// Based on my observation, a Pod with `RestartPolicy: Always` will never be in the terminated states (i.e., `Failed` or `Succeeded`).
// However, I couldn't find any well-defined behavior in the Kubernetes documentation, so I can't guarantee that the status transition
// from `Running` to `Failed / Succeeded` and back to `Running` won't occur when we kill the main process (i.e., `ray start` in KubeRay)
// in the head Pod. Therefore, I've added this check as a safeguard.
reason := fmt.Sprintf(
"The status of the %s Pod %s is %s. However, KubeRay will not delete the Pod because its restartPolicy is set to 'Always' "+
"and it should be able to restart automatically.", nodeType, pod.Name, pod.Status.Phase)
return false, reason
}

reason := fmt.Sprintf(
"The %s Pod %s status is %s which is a terminal state and it will not restart. "+
"KubeRay will delete the Pod and create new Pods in the next reconciliation if necessary.", nodeType, pod.Name, pod.Status.Phase)
return true, reason
}

rayContainerTerminated := getRayContainerStateTerminated(pod)
if pod.Status.Phase == corev1.PodRunning && rayContainerTerminated != nil {
if isRestartPolicyAlways {
// If restart policy is set to `Always`, KubeRay will not delete the Pod.
reason := fmt.Sprintf(
"The Pod status of the %s Pod %s is %s, and the Ray container terminated status is %v. However, KubeRay will not delete the Pod because its restartPolicy is set to 'Always' "+
"and it should be able to restart automatically.", nodeType, pod.Name, pod.Status.Phase, rayContainerTerminated)
return false, reason
}
reason := fmt.Sprintf(
"The Pod status of the %s Pod %s is %s, and the Ray container terminated status is %v. "+
"The container is unable to restart due to its restart policy %s, so KubeRay will delete it.",
nodeType, pod.Name, pod.Status.Phase, rayContainerTerminated, pod.Spec.RestartPolicy)
return true, reason
}

// TODO (kevin85421): Consider deleting a Pod if its Ray container restarts excessively, as this might
// suggest an unhealthy Kubernetes node. Deleting and then recreating the Pod might allow it to be
// scheduled on a different node.
//
// (1) Head Pod:
// It's aggressive to delete a head Pod that is not in a terminated state (i.e., `Failed` or `Succeeded`).
// We should only delete a head Pod when GCS fault tolerance is enabled, and drain the head Pod before
// deleting it.
//
// (2) Worker Pod:
// Compared to deleting a head Pod, removing a worker Pod is less aggressive and aligns more closely with
// the behavior of the Ray Autoscaler. Nevertheless, we should still carefully drain the node before deleting
// the worker Pod. Enabling GCS fault tolerance might not be necessary when deleting worker Pods. Note that
// the Ray Autoscaler will not delete any worker Pods that have never been registered with the Ray cluster.
// Therefore, we may need to address the Ray Autoscaler's blind spots.

reason := fmt.Sprintf(
"KubeRay does not need to delete the %s Pod %s. The Pod status is %s, and the Ray container terminated status is %v.",
nodeType, pod.Name, pod.Status.Phase, rayContainerTerminated)
return false, reason
}
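The branches above form a small decision matrix (Pod phase × restart policy × Ray container state), which lends itself to a table-driven test. A sketch only: it assumes it lives in the same package as `shouldDeletePod`, with `testing`, `corev1`, and `rayv1alpha1` imported; the `makePod` helper is illustrative, not KubeRay's actual test code.

```go
func TestShouldDeletePodSketch(t *testing.T) {
	makePod := func(phase corev1.PodPhase, policy corev1.RestartPolicy, rayTerminated bool) corev1.Pod {
		state := corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}
		if rayTerminated {
			state = corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{ExitCode: 1}}
		}
		return corev1.Pod{
			Spec: corev1.PodSpec{
				RestartPolicy: policy,
				Containers:    []corev1.Container{{Name: "ray-head"}},
			},
			Status: corev1.PodStatus{
				Phase:             phase,
				ContainerStatuses: []corev1.ContainerStatus{{Name: "ray-head", State: state}},
			},
		}
	}

	tests := []struct {
		phase         corev1.PodPhase
		policy        corev1.RestartPolicy
		rayTerminated bool
		want          bool
	}{
		{corev1.PodFailed, corev1.RestartPolicyNever, false, true},   // terminal Pod, cannot restart: delete
		{corev1.PodFailed, corev1.RestartPolicyAlways, false, false}, // safeguard: rely on the restart policy
		{corev1.PodRunning, corev1.RestartPolicyNever, true, true},   // sidecar keeps the Pod Running: delete
		{corev1.PodRunning, corev1.RestartPolicyAlways, true, false}, // container can restart on its own
		{corev1.PodRunning, corev1.RestartPolicyNever, false, false}, // healthy: keep
	}
	for _, tc := range tests {
		got, reason := shouldDeletePod(makePod(tc.phase, tc.policy, tc.rayTerminated), rayv1alpha1.HeadNode)
		if got != tc.want {
			t.Errorf("phase=%s policy=%s rayTerminated=%v: want %v, got %v (%s)",
				tc.phase, tc.policy, tc.rayTerminated, tc.want, got, reason)
		}
	}
}
```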

// `ContainerStatuses` does not guarantee the order of the containers. Therefore, we need to find the Ray
// container's status by name. See the following links for more details:
// (1) https://discuss.kubernetes.io/t/pod-spec-containers-and-pod-status-containerstatuses-can-have-a-different-order-why/25273
// (2) https://github.com/kubernetes/kubernetes/blob/03762cbcb52b2a4394e4d795f9d3517a78a5e1a2/pkg/api/v1/pod/util.go#L261-L268
func getRayContainerStateTerminated(pod corev1.Pod) *corev1.ContainerStateTerminated {
rayContainerName := pod.Spec.Containers[common.RayContainerIndex].Name
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.Name == rayContainerName {
return containerStatus.State.Terminated
}
}
// If the Ray container isn't found, we'll assume it hasn't terminated. This scenario
// typically arises during testing (`raycluster_controller_test.go`) as `envtest` lacks
// a Pod controller, preventing automatic Pod status updates.
return nil
}
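To see why the lookup keys on the container name, compare indexing `Status.ContainerStatuses` directly against matching by name when the statuses are reported out of order. A standalone sketch (container names are hypothetical, and the Ray-container-is-first convention mirrors `common.RayContainerIndex` as an assumption):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// Spec lists the Ray container first, but the statuses happen to be
	// reported sidecar-first, which the Kubernetes API does not forbid.
	pod := corev1.Pod{
		Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "ray-head"}, {Name: "sidecar"}}},
		Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
			{Name: "sidecar"},
			{Name: "ray-head", State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{ExitCode: 137}}},
		}},
	}

	// Wrong: index 0 of ContainerStatuses is the sidecar, not the Ray container.
	fmt.Println("by index:", pod.Status.ContainerStatuses[0].State.Terminated) // <nil>

	// Right: match on the container name, as getRayContainerStateTerminated does.
	rayName := pod.Spec.Containers[0].Name
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.Name == rayName {
			fmt.Println("by name:", cs.State.Terminated != nil) // true
		}
	}
}
```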

func (r *RayClusterReconciler) createHeadIngress(ctx context.Context, ingress *networkingv1.Ingress, instance *rayv1alpha1.RayCluster) error {