Skip to content

Commit

Permalink
podprobemarker support serverless pod
Browse files Browse the repository at this point in the history
Signed-off-by: liheng.zms <[email protected]>
  • Loading branch information
zmberg committed Jan 10, 2025
1 parent 2cdb760 commit 0b4d1b3
Show file tree
Hide file tree
Showing 12 changed files with 674 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Build the manager and daemon binaries
ARG BASE_IMAGE=alpine
ARG BASE_IMAGE_VERSION=3.19@sha256:ae65dbf8749a7d4527648ccee1fa3deb6bfcae34cbc30fc67aa45c44dcaa90ee
FROM golang:1.20.14-alpine3.19@sha256:e47f121850f4e276b2b210c56df3fda9191278dd84a3a442bfe0b09934462a8f as builder
FROM golang:1.20.14-alpine3.19@sha256:e47f121850f4e276b2b210c56df3fda9191278dd84a3a442bfe0b09934462a8f AS builder

WORKDIR /workspace
# Copy the Go Modules manifests
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile_multiarch
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
ARG BASE_IMAGE=alpine
ARG BASE_IMAGE_VERSION=3.19@sha256:ae65dbf8749a7d4527648ccee1fa3deb6bfcae34cbc30fc67aa45c44dcaa90ee
ARG BUILD_BASE_IMAGE=golang:1.20.14-alpine3.19@sha256:e47f121850f4e276b2b210c56df3fda9191278dd84a3a442bfe0b09934462a8f
FROM --platform=$BUILDPLATFORM ${BUILD_BASE_IMAGE} as builder
FROM --platform=$BUILDPLATFORM ${BUILD_BASE_IMAGE} AS builder

WORKDIR /workspace
# Copy the Go Modules manifests
Expand Down
12 changes: 12 additions & 0 deletions apis/apps/v1alpha1/node_pod_probe_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -87,6 +88,17 @@ const (
ProbeUnknown ProbeState = "Unknown"
)

func (p ProbeState) IsEqualPodConditionStatus(status corev1.ConditionStatus) bool {
switch status {
case corev1.ConditionTrue:
return p == ProbeSucceeded
case corev1.ConditionFalse:
return p == ProbeFailed
default:
return p == ProbeUnknown
}
}

// +genclient
// +genclient:nonNamespaced
// +k8s:openapi-gen=true
Expand Down
24 changes: 24 additions & 0 deletions apis/apps/v1alpha1/pod_probe_marker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// PodProbeMarkerAnnotationKey records the Probe Spec, mainly used for serverless Pod scenarios, as follows:
// annotations:
// kruise.io/podprobe: |
// [
// {
// "containerName": "minecraft",
// "name": "healthy",
// "podConditionType": "game.kruise.io/healthy",
// "probe": {
// "exec": {
// "command": [
// "bash",
// "/data/probe.sh"
// ]
// }
// }
// }
// ]
PodProbeMarkerAnnotationKey = "kruise.io/podprobe"
// PodProbeMarkerListAnnotationKey records the injected PodProbeMarker Name List
PodProbeMarkerListAnnotationKey = "kruise.io/podprobemarker-list"
)

