Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

podprobemarker support serverless pod #1875

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Build the manager and daemon binaries
ARG BASE_IMAGE=alpine
ARG BASE_IMAGE_VERSION=3.19@sha256:ae65dbf8749a7d4527648ccee1fa3deb6bfcae34cbc30fc67aa45c44dcaa90ee
FROM golang:1.20.14-alpine3.19@sha256:e47f121850f4e276b2b210c56df3fda9191278dd84a3a442bfe0b09934462a8f as builder
FROM golang:1.20.14-alpine3.19@sha256:e47f121850f4e276b2b210c56df3fda9191278dd84a3a442bfe0b09934462a8f AS builder

WORKDIR /workspace
# Copy the Go Modules manifests
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile_multiarch
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
ARG BASE_IMAGE=alpine
ARG BASE_IMAGE_VERSION=3.19@sha256:ae65dbf8749a7d4527648ccee1fa3deb6bfcae34cbc30fc67aa45c44dcaa90ee
ARG BUILD_BASE_IMAGE=golang:1.20.14-alpine3.19@sha256:e47f121850f4e276b2b210c56df3fda9191278dd84a3a442bfe0b09934462a8f
FROM --platform=$BUILDPLATFORM ${BUILD_BASE_IMAGE} as builder
FROM --platform=$BUILDPLATFORM ${BUILD_BASE_IMAGE} AS builder

WORKDIR /workspace
# Copy the Go Modules manifests
Expand Down
12 changes: 12 additions & 0 deletions apis/apps/v1alpha1/node_pod_probe_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -87,6 +88,17 @@ const (
ProbeUnknown ProbeState = "Unknown"
)

func (p ProbeState) IsEqualPodConditionStatus(status corev1.ConditionStatus) bool {
switch status {
case corev1.ConditionTrue:
return p == ProbeSucceeded
case corev1.ConditionFalse:
return p == ProbeFailed
default:
return p == ProbeUnknown
}
}

// +genclient
// +genclient:nonNamespaced
// +k8s:openapi-gen=true
Expand Down
25 changes: 25 additions & 0 deletions apis/apps/v1alpha1/pod_probe_marker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,31 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// PodProbeMarkerAnnotationKey records the Probe Spec, mainly used for serverless Pod scenarios, as follows:
// annotations:
// kruise.io/podprobe: |
// [
// {
// "containerName": "minecraft",
// "name": "healthy",
// "podConditionType": "game.kruise.io/healthy",
// "probe": {
// "exec": {
// "command": [
// "bash",
// "/data/probe.sh"
// ]
// }
// }
// }
// ]
PodProbeMarkerAnnotationKey = "kruise.io/podprobe"
// PodProbeMarkerListAnnotationKey records the injected PodProbeMarker Name List
furykerry marked this conversation as resolved.
Show resolved Hide resolved
// example: kruise.io/podprobemarker-list="probe-marker-1,probe-marker-2"
PodProbeMarkerListAnnotationKey = "kruise.io/podprobemarker-list"
)