// PodProbeMarkerSpec defines the desired state of PodProbeMarker
type PodProbeMarkerSpec struct {
// Selector is a label query over pods that should exec custom probe
Expand Down
127 changes: 113 additions & 14 deletions pkg/controller/podprobemarker/pod_probe_marker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ package podprobemarker

import (
"context"
"encoding/json"
"flag"
"fmt"
"reflect"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -60,6 +64,8 @@ var (
const (
// PodProbeMarkerFinalizer is on PodProbeMarker, and used to remove podProbe from NodePodProbe.Spec
PodProbeMarkerFinalizer = "kruise.io/node-pod-probe-cleanup"

VirtualKubelet = "virtual-kubelet"
)

/**
Expand Down Expand Up @@ -145,14 +151,14 @@ func (r *ReconcilePodProbeMarker) syncPodProbeMarker(ns, name string) error {
// Error reading the object - requeue the request.
return err
}
pods, err := r.getMatchingPods(ppm)
normalPods, serverlessPods, err := r.getMatchingPods(ppm)
if err != nil {
klog.ErrorS(err, "PodProbeMarker listed pods failed", "podProbeMarker", klog.KObj(ppm))
return err
}
// remove podProbe from NodePodProbe.Spec
if !ppm.DeletionTimestamp.IsZero() {
return r.handlerPodProbeMarkerFinalizer(ppm, pods)
return r.handlerPodProbeMarkerFinalizer(ppm, normalPods)
}
// add finalizer
if !controllerutil.ContainsFinalizer(ppm, PodProbeMarkerFinalizer) {
Expand All @@ -164,9 +170,26 @@ func (r *ReconcilePodProbeMarker) syncPodProbeMarker(ns, name string) error {
klog.V(3).InfoS("Added PodProbeMarker finalizer success", "podProbeMarker", klog.KObj(ppm))
}

// map[probe.PodConditionType] = probe.MarkerPolicy
markers := make(map[string][]appsv1alpha1.ProbeMarkerPolicy)
for _, probe := range ppm.Spec.Probes {
if probe.PodConditionType == "" || len(probe.MarkerPolicy) == 0 {
continue
}
markers[probe.PodConditionType] = probe.MarkerPolicy
}
if utilfeature.DefaultFeatureGate.Enabled(features.EnablePodProbeMarkerOnServerless) && len(markers) != 0 {
for _, pod := range serverlessPods {
if err = r.markerServerlessPod(pod, markers); err != nil {
klog.ErrorS(err, "Failed to marker serverless pod", "podProbeMarker", klog.KObj(ppm), "pod", klog.KObj(pod))
return err
}
}
}

groupByNode := make(map[string][]*corev1.Pod)
for i, pod := range pods {
groupByNode[pod.Spec.NodeName] = append(groupByNode[pod.Spec.NodeName], pods[i])
for i, pod := range normalPods {
groupByNode[pod.Spec.NodeName] = append(groupByNode[pod.Spec.NodeName], normalPods[i])
}

// add podProbe in NodePodProbe
Expand All @@ -182,11 +205,12 @@ func (r *ReconcilePodProbeMarker) syncPodProbeMarker(ns, name string) error {
if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: ppm.Namespace, Name: ppm.Name}, ppmClone); err != nil {
klog.ErrorS(err, "Failed to get updated PodProbeMarker from client", "podProbeMarker", klog.KObj(ppm))
}
if ppmClone.Status.ObservedGeneration == ppmClone.Generation && int(ppmClone.Status.MatchedPods) == len(pods) {
matchedPods := len(normalPods) + len(serverlessPods)
if ppmClone.Status.ObservedGeneration == ppmClone.Generation && int(ppmClone.Status.MatchedPods) == matchedPods {
return nil
}
ppmClone.Status.ObservedGeneration = ppmClone.Generation
ppmClone.Status.MatchedPods = int64(len(pods))
ppmClone.Status.MatchedPods = int64(matchedPods)
return r.Client.Status().Update(context.TODO(), ppmClone)
}); err != nil {
klog.ErrorS(err, "PodProbeMarker update status failed", "podProbeMarker", klog.KObj(ppm))
Expand Down Expand Up @@ -214,6 +238,59 @@ func (r *ReconcilePodProbeMarker) handlerPodProbeMarkerFinalizer(ppm *appsv1alph
return nil
}

// marker labels or annotations on Pod based on probing results
// markers is map[probe.PodConditionType] = probe.MarkerPolicy
func (r *ReconcilePodProbeMarker) markerServerlessPod(pod *corev1.Pod, markers map[string][]appsv1alpha1.ProbeMarkerPolicy) error {
newObjectMeta := *pod.ObjectMeta.DeepCopy()
if newObjectMeta.Annotations == nil {
newObjectMeta.Annotations = make(map[string]string)
}
if newObjectMeta.Labels == nil {
newObjectMeta.Labels = make(map[string]string)
}

for cond, policy := range markers {
condition := util.GetCondition(pod, corev1.PodConditionType(cond))
if condition == nil {
continue
}

for _, obj := range policy {
if !obj.State.IsEqualPodConditionStatus(condition.Status) {
continue
}
for k, v := range obj.Annotations {
newObjectMeta.Annotations[k] = v
}
for k, v := range obj.Labels {
newObjectMeta.Labels[k] = v
}
}
}

// no change
if reflect.DeepEqual(pod.ObjectMeta, newObjectMeta) {
return nil
}

// patch change
oldBytes, _ := json.Marshal(corev1.Pod{ObjectMeta: pod.ObjectMeta})
newBytes, _ := json.Marshal(corev1.Pod{ObjectMeta: newObjectMeta})
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldBytes, newBytes, &corev1.Pod{})
if err != nil {
return err
}
obj := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}}
if err = r.Patch(context.TODO(), obj, client.RawPatch(types.StrategicMergePatchType, patchBytes)); err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
klog.V(3).InfoS("PodProbeMarker marker pod success", "pod", klog.KObj(pod), "patch", string(patchBytes))
return nil
}

func (r *ReconcilePodProbeMarker) updateNodePodProbes(ppm *appsv1alpha1.PodProbeMarker, nodeName string, pods []*corev1.Pod) error {
npp := &appsv1alpha1.NodePodProbe{}
err := r.Get(context.TODO(), client.ObjectKey{Name: nodeName}, npp)
Expand Down Expand Up @@ -355,28 +432,50 @@ func setPodContainerProbes(podProbe *appsv1alpha1.PodProbe, probe appsv1alpha1.P
}

// If you need update the pod object, you must DeepCopy it
func (r *ReconcilePodProbeMarker) getMatchingPods(ppm *appsv1alpha1.PodProbeMarker) ([]*corev1.Pod, error) {
// para1: normal pods, which is on normal node
// para2: serverless pods
func (r *ReconcilePodProbeMarker) getMatchingPods(ppm *appsv1alpha1.PodProbeMarker) ([]*corev1.Pod, []*corev1.Pod, error) {
// get more faster selector
selector, err := util.ValidatedLabelSelectorAsSelector(ppm.Spec.Selector)
if err != nil {
return nil, err
return nil, nil, err
}
// DisableDeepCopy:true, indicates must be deep copy before update pod objection
listOpts := &client.ListOptions{LabelSelector: selector, Namespace: ppm.Namespace}
podList := &corev1.PodList{}
if listErr := r.Client.List(context.TODO(), podList, listOpts, utilclient.DisableDeepCopy); listErr != nil {
return nil, err
return nil, nil, err
}
pods := make([]*corev1.Pod, 0, len(podList.Items))
normalPods := make([]*corev1.Pod, 0, len(podList.Items))
serverlessPods := make([]*corev1.Pod, 0, len(podList.Items))
nodes := map[string]*corev1.Node{}
for i := range podList.Items {
pod := &podList.Items[i]
condition := util.GetCondition(pod, corev1.PodInitialized)
if kubecontroller.IsPodActive(pod) && pod.Spec.NodeName != "" &&
condition != nil && condition.Status == corev1.ConditionTrue {
pods = append(pods, pod)
if !kubecontroller.IsPodActive(pod) || condition == nil ||
condition.Status != corev1.ConditionTrue || pod.Spec.NodeName == "" {
continue
}

node, ok := nodes[pod.Spec.NodeName]
if !ok {
node = &corev1.Node{}
if err = r.Get(context.TODO(), client.ObjectKey{Name: pod.Spec.NodeName}, node); err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, nil, err
}
nodes[node.Name] = node
}

if node.Labels["type"] == VirtualKubelet {
serverlessPods = append(serverlessPods, pod)
} else {
normalPods = append(normalPods, pod)
}
}
return pods, nil
return normalPods, serverlessPods, nil
}

func (r *ReconcilePodProbeMarker) removePodProbeFromNodePodProbe(ppmName, nppName string) error {
Expand Down
Loading

0 comments on commit 0b4d1b3

Please sign in to comment.