// PodProbeMarkerSpec defines the desired state of PodProbeMarker
type PodProbeMarkerSpec struct {
// Selector is a label query over pods that should exec custom probe
Expand Down
127 changes: 113 additions & 14 deletions pkg/controller/podprobemarker/pod_probe_marker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import (
"context"
"encoding/json"
"flag"
"fmt"
"reflect"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -60,6 +64,8 @@
const (
// PodProbeMarkerFinalizer is on PodProbeMarker, and used to remove podProbe from NodePodProbe.Spec
PodProbeMarkerFinalizer = "kruise.io/node-pod-probe-cleanup"

VirtualKubelet = "virtual-kubelet"
)

/**
Expand Down Expand Up @@ -145,14 +151,14 @@
// Error reading the object - requeue the request.
return err
}
pods, err := r.getMatchingPods(ppm)
normalPods, serverlessPods, err := r.getMatchingPods(ppm)
if err != nil {
klog.ErrorS(err, "PodProbeMarker listed pods failed", "podProbeMarker", klog.KObj(ppm))
return err
}
// remove podProbe from NodePodProbe.Spec
if !ppm.DeletionTimestamp.IsZero() {
return r.handlerPodProbeMarkerFinalizer(ppm, pods)
return r.handlerPodProbeMarkerFinalizer(ppm, normalPods)
}
// add finalizer
if !controllerutil.ContainsFinalizer(ppm, PodProbeMarkerFinalizer) {
Expand All @@ -164,9 +170,26 @@
klog.V(3).InfoS("Added PodProbeMarker finalizer success", "podProbeMarker", klog.KObj(ppm))
}

// map[probe.PodConditionType] = probe.MarkerPolicy
markers := make(map[string][]appsv1alpha1.ProbeMarkerPolicy)
for _, probe := range ppm.Spec.Probes {
if probe.PodConditionType == "" || len(probe.MarkerPolicy) == 0 {
continue

Check warning on line 177 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L177

Added line #L177 was not covered by tests
}
markers[probe.PodConditionType] = probe.MarkerPolicy
}
if utilfeature.DefaultFeatureGate.Enabled(features.EnablePodProbeMarkerOnServerless) && len(markers) != 0 {
for _, pod := range serverlessPods {
if err = r.markServerlessPod(pod, markers); err != nil {
klog.ErrorS(err, "Failed to marker serverless pod", "podProbeMarker", klog.KObj(ppm), "pod", klog.KObj(pod))
return err
}

Check warning on line 186 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L182-L186

Added lines #L182 - L186 were not covered by tests
}
}

groupByNode := make(map[string][]*corev1.Pod)
for i, pod := range pods {
groupByNode[pod.Spec.NodeName] = append(groupByNode[pod.Spec.NodeName], pods[i])
for i, pod := range normalPods {
groupByNode[pod.Spec.NodeName] = append(groupByNode[pod.Spec.NodeName], normalPods[i])
}

// add podProbe in NodePodProbe
Expand All @@ -182,11 +205,12 @@
if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: ppm.Namespace, Name: ppm.Name}, ppmClone); err != nil {
klog.ErrorS(err, "Failed to get updated PodProbeMarker from client", "podProbeMarker", klog.KObj(ppm))
}
if ppmClone.Status.ObservedGeneration == ppmClone.Generation && int(ppmClone.Status.MatchedPods) == len(pods) {
matchedPods := len(normalPods) + len(serverlessPods)
if ppmClone.Status.ObservedGeneration == ppmClone.Generation && int(ppmClone.Status.MatchedPods) == matchedPods {
return nil
}
ppmClone.Status.ObservedGeneration = ppmClone.Generation
ppmClone.Status.MatchedPods = int64(len(pods))
ppmClone.Status.MatchedPods = int64(matchedPods)
return r.Client.Status().Update(context.TODO(), ppmClone)
}); err != nil {
klog.ErrorS(err, "PodProbeMarker update status failed", "podProbeMarker", klog.KObj(ppm))
Expand Down Expand Up @@ -214,6 +238,59 @@
return nil
}

// marker labels or annotations on Pod based on probing results
// markers is map[probe.PodConditionType] = probe.MarkerPolicy
func (r *ReconcilePodProbeMarker) markServerlessPod(pod *corev1.Pod, markers map[string][]appsv1alpha1.ProbeMarkerPolicy) error {
newObjectMeta := *pod.ObjectMeta.DeepCopy()
if newObjectMeta.Annotations == nil {
newObjectMeta.Annotations = make(map[string]string)
}

Check warning on line 247 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L246-L247

Added lines #L246 - L247 were not covered by tests
if newObjectMeta.Labels == nil {
newObjectMeta.Labels = make(map[string]string)
}

Check warning on line 250 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L249-L250

Added lines #L249 - L250 were not covered by tests

for cond, policy := range markers {
condition := util.GetCondition(pod, corev1.PodConditionType(cond))
if condition == nil {
continue

Check warning on line 255 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L255

Added line #L255 was not covered by tests
}

for _, obj := range policy {
if !obj.State.IsEqualPodConditionStatus(condition.Status) {
continue
}
for k, v := range obj.Annotations {
newObjectMeta.Annotations[k] = v
}
for k, v := range obj.Labels {
newObjectMeta.Labels[k] = v
}
}
}

// no change
if reflect.DeepEqual(pod.ObjectMeta, newObjectMeta) {
return nil
}

Check warning on line 274 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L273-L274

Added lines #L273 - L274 were not covered by tests

// patch change
oldBytes, _ := json.Marshal(corev1.Pod{ObjectMeta: pod.ObjectMeta})
newBytes, _ := json.Marshal(corev1.Pod{ObjectMeta: newObjectMeta})
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldBytes, newBytes, &corev1.Pod{})
if err != nil {
return err
}

Check warning on line 282 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L281-L282

Added lines #L281 - L282 were not covered by tests
obj := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}}
if err = r.Patch(context.TODO(), obj, client.RawPatch(types.StrategicMergePatchType, patchBytes)); err != nil {
if errors.IsNotFound(err) {
return nil
}
return err

Check warning on line 288 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L285-L288

Added lines #L285 - L288 were not covered by tests
}
klog.V(3).InfoS("PodProbeMarker marker pod success", "pod", klog.KObj(pod), "patch", string(patchBytes))
return nil
}

func (r *ReconcilePodProbeMarker) updateNodePodProbes(ppm *appsv1alpha1.PodProbeMarker, nodeName string, pods []*corev1.Pod) error {
npp := &appsv1alpha1.NodePodProbe{}
err := r.Get(context.TODO(), client.ObjectKey{Name: nodeName}, npp)
Expand Down Expand Up @@ -355,28 +432,50 @@
}

// If you need update the pod object, you must DeepCopy it
func (r *ReconcilePodProbeMarker) getMatchingPods(ppm *appsv1alpha1.PodProbeMarker) ([]*corev1.Pod, error) {
// para1: normal pods, which is on normal node
// para2: serverless pods
func (r *ReconcilePodProbeMarker) getMatchingPods(ppm *appsv1alpha1.PodProbeMarker) ([]*corev1.Pod, []*corev1.Pod, error) {
// get more faster selector
selector, err := util.ValidatedLabelSelectorAsSelector(ppm.Spec.Selector)
if err != nil {
return nil, err
return nil, nil, err

Check warning on line 441 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L441

Added line #L441 was not covered by tests
}
// DisableDeepCopy:true, indicates must be deep copy before update pod objection
listOpts := &client.ListOptions{LabelSelector: selector, Namespace: ppm.Namespace}
podList := &corev1.PodList{}
if listErr := r.Client.List(context.TODO(), podList, listOpts, utilclient.DisableDeepCopy); listErr != nil {
return nil, err
return nil, nil, err

Check warning on line 447 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L447

Added line #L447 was not covered by tests
}
pods := make([]*corev1.Pod, 0, len(podList.Items))
normalPods := make([]*corev1.Pod, 0, len(podList.Items))
serverlessPods := make([]*corev1.Pod, 0, len(podList.Items))
nodes := map[string]*corev1.Node{}
for i := range podList.Items {
pod := &podList.Items[i]
condition := util.GetCondition(pod, corev1.PodInitialized)
if kubecontroller.IsPodActive(pod) && pod.Spec.NodeName != "" &&
condition != nil && condition.Status == corev1.ConditionTrue {
pods = append(pods, pod)
if !kubecontroller.IsPodActive(pod) || condition == nil ||
condition.Status != corev1.ConditionTrue || pod.Spec.NodeName == "" {
continue
}

node, ok := nodes[pod.Spec.NodeName]
if !ok {
node = &corev1.Node{}
if err = r.Get(context.TODO(), client.ObjectKey{Name: pod.Spec.NodeName}, node); err != nil {
if errors.IsNotFound(err) {
continue

Check warning on line 465 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L464-L465

Added lines #L464 - L465 were not covered by tests
}
return nil, nil, err

Check warning on line 467 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L467

Added line #L467 was not covered by tests
}
nodes[node.Name] = node
}

if node.Labels["type"] == VirtualKubelet {
serverlessPods = append(serverlessPods, pod)

Check warning on line 473 in pkg/controller/podprobemarker/pod_probe_marker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/podprobemarker/pod_probe_marker_controller.go#L473

Added line #L473 was not covered by tests
} else {
normalPods = append(normalPods, pod)
}
}
return pods, nil
return normalPods, serverlessPods, nil
}

func (r *ReconcilePodProbeMarker) removePodProbeFromNodePodProbe(ppmName, nppName string) error {
Expand Down
Loading
Loading