From 0b23bf65246f0afcb8496dd03548213a3c92d8ee Mon Sep 17 00:00:00 2001
From: AiRanthem
Date: Fri, 10 Jan 2025 11:22:13 +0800
Subject: [PATCH] Feature: Add context logging to WorkloadSpread

Signed-off-by: AiRanthem
---
 pkg/controller/workloadspread/reschedule.go   |  15 +-
 .../workloadspread/reschedule_test.go         |   2 +-
 .../update_pod_deletion_cost.go               |  32 ++--
 .../workloadspread_controller.go              | 137 ++++++++++--------
 .../workloadspread_controller_test.go         |   8 +-
 .../workloadspread_controller_utils.go        |  13 +-
 .../workloadspread_controller_utils_test.go   |   3 +-
 .../workloadspread_event_handler.go           |  51 ++++---
 .../workloadspread_event_handler_test.go      |  12 +-
 pkg/util/log.go                               |  42 ++++++
 pkg/util/workloadspread/workloadspread.go     | 108 +++++++-------
 .../workloadspread/workloadspread_test.go     |  18 ++-
 .../pod/mutating/pod_create_update_handler.go |   3 +-
 pkg/webhook/pod/mutating/workloadspread.go    |   8 +-
 pkg/webhook/pod/validating/workloadspread.go  |  19 ++-
 .../workloadspread_create_update_handler.go   |   6 +-
 .../validating/workloadspread_validation.go   |  25 ++--
 .../workloadspread_validation_test.go         |   5 +-
 18 files changed, 299 insertions(+), 208 deletions(-)
 create mode 100644 pkg/util/log.go

diff --git a/pkg/controller/workloadspread/reschedule.go b/pkg/controller/workloadspread/reschedule.go
index a429053676..92aedc68ac 100644
--- a/pkg/controller/workloadspread/reschedule.go
+++ b/pkg/controller/workloadspread/reschedule.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/openkruise/kruise/pkg/controller/util"
+	util2 "github.com/openkruise/kruise/pkg/util"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"
@@ -36,9 +37,10 @@ import (
 // should be recovered schedulable status to try scheduling Pods again.
 // TODO optimize the unschedulable duration of subset.
 // return one parameters - unschedulable Pods belongs to this subset.
-func (r *ReconcileWorkloadSpread) rescheduleSubset(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) rescheduleSubset(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	pods []*corev1.Pod,
 	subsetStatus, oldSubsetStatus *appsv1alpha1.WorkloadSpreadSubsetStatus) []*corev1.Pod {
+	logger := util2.FromLogContext(ctx)
 	scheduleFailedPods := make([]*corev1.Pod, 0)
 	for i := range pods {
 		if PodUnscheduledTimeout(ws, pods[i]) {
@@ -47,7 +49,7 @@ func (r *ReconcileWorkloadSpread) rescheduleSubset(ws *appsv1alpha1.WorkloadSpre
 	}
 	unschedulable := len(scheduleFailedPods) > 0
 	if unschedulable {
-		klog.V(3).InfoS("Subset of WorkloadSpread is unschedulable", "subsetName", subsetStatus.Name, "workloadSpread", klog.KObj(ws))
+		logger.V(3).Info("Subset of WorkloadSpread is unschedulable", "subsetName", subsetStatus.Name, "workloadSpread", klog.KObj(ws))
 	}
 
 	oldCondition := GetWorkloadSpreadSubsetCondition(oldSubsetStatus, appsv1alpha1.SubsetSchedulable)
@@ -85,18 +87,19 @@ func (r *ReconcileWorkloadSpread) rescheduleSubset(ws *appsv1alpha1.WorkloadSpre
 	return scheduleFailedPods
 }
 
-func (r *ReconcileWorkloadSpread) cleanupUnscheduledPods(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) cleanupUnscheduledPods(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	scheduleFailedPodsMap map[string][]*corev1.Pod) error {
 	for subsetName, pods := range scheduleFailedPodsMap {
-		if err := r.deletePodsForSubset(ws, pods, subsetName); err != nil {
+		if err := r.deletePodsForSubset(ctx, ws, pods, subsetName); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) deletePodsForSubset(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	pods []*corev1.Pod, subsetName string) error {
+	logger := util2.FromLogContext(ctx)
 	for _, pod := range pods {
 		if err := r.Client.Delete(context.TODO(), pod); err != nil {
 			r.recorder.Eventf(ws, corev1.EventTypeWarning,
@@ -105,7 +108,7 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS
 				pod.Namespace, pod.Name, subsetName, ws.Namespace, ws.Name)
 			return err
 		}
-		klog.V(3).InfoS("WorkloadSpread deleted unschedulabe Pod in Subset successfully", "workloadSpread", klog.KObj(ws), "pod", klog.KObj(pod), "subsetName", subsetName)
+		logger.V(3).Info("WorkloadSpread deleted unschedulabe Pod in Subset successfully", "workloadSpread", klog.KObj(ws), "pod", klog.KObj(pod), "subsetName", subsetName)
 	}
 	return nil
 }
diff --git a/pkg/controller/workloadspread/reschedule_test.go b/pkg/controller/workloadspread/reschedule_test.go
index d4d0a6168d..527ffc8438 100644
--- a/pkg/controller/workloadspread/reschedule_test.go
+++ b/pkg/controller/workloadspread/reschedule_test.go
@@ -284,7 +284,7 @@ func TestRescheduleSubset(t *testing.T) {
 		},
 	}
 
-	err := reconciler.syncWorkloadSpread(workloadSpread)
+	err := reconciler.syncWorkloadSpread(context.Background(), workloadSpread)
 	if err != nil {
 		t.Fatalf("sync WorkloadSpread failed: %s", err.Error())
 	}
diff --git a/pkg/controller/workloadspread/update_pod_deletion_cost.go b/pkg/controller/workloadspread/update_pod_deletion_cost.go
index 07ad84f9b5..4ebc738ff1 100644
--- a/pkg/controller/workloadspread/update_pod_deletion_cost.go
+++ b/pkg/controller/workloadspread/update_pod_deletion_cost.go
@@ -22,6 +22,7 @@ import (
 	"sort"
 	"strconv"
 
+	"github.com/openkruise/kruise/pkg/util"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
@@ -56,9 +57,10 @@ func (r *ReconcileWorkloadSpread) getWorkloadLatestVersion(ws *appsv1alpha1.Work
 	return wsutil.GetWorkloadVersion(r.Client, object)
 }
 
-func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) updateDeletionCost(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	versionedPodMap map[string]map[string][]*corev1.Pod, workloadReplicas int32) error {
+	logger := util.FromLogContext(ctx)
 	targetRef := ws.Spec.TargetReference
 	if targetRef == nil || !isEffectiveKindForDeletionCost(targetRef) {
 		return nil
@@ -66,7 +68,7 @@ func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSp
 	latestVersion, err := r.getWorkloadLatestVersion(ws)
 	if err != nil {
-		klog.ErrorS(err, "Failed to get the latest version for workload in workloadSpread", "workloadSpread", klog.KObj(ws))
+		logger.Error(err, "Failed to get the latest version for workload in workloadSpread", "workloadSpread", klog.KObj(ws))
 		return err
 	}
 
@@ -74,7 +76,7 @@ func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSp
 	// - to the latest version, we hope to scale down the last subset preferentially;
 	// - to other old versions, we hope to scale down the first subset preferentially;
 	for version, podMap := range versionedPodMap {
-		err = r.updateDeletionCostBySubset(ws, podMap, workloadReplicas, version != latestVersion)
+		err = r.updateDeletionCostBySubset(ctx, ws, podMap, workloadReplicas, version != latestVersion)
 		if err != nil {
 			return err
 		}
@@ -82,7 +84,7 @@ func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSp
 	return nil
 }
 
-func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	podMap map[string][]*corev1.Pod, workloadReplicas int32, reverseOrder bool) error {
 	subsetNum := len(ws.Spec.Subsets)
 	subsetIndex := func(index int) int {
@@ -93,14 +95,14 @@ func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ws *appsv1alpha1.Wo
 	}
 	// update Pod's deletion-cost annotation in each subset
 	for idx, subset := range ws.Spec.Subsets {
-		if err := r.syncSubsetPodDeletionCost(ws, &subset, subsetIndex(idx), podMap[subset.Name], workloadReplicas); err != nil {
+		if err := r.syncSubsetPodDeletionCost(ctx, ws, &subset, subsetIndex(idx), podMap[subset.Name], workloadReplicas); err != nil {
 			return err
 		}
 	}
 	// update the deletion-cost annotation for such pods that do not match any real subsets.
 	// these pods will have the minimum deletion-cost, and will be deleted preferentially.
 	if len(podMap[FakeSubsetName]) > 0 {
-		if err := r.syncSubsetPodDeletionCost(ws, nil, len(ws.Spec.Subsets), podMap[FakeSubsetName], workloadReplicas); err != nil {
+		if err := r.syncSubsetPodDeletionCost(ctx, ws, nil, len(ws.Spec.Subsets), podMap[FakeSubsetName], workloadReplicas); err != nil {
 			return err
 		}
 	}
@@ -123,12 +125,13 @@ func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ws *appsv1alpha1.Wo
 // maxReplicas          10                      10                   nil
 // pods number          20                      20                   20
 // deletion-cost       (300,-100)             (200,-200)            100
-func (r *ReconcileWorkloadSpread) syncSubsetPodDeletionCost(
+func (r *ReconcileWorkloadSpread) syncSubsetPodDeletionCost(ctx context.Context,
 	ws *appsv1alpha1.WorkloadSpread,
 	subset *appsv1alpha1.WorkloadSpreadSubset,
 	subsetIndex int,
 	pods []*corev1.Pod,
 	workloadReplicas int32) error {
+	logger := util.FromLogContext(ctx)
 	var err error
 	// slice that will contain all Pods that want to set deletion-cost a positive value.
 	var positivePods []*corev1.Pod
@@ -154,7 +157,7 @@ func (r *ReconcileWorkloadSpread) syncSubsetPodDeletionCost(
 	} else {
 		subsetMaxReplicas, err := intstr.GetValueFromIntOrPercent(subset.MaxReplicas, int(workloadReplicas), true)
 		if err != nil || subsetMaxReplicas < 0 {
-			klog.ErrorS(err, "Failed to get maxReplicas value from subset of WorkloadSpread", "subsetName", subset.Name, "workloadSpread", klog.KObj(ws))
+			logger.Error(err, "Failed to get maxReplicas value from subset of WorkloadSpread", "subsetName", subset.Name, "workloadSpread", klog.KObj(ws))
 			return nil
 		}
 
@@ -180,17 +183,17 @@ func (r *ReconcileWorkloadSpread) syncSubsetPodDeletionCost(
 		}
 	}
 
-	err = r.updateDeletionCostForSubsetPods(ws, subset, positivePods, strconv.Itoa(wsutil.PodDeletionCostPositive*(len(ws.Spec.Subsets)-subsetIndex)))
+	err = r.updateDeletionCostForSubsetPods(ctx, ws, subset, positivePods, strconv.Itoa(wsutil.PodDeletionCostPositive*(len(ws.Spec.Subsets)-subsetIndex)))
 	if err != nil {
 		return err
 	}
-	return r.updateDeletionCostForSubsetPods(ws, subset, negativePods, strconv.Itoa(wsutil.PodDeletionCostNegative*(subsetIndex+1)))
+	return r.updateDeletionCostForSubsetPods(ctx, ws, subset, negativePods, strconv.Itoa(wsutil.PodDeletionCostNegative*(subsetIndex+1)))
 }
 
-func (r *ReconcileWorkloadSpread) updateDeletionCostForSubsetPods(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) updateDeletionCostForSubsetPods(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	subset *appsv1alpha1.WorkloadSpreadSubset, pods []*corev1.Pod, deletionCostStr string) error {
 	for _, pod := range pods {
-		if err := r.patchPodDeletionCost(ws, pod, deletionCostStr); err != nil {
+		if err := r.patchPodDeletionCost(ctx, ws, pod, deletionCostStr); err != nil {
 			subsetName := FakeSubsetName
 			if subset != nil {
 				subsetName = subset.Name
@@ -205,8 +208,9 @@ func (r *ReconcileWorkloadSpread) updateDeletionCostForSubsetPods(ws *appsv1alph
 	return nil
 }
 
-func (r *ReconcileWorkloadSpread) patchPodDeletionCost(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) patchPodDeletionCost(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	pod *corev1.Pod, deletionCostStr string) error {
+	logger := util.FromLogContext(ctx)
 	clone := pod.DeepCopy()
 	annotationKey := wsutil.PodDeletionCostAnnotation
 	annotationValue := deletionCostStr
@@ -226,7 +230,7 @@ func (r *ReconcileWorkloadSpread) patchPodDeletionCost(ws *appsv1alpha1.Workload
 	if err := r.Patch(context.TODO(), clone, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
 		return err
 	}
-	klog.V(3).InfoS("WorkloadSpread patched deletion-cost annotation for Pod successfully", "workloadSpread", klog.KObj(ws), "deletionCost", deletionCostStr, "pod", klog.KObj(pod))
+	logger.V(3).Info("WorkloadSpread patched deletion-cost annotation for Pod successfully", "workloadSpread", klog.KObj(ws), "deletionCost", deletionCostStr, "pod", klog.KObj(pod))
 	return nil
 }
diff --git a/pkg/controller/workloadspread/workloadspread_controller.go b/pkg/controller/workloadspread/workloadspread_controller.go
index 951839ca7f..78376b82e9 100644
--- a/pkg/controller/workloadspread/workloadspread_controller.go
+++ b/pkg/controller/workloadspread/workloadspread_controller.go
@@ -204,7 +204,9 @@ type ReconcileWorkloadSpread struct {
 // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch
 // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch;delete
 
-func (r *ReconcileWorkloadSpread) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
+func (r *ReconcileWorkloadSpread) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
+	ctx = util.NewLogContext(ctx)
+	logger := util.FromLogContext(ctx)
 	ws := &appsv1alpha1.WorkloadSpread{}
 	err := r.Get(context.TODO(), req.NamespacedName, ws)
 
@@ -220,7 +222,7 @@ func (r *ReconcileWorkloadSpread) Reconcile(_ context.Context, req reconcile.Req
 				Name:      req.Name,
 			},
 		}); cacheErr != nil {
-			klog.ErrorS(cacheErr, "Failed to delete workloadSpread cache after deletion", "workloadSpread", req)
+			logger.Error(cacheErr, "Failed to delete workloadSpread cache after deletion", "workloadSpread", req)
 		}
 		return reconcile.Result{}, nil
 	} else if err != nil {
@@ -229,13 +231,14 @@ func (r *ReconcileWorkloadSpread) Reconcile(_ context.Context, req reconcile.Req
 	}
 
 	startTime := time.Now()
-	klog.V(3).InfoS("Began to process WorkloadSpread", "workloadSpread", klog.KObj(ws))
-	err = r.syncWorkloadSpread(ws)
-	klog.V(3).InfoS("Finished syncing WorkloadSpread", "workloadSpread", klog.KObj(ws), "cost", time.Since(startTime))
+	logger.V(3).Info("Began to process WorkloadSpread", "workloadSpread", klog.KObj(ws))
+	err = r.syncWorkloadSpread(ctx, ws)
+	logger.V(3).Info("Finished syncing WorkloadSpread", "workloadSpread", klog.KObj(ws), "cost", time.Since(startTime))
 	return reconcile.Result{RequeueAfter: durationStore.Pop(getWorkloadSpreadKey(ws))}, err
 }
 
-func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, namespace string) ([]*corev1.Pod, int32, error) {
+func (r *ReconcileWorkloadSpread) getPodJob(ctx context.Context, ref *appsv1alpha1.TargetReference, namespace string) ([]*corev1.Pod, int32, error) {
+	logger := util.FromLogContext(ctx)
 	ok, err := wsutil.VerifyGroupKind(ref, controllerKindJob.Kind, []string{controllerKindJob.Group})
 	if err != nil || !ok {
 		return nil, 0, err
@@ -246,7 +249,7 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n
 	if err != nil {
 		// when error is NotFound, it is ok here.
 		if errors.IsNotFound(err) {
-			klog.V(3).InfoS("Could not find Job", "job", klog.KRef(namespace, ref.Name))
+			logger.V(3).Info("Could not find Job", "job", klog.KRef(namespace, ref.Name))
 			return nil, 0, nil
 		}
 		return nil, 0, err
@@ -254,7 +257,7 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n
 
 	labelSelector, err := util.ValidatedLabelSelectorAsSelector(job.Spec.Selector)
 	if err != nil {
-		klog.ErrorS(err, "Failed to get labelSelector")
+		logger.Error(err, "Failed to get labelSelector")
 		return nil, 0, err
 	}
 
@@ -276,7 +279,7 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n
 	return matchedPods, *(job.Spec.Parallelism), nil
 }
 
-func (r *ReconcileWorkloadSpread) getReplicasPathList(ws *appsv1alpha1.WorkloadSpread) ([]string, error) {
+func (r *ReconcileWorkloadSpread) getReplicasPathList(ctx context.Context, ws *appsv1alpha1.WorkloadSpread) ([]string, error) {
+	logger := util.FromLogContext(ctx)
 	if ws.Spec.TargetReference == nil {
 		return nil, nil
 	}
@@ -295,7 +299,7 @@ func (r *ReconcileWorkloadSpread) getReplicasPathList(ws *appsv1alpha1.WorkloadS
 		if wl.GroupVersion() != gv || wl.GroupVersionKind.Kind != ws.Spec.TargetReference.Kind {
 			continue
 		}
-		klog.V(5).InfoS("found replicas path in whitelist", "path", wl.ReplicasPath, "workloadSpread", klog.KObj(ws))
+		logger.V(5).Info("found replicas path in whitelist", "path", wl.ReplicasPath, "workloadSpread", klog.KObj(ws))
 		return []string{wl.ReplicasPath}, nil
 	}
 	return nil, nil
@@ -305,7 +309,8 @@ func (r *ReconcileWorkloadSpread) getReplicasPathList(ws *appsv1alpha1.WorkloadS
 // return two parameters
 // 1. podList for workloadSpread
 // 2. workloadReplicas
-func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.WorkloadSpread) ([]*corev1.Pod, int32, error) {
+func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ctx context.Context, ws *appsv1alpha1.WorkloadSpread) ([]*corev1.Pod, int32, error) {
+	logger := util.FromLogContext(ctx)
 	if ws.Spec.TargetReference == nil {
 		return nil, 0, nil
 	}
@@ -316,26 +321,27 @@ func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.Work
 
 	switch targetRef.Kind {
 	case controllerKindJob.Kind:
-		pods, workloadReplicas, err = r.getPodJob(targetRef, ws.Namespace)
+		pods, workloadReplicas, err = r.getPodJob(ctx, targetRef, ws.Namespace)
 	default:
 		pods, workloadReplicas, err = r.controllerFinder.GetPodsForRef(targetRef.APIVersion, targetRef.Kind, ws.Namespace, targetRef.Name, false)
 	}
 
 	if err != nil {
-		klog.ErrorS(err, "WorkloadSpread handled targetReference failed", "workloadSpread", klog.KObj(ws))
+		logger.Error(err, "WorkloadSpread handled targetReference failed", "workloadSpread", klog.KObj(ws))
 		return nil, 0, err
 	}
 
-	workloadReplicas, pods, err = r.filterWorkload(ws, pods, workloadReplicas)
+	workloadReplicas, pods, err = r.filterWorkload(ctx, ws, pods, workloadReplicas)
 	if err != nil {
-		klog.ErrorS(err, "Filter workload failed", "workloadSpread", klog.KObj(ws))
+		logger.Error(err, "Filter workload failed", "workloadSpread", klog.KObj(ws))
 		return nil, 0, err
 	}
 	return pods, workloadReplicas, err
 }
 
-func (r *ReconcileWorkloadSpread) filterWorkload(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (int32, []*corev1.Pod, error) {
-	klog.V(5).InfoS("before workload filtering", "pods", len(pods), "replicas", replicas, "workloadSpread", klog.KObj(ws))
-	replicasPathList, err := r.getReplicasPathList(ws)
+func (r *ReconcileWorkloadSpread) filterWorkload(ctx context.Context, ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (int32, []*corev1.Pod, error) {
+	logger := util.FromLogContext(ctx)
+	logger.V(5).Info("before workload filtering", "pods", len(pods), "replicas", replicas, "workloadSpread", klog.KObj(ws))
+	replicasPathList, err := r.getReplicasPathList(ctx, ws)
 	if err != nil {
 		return replicas, pods, err
 	}
@@ -358,11 +364,11 @@
 			}
 			filteredReplicas += n
 		}
-		klog.V(4).InfoS("replicas after filtering", "replicas", filteredReplicas,
+		logger.V(4).Info("replicas after filtering", "replicas", filteredReplicas,
 			"replicasPathList", replicasPathList, "workloadSpread", klog.KObj(ws))
 	} else {
 		filteredReplicas = replicas
-		klog.V(4).InfoS("replicas not filtered", "workloadSpread", klog.KObj(ws))
+		logger.V(4).Info("replicas not filtered", "workloadSpread", klog.KObj(ws))
 	}
 	var filteredPods []*corev1.Pod
 	if ws.Spec.TargetFilter != nil && ws.Spec.TargetFilter.Selector != nil {
@@ -375,7 +381,7 @@
 			filteredPods = append(filteredPods, pod)
 			}
 		}
-		klog.V(4).InfoS("pods after filtering", "pods", len(filteredPods), "selector", ws.Spec.TargetFilter.Selector)
+		logger.V(4).Info("pods after filtering", "pods", len(filteredPods), "selector", ws.Spec.TargetFilter.Selector)
 	} else {
 		filteredPods = pods
 	}
@@ -388,49 +394,51 @@
 // Lastly, we update the WorkloadSpread's Status and clean up scheduled failed Pods. controller should collaborate with webhook
 // to maintain WorkloadSpread status together. The controller is responsible for calculating the real status, and the webhook
 // mainly counts missingReplicas and records the creation or deletion entry of Pod into map.
-func (r *ReconcileWorkloadSpread) syncWorkloadSpread(ws *appsv1alpha1.WorkloadSpread) error {
+func (r *ReconcileWorkloadSpread) syncWorkloadSpread(ctx context.Context, ws *appsv1alpha1.WorkloadSpread) error {
+	logger := util.FromLogContext(ctx)
 	if ws.Spec.TargetReference == nil {
-		klog.InfoS("WorkloadSpread has no target reference", "workloadSpread", klog.KObj(ws))
+		logger.Info("WorkloadSpread has no target reference", "workloadSpread", klog.KObj(ws))
 		return nil
 	}
 
-	pods, workloadReplicas, err := r.getPodsForWorkloadSpread(ws)
+	pods, workloadReplicas, err := r.getPodsForWorkloadSpread(ctx, ws)
 	if err != nil {
-		klog.ErrorS(err, "WorkloadSpread got matched pods failed", "workloadSpread", klog.KObj(ws))
+		logger.Error(err, "WorkloadSpread got matched pods failed", "workloadSpread", klog.KObj(ws))
 		return err
 	}
 	if len(pods) == 0 {
-		klog.InfoS("WorkloadSpread had no matched pods", "workloadSpread", klog.KObj(ws), "targetWorkloadReplicas", workloadReplicas)
+		logger.Info("WorkloadSpread had no matched pods", "workloadSpread", klog.KObj(ws), "targetWorkloadReplicas", workloadReplicas)
 	}
 
 	// group Pods by pod-revision and subset
-	versionedPodMap, subsetPodMap, err := r.groupVersionedPods(ws, pods, workloadReplicas)
+	versionedPodMap, subsetPodMap, err := r.groupVersionedPods(ctx, ws, pods, workloadReplicas)
 	if err != nil {
 		return err
 	}
 
 	// update deletion-cost for each subset
-	err = r.updateDeletionCost(ws, versionedPodMap, workloadReplicas)
+	err = r.updateDeletionCost(ctx, ws, versionedPodMap, workloadReplicas)
 	if err != nil {
 		return err
 	}
 
 	// calculate status and reschedule
-	status, scheduleFailedPodMap := r.calculateWorkloadSpreadStatus(ws, versionedPodMap, subsetPodMap, workloadReplicas)
+	status, scheduleFailedPodMap := r.calculateWorkloadSpreadStatus(ctx, ws, versionedPodMap, subsetPodMap, workloadReplicas)
 	if status == nil {
 		return nil
 	}
 
 	// update status
-	err = r.UpdateWorkloadSpreadStatus(ws, status)
+	err = r.UpdateWorkloadSpreadStatus(ctx, ws, status)
 	if err != nil {
 		return err
 	}
 
 	// clean up unschedulable Pods
-	return r.cleanupUnscheduledPods(ws, scheduleFailedPodMap)
+	return r.cleanupUnscheduledPods(ctx, ws, scheduleFailedPodMap)
 }
 
-func getInjectWorkloadSpreadFromPod(pod *corev1.Pod) *wsutil.InjectWorkloadSpread {
+func getInjectWorkloadSpreadFromPod(ctx context.Context, pod *corev1.Pod) *wsutil.InjectWorkloadSpread {
+	logger := util.FromLogContext(ctx)
 	injectStr, exist := pod.GetAnnotations()[wsutil.MatchedWorkloadSpreadSubsetAnnotations]
 	if !exist {
 		return nil
@@ -439,14 +447,14 @@ func getInjectWorkloadSpreadFromPod(pod *corev1.Pod) *wsutil.InjectWorkloadSprea
 	injectWS := &wsutil.InjectWorkloadSpread{}
 	err := json.Unmarshal([]byte(injectStr), injectWS)
 	if err != nil {
-		klog.ErrorS(err, "Failed to unmarshal JSON from Pod", "JSON", injectStr, "pod", klog.KObj(pod))
+		logger.Error(err, "Failed to unmarshal JSON from Pod", "JSON", injectStr, "pod", klog.KObj(pod))
 		return nil
 	}
 	return injectWS
 }
 
 // groupVersionedPods will group pods by pod version and subset
-func (r *ReconcileWorkloadSpread) groupVersionedPods(ws *appsv1alpha1.WorkloadSpread, allPods []*corev1.Pod, replicas int32) (map[string]map[string][]*corev1.Pod, map[string][]*corev1.Pod, error) {
+func (r *ReconcileWorkloadSpread) groupVersionedPods(ctx context.Context, ws *appsv1alpha1.WorkloadSpread, allPods []*corev1.Pod, replicas int32) (map[string]map[string][]*corev1.Pod, map[string][]*corev1.Pod, error) {
 	versionedPods := map[string][]*corev1.Pod{}
 	for _, pod := range allPods {
 		version := wsutil.GetPodVersion(pod)
@@ -458,7 +466,7 @@ func (r *ReconcileWorkloadSpread) groupVersionedPods(ws *appsv1alpha1.WorkloadSp
 	// group pods by version
 	for version, pods := range versionedPods {
 		// group pods by subset
-		podMap, err := r.groupPodBySubset(ws, pods, replicas)
+		podMap, err := r.groupPodBySubset(ctx, ws, pods, replicas)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -471,7 +479,7 @@
 }
 
 // groupPodBySubset returns a map, the key is the name of subset and the value represents the Pods of the corresponding subset.
-func (r *ReconcileWorkloadSpread) groupPodBySubset(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) {
+func (r *ReconcileWorkloadSpread) groupPodBySubset(ctx context.Context, ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) {
 	podMap := make(map[string][]*corev1.Pod, len(ws.Spec.Subsets)+1)
 	podMap[FakeSubsetName] = []*corev1.Pod{}
 	subsetMissingReplicas := make(map[string]int)
@@ -483,7 +491,7 @@
 
 	// count managed pods for each subset
 	for i := range pods {
-		injectWS := getInjectWorkloadSpreadFromPod(pods[i])
+		injectWS := getInjectWorkloadSpreadFromPod(ctx, pods[i])
 		if isNotMatchedWS(injectWS, ws) {
 			continue
 		}
@@ -494,7 +502,7 @@
 	}
 
 	for i := range pods {
-		subsetName, err := r.getSuitableSubsetNameForPod(ws, pods[i], subsetMissingReplicas)
+		subsetName, err := r.getSuitableSubsetNameForPod(ctx, ws, pods[i], subsetMissingReplicas)
 		if err != nil {
 			return nil, err
 		}
@@ -511,11 +519,11 @@
 }
 
 // getSuitableSubsetNameForPod will return (FakeSubsetName, nil) if not found suitable subset for pod
-func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (string, error) {
-	injectWS := getInjectWorkloadSpreadFromPod(pod)
+func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ctx context.Context, ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (string, error) {
+	injectWS := getInjectWorkloadSpreadFromPod(ctx, pod)
 	if isNotMatchedWS(injectWS, ws) {
 		// process the pods that were created before workloadSpread
-		matchedSubset, err := r.getAndUpdateSuitableSubsetName(ws, pod, subsetMissingReplicas)
+		matchedSubset, err := r.getAndUpdateSuitableSubsetName(ctx, ws, pod, subsetMissingReplicas)
 		if err != nil {
 			return "", err
 		} else if matchedSubset == nil {
@@ -528,7 +536,8 @@
 
 // getSuitableSubsetForOldPod returns a suitable subset for the pod which was created before workloadSpread.
 // getSuitableSubsetForOldPod will return (nil, nil) if there is no suitable subset for the pod.
-func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (*appsv1alpha1.WorkloadSpreadSubset, error) {
+func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ctx context.Context, ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (*appsv1alpha1.WorkloadSpreadSubset, error) {
+	logger := util.FromLogContext(ctx)
 	if len(pod.Spec.NodeName) == 0 {
 		return nil, nil
 	}
@@ -546,11 +555,11 @@
 	for i := range ws.Spec.Subsets {
 		subset := &ws.Spec.Subsets[i]
 		// in case of that this pod was scheduled to the node which matches a subset of workloadSpread
-		matched, preferredScore, err := matchesSubset(pod, node, subset, subsetMissingReplicas[subset.Name])
+		matched, preferredScore, err := matchesSubset(ctx, pod, node, subset, subsetMissingReplicas[subset.Name])
 		if err != nil {
 			// requiredSelectorTerm field was validated at webhook stage, so this error should not occur
 			// this error should not be returned, because it is a non-transient error
-			klog.ErrorS(err, "Unexpected error occurred when matching pod with subset, please check requiredSelectorTerm field of subset in WorkloadSpread",
+			logger.Error(err, "Unexpected error occurred when matching pod with subset, please check requiredSelectorTerm field of subset in WorkloadSpread",
 				"pod", klog.KObj(pod), "subsetName", subset.Name, "workloadSpread", klog.KObj(ws))
 		}
 		// select the most favorite subsets for the pod by subset.PreferredNodeSelectorTerms
@@ -561,7 +570,7 @@
 	}
 
 	if favoriteSubset != nil {
-		if err := r.patchFavoriteSubsetMetadataToPod(pod, ws, favoriteSubset); err != nil {
+		if err := r.patchFavoriteSubsetMetadataToPod(ctx, pod, ws, favoriteSubset); err != nil {
 			return nil, err
 		}
 		subsetMissingReplicas[favoriteSubset.Name]--
@@ -573,7 +582,8 @@
 
 // patchFavoriteSubsetMetadataToPod patch MatchedWorkloadSpreadSubsetAnnotations to the pod,
 // and select labels/annotations form favoriteSubset.patch, then patch them to the pod;
-func (r *ReconcileWorkloadSpread) patchFavoriteSubsetMetadataToPod(pod *corev1.Pod, ws *appsv1alpha1.WorkloadSpread, favoriteSubset *appsv1alpha1.WorkloadSpreadSubset) error {
+func (r *ReconcileWorkloadSpread) patchFavoriteSubsetMetadataToPod(ctx context.Context, pod *corev1.Pod, ws *appsv1alpha1.WorkloadSpread, favoriteSubset *appsv1alpha1.WorkloadSpreadSubset) error {
+	logger := util.FromLogContext(ctx)
 	patchMetadata := make(map[string]interface{})
 	// decode favoriteSubset.patch.raw and add their labels and annotations to the patch
 	if favoriteSubset.Patch.Raw != nil && !strings.EqualFold(ws.Annotations[IgnorePatchExistingPodsAnnotation], "true") {
@@ -603,7 +613,7 @@
 	})
 
 	if err := r.Patch(context.TODO(), pod, client.RawPatch(types.StrategicMergePatchType, patch)); err != nil {
-		klog.ErrorS(err, `Failed to patch "matched-workloadspread" annotation for pod`,
+		logger.Error(err, `Failed to patch "matched-workloadspread" annotation for pod`,
 			"pod", klog.KObj(pod), "annotationValue", fmt.Sprintf("{Name: %s, Subset: %s}", ws.Name, favoriteSubset.Name))
 		return err
 	}
@@ -614,7 +624,7 @@
 // return two parameters
 // 1. current WorkloadSpreadStatus
 // 2. a map, the key is the subsetName, the value is the schedule failed Pods belongs to the subset.
-func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	versionedPodMap map[string]map[string][]*corev1.Pod, subsetPodMap map[string][]*corev1.Pod,
 	workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) {
 	status := appsv1alpha1.WorkloadSpreadStatus{}
@@ -625,11 +635,11 @@
 	// overall subset statuses
 	var scheduleFailedPodMap map[string][]*corev1.Pod
-	status.SubsetStatuses, scheduleFailedPodMap = r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.SubsetStatuses, subsetPodMap, workloadReplicas)
+	status.SubsetStatuses, scheduleFailedPodMap = r.calculateWorkloadSpreadSubsetStatuses(ctx, ws, ws.Status.SubsetStatuses, subsetPodMap, workloadReplicas)
 
 	// versioned subset statuses calculated by observed pods
 	for version, podMap := range versionedPodMap {
-		status.VersionedSubsetStatuses[version], _ = r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.VersionedSubsetStatuses[version], podMap, workloadReplicas)
+		status.VersionedSubsetStatuses[version], _ = r.calculateWorkloadSpreadSubsetStatuses(ctx, ws, ws.Status.VersionedSubsetStatuses[version], podMap, workloadReplicas)
 	}
 
 	// Consider this case:
@@ -639,7 +649,7 @@
 		if _, exist := versionedPodMap[version]; exist {
 			continue
 		}
-		versionSubsetStatues, _ := r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.VersionedSubsetStatuses[version], nil, workloadReplicas)
+		versionSubsetStatues, _ := r.calculateWorkloadSpreadSubsetStatuses(ctx, ws, ws.Status.VersionedSubsetStatuses[version], nil, workloadReplicas)
 		if !isEmptySubsetStatuses(versionSubsetStatues) {
 			status.VersionedSubsetStatuses[version] = versionSubsetStatues
 		}
@@ -658,7 +668,7 @@ func isEmptySubsetStatuses(statues []appsv1alpha1.WorkloadSpreadSubsetStatus) bo
 	return replicas+creating+deleting == 0
 }
 
-func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatuses(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatuses(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	oldSubsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus, podMap map[string][]*corev1.Pod, workloadReplicas int32,
 ) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, map[string][]*corev1.Pod) {
 	subsetStatuses := make([]appsv1alpha1.WorkloadSpreadSubsetStatus, len(ws.Spec.Subsets))
@@ -683,7 +693,7 @@
 		subset := &ws.Spec.Subsets[i]
 
 		// calculate subset status
-		subsetStatus := r.calculateWorkloadSpreadSubsetStatus(ws, podMap[subset.Name], subset,
+		subsetStatus := r.calculateWorkloadSpreadSubsetStatus(ctx, ws, podMap[subset.Name], subset,
 			oldSubsetStatusMap[subset.Name], workloadReplicas)
 		if subsetStatus == nil {
 			return nil, nil
@@ -692,7 +702,7 @@
 		// don't reschedule the last subset.
 		if rescheduleCriticalSeconds > 0 {
 			if i != len(ws.Spec.Subsets)-1 {
-				pods := r.rescheduleSubset(ws, podMap[subset.Name], subsetStatus, oldSubsetStatusMap[subset.Name])
+				pods := r.rescheduleSubset(ctx, ws, podMap[subset.Name], subsetStatus, oldSubsetStatusMap[subset.Name])
 				scheduleFailedPodMap[subset.Name] = pods
 			} else {
 				oldCondition := GetWorkloadSpreadSubsetCondition(oldSubsetStatusMap[subset.Name], appsv1alpha1.SubsetSchedulable)
@@ -712,11 +722,12 @@
 }
 
 // calculateWorkloadSpreadSubsetStatus returns the current subsetStatus for subset.
-func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatus(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatus(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	pods []*corev1.Pod,
 	subset *appsv1alpha1.WorkloadSpreadSubset,
 	oldSubsetStatus *appsv1alpha1.WorkloadSpreadSubsetStatus,
 	workloadReplicas int32) *appsv1alpha1.WorkloadSpreadSubsetStatus {
+	logger := util.FromLogContext(ctx)
 	// current subsetStatus in this reconcile
 	subsetStatus := &appsv1alpha1.WorkloadSpreadSubsetStatus{}
 	subsetStatus.Name = subset.Name
@@ -731,7 +742,7 @@
 	} else {
 		subsetMaxReplicas, err = intstr.GetScaledValueFromIntOrPercent(subset.MaxReplicas, int(workloadReplicas), true)
 		if err != nil || subsetMaxReplicas < 0 {
-			klog.ErrorS(err, "Failed to get maxReplicas value from subset of WorkloadSpread", "subsetName", subset.Name, "workloadSpread", klog.KObj(ws))
+			logger.Error(err, "Failed to get maxReplicas value from subset of WorkloadSpread", "subsetName", subset.Name, "workloadSpread", klog.KObj(ws))
 			return nil
 		}
 	}
@@ -753,7 +764,7 @@
 
 	for _, pod := range pods {
 		// remove this Pod from creatingPods map because this Pod has been created.
-		injectWS := getInjectWorkloadSpreadFromPod(pod)
+		injectWS := getInjectWorkloadSpreadFromPod(ctx, pod)
 		if injectWS != nil && injectWS.UID != "" {
 			// Deployment or other native k8s workload has not generated the full pod.Name when webhook is mutating Pod.
 			// So webhook generates a UID to identify Pod and restore it into the creatingPods map. The generated
@@ -823,8 +834,9 @@
 	return subsetStatus
 }
 
-func (r *ReconcileWorkloadSpread) UpdateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) UpdateWorkloadSpreadStatus(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
 	status *appsv1alpha1.WorkloadSpreadStatus) error {
+	logger := util.FromLogContext(ctx)
 	if apiequality.Semantic.DeepEqual(status, ws.Status) {
 		return nil
 	}
@@ -832,9 +844,9 @@
 	clone := ws.DeepCopy()
 	clone.Status = *status
 
-	err := r.writeWorkloadSpreadStatus(clone)
+	err := r.writeWorkloadSpreadStatus(ctx, clone)
 	if err == nil {
-		klog.V(3).InfoS(makeStatusChangedLog(ws, status), "workloadSpread", klog.KObj(ws))
+		logger.V(3).Info(makeStatusChangedLog(ws, status), "workloadSpread", klog.KObj(ws))
 	}
 	return err
 }
@@ -887,7 +899,8 @@ func makeStatusChangedLog(ws *appsv1alpha1.WorkloadSpread, status *appsv1alpha1.
 	return log
 }
 
-func (r *ReconcileWorkloadSpread) writeWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread) error {
+func (r *ReconcileWorkloadSpread) writeWorkloadSpreadStatus(ctx context.Context, ws *appsv1alpha1.WorkloadSpread) error {
+	logger := util.FromLogContext(ctx)
 	unlock := util.GlobalKeyedMutex.Lock(string(ws.GetUID()))
 	defer unlock()
 	// If this update fails, don't retry it. Allow the failure to get handled &
@@ -895,7 +908,7 @@
 	err := r.Status().Update(context.TODO(), ws)
 	if err == nil {
 		if cacheErr := util.GlobalCache.Add(ws); cacheErr != nil {
-			klog.ErrorS(cacheErr, "Failed to update WorkloadSpread cache after update status", "workloadSpread", klog.KObj(ws))
+			logger.Error(cacheErr, "Failed to update WorkloadSpread cache after update status", "workloadSpread", klog.KObj(ws))
 		}
 	}
 	return err
diff --git a/pkg/controller/workloadspread/workloadspread_controller_test.go b/pkg/controller/workloadspread/workloadspread_controller_test.go
index 16879fcced..04c8e84ef8 100644
--- a/pkg/controller/workloadspread/workloadspread_controller_test.go
+++ b/pkg/controller/workloadspread/workloadspread_controller_test.go
@@ -725,7 +725,7 @@ func TestSubsetPodDeletionCost(t *testing.T) {
 				recorder:  record.NewFakeRecorder(10),
 			}
 
-			err := r.syncSubsetPodDeletionCost(workloadSpread, &workloadSpread.Spec.Subsets[0], cs.subsetIndex, cs.getPods(), 5)
+			err := r.syncSubsetPodDeletionCost(context.Background(), workloadSpread, &workloadSpread.Spec.Subsets[0], cs.subsetIndex, cs.getPods(), 5)
 			if err != nil {
 				t.Fatalf("set pod deletion-cost annotation failed: %s", err.Error())
 			}
@@ -1772,7 +1772,7 @@ func TestWorkloadSpreadReconcile(t *testing.T) {
 				controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
 			}
 
-			err := reconciler.syncWorkloadSpread(workloadSpread)
+			err := reconciler.syncWorkloadSpread(context.Background(), workloadSpread)
 			if err != nil {
 				t.Fatalf("sync WorkloadSpread failed: %s", err.Error())
 			}
@@ -1857,11 +1857,11 @@ func TestUpdateSubsetSequence(t *testing.T) {
 	}
 
 	r := ReconcileWorkloadSpread{}
-	versionedPodMap, subsetsPods, err := r.groupVersionedPods(workloadSpread, pods, 5)
+	versionedPodMap, subsetsPods, err := r.groupVersionedPods(context.Background(), workloadSpread, pods, 5)
 	if err != nil {
 		t.Fatalf("error group pods")
 	}
-	status, _ := r.calculateWorkloadSpreadStatus(workloadSpread, versionedPodMap, subsetsPods, 5)
+	status, _ := r.calculateWorkloadSpreadStatus(context.Background(), workloadSpread, versionedPodMap, subsetsPods, 5)
 	if status == nil {
 		t.Fatalf("error get WorkloadSpread status")
 	} else {
diff --git a/pkg/controller/workloadspread/workloadspread_controller_utils.go b/pkg/controller/workloadspread/workloadspread_controller_utils.go
index 831454f8fe..1a605bf9cf 100644
--- a/pkg/controller/workloadspread/workloadspread_controller_utils.go
+++ b/pkg/controller/workloadspread/workloadspread_controller_utils.go
@@ -17,10 +17,12 @@ limitations under the License.
 package workloadspread
 
 import (
+	"context"
 	"encoding/json"
 	"reflect"
 
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	"github.com/openkruise/kruise/pkg/util"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
@@ -88,7 +90,7 @@ func filterOutCondition(conditions []appsv1alpha1.WorkloadSpreadSubsetCondition,
 	return newConditions
 }
 
-func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset, missingReplicas int) (bool, int64, error) {
+func matchesSubset(ctx context.Context, pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset, missingReplicas int) (bool, int64, error) {
 	// necessary condition
 	matched, err := matchesSubsetRequiredAndToleration(pod, node, subset)
 	if err != nil || !matched {
@@ -105,7 +107,7 @@ func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.Work
 	// preferredPodScore is in [0, 1]
 	preferredPodScore := int64(0)
 	if subset.Patch.Raw != nil {
-		preferredPodScore = podPreferredScore(subset, pod)
+		preferredPodScore = podPreferredScore(ctx, subset, pod)
 	}
 
 	// we prefer the subset that still has room for more replicas
@@ -119,17 +121,18 @@ func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.Work
 	return matched, preferredScore, nil
 }
 
-func podPreferredScore(subset *appsv1alpha1.WorkloadSpreadSubset, pod *corev1.Pod) int64 {
+func podPreferredScore(ctx context.Context, subset *appsv1alpha1.WorkloadSpreadSubset, pod *corev1.Pod) int64 {
+	logger := util.FromLogContext(ctx)
 	podBytes, _ := json.Marshal(pod)
 	modified, err := strategicpatch.StrategicMergePatch(podBytes, subset.Patch.Raw, &corev1.Pod{})
 	if err != nil {
-		klog.ErrorS(err, "Failed to merge patch raw for pod and subset", "pod", klog.KObj(pod), "subsetName", subset.Name)
+		logger.Error(err, "Failed to merge patch raw for pod and subset", "pod", klog.KObj(pod), "subsetName", subset.Name)
 		return 0
 	}
 	patchedPod := &corev1.Pod{}
 	err = json.Unmarshal(modified, patchedPod)
 	if err != nil {
-		klog.ErrorS(err, "Failed to unmarshal for pod and subset", "pod", klog.KObj(pod), "subsetName", subset.Name)
+		logger.Error(err, "Failed to unmarshal for pod and subset", "pod", klog.KObj(pod), "subsetName", subset.Name)
 		return 0
 	}
 	// TODO: consider json annotation just like `{"json_key": ["value1", "value2"]}`.
diff --git a/pkg/controller/workloadspread/workloadspread_controller_utils_test.go b/pkg/controller/workloadspread/workloadspread_controller_utils_test.go
index 966ec426fb..ae4e94717d 100644
--- a/pkg/controller/workloadspread/workloadspread_controller_utils_test.go
+++ b/pkg/controller/workloadspread/workloadspread_controller_utils_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package workloadspread
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
@@ -227,7 +228,7 @@ func TestMatchSubset(t *testing.T) {
 			subset := cs.getSubset()
 			node := cs.getNode()
 			pod := cs.getPod()
-			isMatch, score, err := matchesSubset(pod, node, subset, 0)
+			isMatch, score, err := matchesSubset(context.Background(), pod, node, subset, 0)
 			if err != nil {
 				t.Fatal("unexpected err occurred")
 			}
diff --git a/pkg/controller/workloadspread/workloadspread_event_handler.go b/pkg/controller/workloadspread/workloadspread_event_handler.go
index 68293fd5ef..6497317a75 100644
--- a/pkg/controller/workloadspread/workloadspread_event_handler.go
+++ b/pkg/controller/workloadspread/workloadspread_event_handler.go
@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"reflect"
 
+	"github.com/openkruise/kruise/pkg/util"
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -56,7 +57,7 @@ var _ handler.EventHandler = &podEventHandler{}
 type podEventHandler struct{}
 
 func (p *podEventHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
-	p.handlePod(q, evt.Object, CreateEventAction)
+	p.handlePod(ctx, q, evt.Object, CreateEventAction)
 }
 
@@ -64,27 +65,28 @@ func (p *podEventHandler) Update(ctx context.Context, evt event.UpdateEvent, q w
 	newPod := evt.ObjectNew.(*corev1.Pod)
 	if kubecontroller.IsPodActive(oldPod) && !kubecontroller.IsPodActive(newPod) ||
 		wsutil.GetPodVersion(oldPod) != wsutil.GetPodVersion(newPod) {
-		p.handlePod(q, newPod, UpdateEventAction)
+		p.handlePod(ctx, q, newPod, UpdateEventAction)
 	}
 }
 
 func (p *podEventHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
-	p.handlePod(q, evt.Object, DeleteEventAction)
+	p.handlePod(ctx, q, evt.Object, DeleteEventAction)
 }
 
 func (p *podEventHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
 }
 
-func (p *podEventHandler) handlePod(q workqueue.RateLimitingInterface, obj runtime.Object, action EventAction) {
+func (p *podEventHandler) handlePod(ctx context.Context, q workqueue.RateLimitingInterface, obj runtime.Object, action EventAction) {
+	logger := util.FromLogContext(ctx)
 	pod := obj.(*corev1.Pod)
 	if value, exist := pod.GetAnnotations()[wsutil.MatchedWorkloadSpreadSubsetAnnotations]; exist {
 		injectWorkloadSpread := &wsutil.InjectWorkloadSpread{}
 		if err := json.Unmarshal([]byte(value), injectWorkloadSpread); err != nil {
-			klog.ErrorS(err, "Failed to unmarshal JSON to WorkloadSpread", "JSON", value)
+			logger.Error(err, "Failed to unmarshal JSON to WorkloadSpread", "JSON", value)
 			return
 		}
 		nsn := types.NamespacedName{Namespace: pod.GetNamespace(), Name: injectWorkloadSpread.Name}
-		klog.V(5).InfoS("Handle Pod and reconcile WorkloadSpread",
+		logger.V(5).Info("Handle Pod and reconcile WorkloadSpread",
 			"action", action, "pod", klog.KObj(pod), "workloadSpread", nsn)
 		q.Add(reconcile.Request{NamespacedName: nsn})
 	}
@@ -96,11 +98,12 @@ type workloadEventHandler struct {
 	client.Reader
 }
 
-func (w workloadEventHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
-	w.handleWorkload(q, evt.Object, CreateEventAction)
+func (w *workloadEventHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
+	w.handleWorkload(ctx, q, evt.Object, CreateEventAction)
 }
 
-func (w workloadEventHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
+func (w *workloadEventHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
+	logger := util.FromLogContext(ctx)
 	var gvk schema.GroupVersionKind
 	var oldReplicas int32
 	var newReplicas int32
@@ -138,8 +141,8 @@ func (w workloadEventHandler) Update(ctx context.Context, evt event.UpdateEvent,
 		newReplicas = *evt.ObjectNew.(*appsv1beta1.StatefulSet).Spec.Replicas
 		gvk = controllerKruiseKindSts
 	case *unstructured.Unstructured:
-		oldReplicas = wsutil.GetReplicasFromCustomWorkload(w.Reader, evt.ObjectOld.(*unstructured.Unstructured))
-		newReplicas = wsutil.GetReplicasFromCustomWorkload(w.Reader, evt.ObjectNew.(*unstructured.Unstructured))
+		oldReplicas = wsutil.GetReplicasFromCustomWorkload(ctx, w.Reader, evt.ObjectOld.(*unstructured.Unstructured))
+		newReplicas = wsutil.GetReplicasFromCustomWorkload(ctx, w.Reader, evt.ObjectNew.(*unstructured.Unstructured))
 		gvk = evt.ObjectNew.(*unstructured.Unstructured).GroupVersionKind()
 	default:
 		return
@@ -152,14 +155,14 @@ func (w workloadEventHandler) Update(ctx context.Context, evt event.UpdateEvent,
 			Name:      evt.ObjectNew.GetName(),
 		}
 		owner := metav1.GetControllerOfNoCopy(evt.ObjectNew)
-		ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner)
+		ws, err := w.getWorkloadSpreadForWorkload(ctx, workloadNsn, gvk, owner)
 		if err != nil {
-			klog.ErrorS(err, "Unable to get WorkloadSpread related with resource kind",
+			logger.Error(err, "Unable to get WorkloadSpread related with resource kind",
 				"kind", gvk.Kind, "workload", workloadNsn)
 			return
 		}
 		if ws != nil {
-			klog.V(3).InfoS("Workload changed replicas managed by WorkloadSpread",
+			logger.V(3).Info("Workload changed replicas managed by WorkloadSpread",
 				"kind", gvk.Kind, "workload", workloadNsn, "oldReplicas", oldReplicas, "newReplicas", newReplicas, "workloadSpread", klog.KObj(ws))
 			nsn := types.NamespacedName{Namespace: ws.GetNamespace(), Name: ws.GetName()}
 			q.Add(reconcile.Request{NamespacedName: nsn})
@@ -167,15 +170,16 @@ func (w workloadEventHandler) Update(ctx context.Context, evt event.UpdateEvent,
 	}
 }
 
-func (w workloadEventHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
-	w.handleWorkload(q, evt.Object, DeleteEventAction)
+func (w *workloadEventHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
+	w.handleWorkload(ctx, q, evt.Object, DeleteEventAction)
 }
 
-func (w workloadEventHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+func (w *workloadEventHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
 }
 
-func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
+func (w *workloadEventHandler) handleWorkload(ctx context.Context, q workqueue.RateLimitingInterface,
 	obj client.Object, action EventAction) {
+	logger := util.FromLogContext(ctx)
 	var gvk schema.GroupVersionKind
 	switch obj.(type) {
 	case *appsv1alpha1.CloneSet:
@@ -199,27 +203,28 @@ func (w *workloadEventHandler) handleWorkload(q workqueue.RateLimitingInterface,
 		Name:      obj.GetName(),
 	}
 	owner := metav1.GetControllerOfNoCopy(obj)
-	ws, err := w.getWorkloadSpreadForWorkload(workloadNsn, gvk, owner)
+	ws, err := w.getWorkloadSpreadForWorkload(ctx, workloadNsn, gvk, owner)
 	if err != nil {
-		klog.ErrorS(err, "Unable to get WorkloadSpread related with workload",
+		logger.Error(err, "Unable to get WorkloadSpread related with workload",
 			"kind", gvk.Kind, "workload", workloadNsn)
 		return
 	}
 	if ws != nil {
-		klog.V(5).InfoS("Handle workload and reconcile WorkloadSpread",
+		logger.V(5).Info("Handle workload and reconcile WorkloadSpread",
 			"action", action, "kind", gvk.Kind, "workload", workloadNsn, "workloadSpread", klog.KObj(ws))
 		nsn := types.NamespacedName{Namespace: ws.GetNamespace(), Name: ws.GetName()}
 		q.Add(reconcile.Request{NamespacedName: nsn})
 	}
 }
 
-func (w *workloadEventHandler) getWorkloadSpreadForWorkload(
+func (w *workloadEventHandler) getWorkloadSpreadForWorkload(ctx context.Context,
 	workloadNamespaceName types.NamespacedName,
 	gvk schema.GroupVersionKind, ownerRef *metav1.OwnerReference) (*appsv1alpha1.WorkloadSpread, error) {
+	logger := util.FromLogContext(ctx)
 	wsList := &appsv1alpha1.WorkloadSpreadList{}
 	listOptions := &client.ListOptions{Namespace: workloadNamespaceName.Namespace}
 	if err := w.List(context.TODO(), wsList, listOptions); err != nil {
-		klog.ErrorS(err, "Failed to list WorkloadSpread", "namespace", workloadNamespaceName.Namespace)
+		logger.Error(err, "Failed to list WorkloadSpread", "namespace", workloadNamespaceName.Namespace)
 		return nil, err
 	}
diff --git a/pkg/controller/workloadspread/workloadspread_event_handler_test.go b/pkg/controller/workloadspread/workloadspread_event_handler_test.go
index f1461070db..dc18e9426a 100644
--- a/pkg/controller/workloadspread/workloadspread_event_handler_test.go
+++ b/pkg/controller/workloadspread/workloadspread_event_handler_test.go
@@ -380,7 +380,7 @@ func TestGetWorkloadSpreadForCloneSet(t *testing.T) {
 				Name:      cs.getCloneSet().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindCS, nil)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(context.Background(), nsn, controllerKruiseKindCS, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -505,7 +505,7 @@ func TestGetWorkloadSpreadForDeployment(t *testing.T) {
 				Name:      cs.getDeployment().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindDep, nil)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(context.Background(), nsn, controllerKindDep, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -607,7 +607,7 @@ func TestGetWorkloadSpreadForJob(t *testing.T) {
 				Name:      cs.getJob().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindJob, nil)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(context.Background(), nsn, controllerKindJob, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -732,7 +732,7 @@ func TestGetWorkloadSpreadForReplicaSet(t *testing.T) {
 				Name:      cs.getReplicaset().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindRS, nil)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(context.Background(), nsn, controllerKindRS, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -857,7 +857,7 @@ func TestGetWorkloadSpreadForStatefulSet(t *testing.T) {
 				Name:      cs.getStatefulSet().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKindSts, nil)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(context.Background(), nsn, controllerKindSts, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
@@ -982,7 +982,7 @@ func TestGetWorkloadSpreadForAdvancedStatefulSet(t *testing.T) {
 				Name:      cs.getStatefulSet().Name,
 			}
 			handler := workloadEventHandler{Reader: fakeClient}
-			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(nsn, controllerKruiseKindSts, nil)
+			workloadSpread, _ := handler.getWorkloadSpreadForWorkload(context.Background(), nsn, controllerKruiseKindSts, nil)
 
 			expectTopology := cs.expectWorkloadSpread()
 			if expectTopology == nil {
diff --git a/pkg/util/log.go b/pkg/util/log.go
new file mode 100644
index 0000000000..1a2d0c862e
--- /dev/null
+++ b/pkg/util/log.go
@@ -0,0 +1,42 @@
+/*
+Copyright 2025 The Kruise Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"context"
+
+	"github.com/google/uuid"
+	"k8s.io/klog/v2"
+)
+
+type loggerContextKey struct{}
+
+func NewLogContext(ctx context.Context) context.Context {
+	return context.WithValue(ctx, loggerContextKey{}, uuid.New().String())
+}
+
+func NewLogContextWithId(ctx context.Context, id string) context.Context {
+	return context.WithValue(ctx, loggerContextKey{}, id)
+}
+
+func FromLogContext(ctx context.Context) klog.Logger {
+	value := ctx.Value(loggerContextKey{})
+	if value == nil {
+		return klog.Background()
+	}
+	return klog.Background().WithValues("context", value)
+}
diff --git a/pkg/util/workloadspread/workloadspread.go b/pkg/util/workloadspread/workloadspread.go
index ff5cf0d3f3..817209fc31 100644
--- a/pkg/util/workloadspread/workloadspread.go
+++ b/pkg/util/workloadspread/workloadspread.go
@@ -192,7 +192,7 @@ func matchReference(ref *metav1.OwnerReference) (bool, error) {
 
 // TODO consider pod/status update operation
 
-func (h *Handler) HandlePodCreation(pod *corev1.Pod) (skip bool, err error) {
+func (h *Handler) HandlePodCreation(ctx context.Context, pod *corev1.Pod) (skip bool, err error) {
 	start := time.Now()
 
 	// filter out pods, include the following:
@@ -200,6 +200,7 @@ func (h *Handler) HandlePodCreation(pod *corev1.Pod) (skip bool, err error) {
 	// 2. Pod.Status.Phase = Succeeded or Failed
 	// 3. Pod.OwnerReference is nil
 	// 4. Pod.OwnerReference is not one of workloads, such as CloneSet, Deployment, ReplicaSet.
+	logger := util.FromLogContext(ctx)
 	if !kubecontroller.IsPodActive(pod) {
 		return true, nil
 	}
@@ -224,9 +225,9 @@ func (h *Handler) HandlePodCreation(pod *corev1.Pod) (skip bool, err error) {
 			continue
 		}
 		// determine if the reference of workloadSpread and pod is equal
-		referenceEqual, err := h.isReferenceEqual(ws.Spec.TargetReference, ref, pod.GetNamespace())
+		referenceEqual, err := h.isReferenceEqual(ctx, ws.Spec.TargetReference, ref, pod.GetNamespace())
 		if err != nil {
-			klog.ErrorS(err, "failed to determine whether workloadspread refers pod's owner",
+			logger.Error(err, "failed to determine whether workloadspread refers pod's owner",
 				"pod", klog.KObj(pod), "workloadspread", klog.KObj(&ws))
 			if errors.IsNotFound(err) {
 				return true, err
@@ -235,7 +236,7 @@
 		}
 		selected, err := IsPodSelected(ws.Spec.TargetFilter, pod.GetLabels())
 		if err != nil {
-			klog.ErrorS(err, "failed to determine whether workloadspread selects pod",
+			logger.Error(err, "failed to determine whether workloadspread selects pod",
 				"pod", klog.KObj(pod), "workloadspread", klog.KObj(&ws))
 			continue
 		}
@@ -251,16 +252,16 @@
 	}
 
 	defer func() {
-		klog.V(3).InfoS("Cost of handling pod creation by WorkloadSpread",
+		logger.V(3).Info("Cost of handling pod creation by WorkloadSpread",
 			"namespace", matchedWS.Namespace, "name", matchedWS.Name, "cost", time.Since(start))
 	}()
 
-	return false, h.mutatingPod(matchedWS, pod, nil, CreateOperation)
+	return false, h.mutatingPod(ctx, matchedWS, pod, nil, CreateOperation)
 }
 
-func (h *Handler) HandlePodDeletion(pod *corev1.Pod, operation Operation) error {
+func (h *Handler) HandlePodDeletion(ctx context.Context, pod *corev1.Pod, operation Operation) error {
 	start := time.Now()
-
+	logger := util.FromLogContext(ctx)
 	var injectWS *InjectWorkloadSpread
 	str, ok := pod.Annotations[MatchedWorkloadSpreadSubsetAnnotations]
 	if !ok || str == "" {
@@ -268,7 +269,7 @@
 	}
 	err := json.Unmarshal([]byte(str), &injectWS)
 	if err != nil {
-		klog.ErrorS(err, "parse Pod annotations failed", "namespace", pod.Namespace, "name", pod.Name,
+		logger.Error(err, "parse Pod annotations failed", "namespace", pod.Namespace, "name", pod.Name,
 			"key", MatchedWorkloadSpreadSubsetAnnotations, "value", str)
 		return nil
 	}
@@ -285,33 +286,34 @@
 	err = h.Client.Get(context.TODO(), client.ObjectKey{Namespace: pod.Namespace, Name: injectWS.Name}, matchedWS)
 	if err != nil {
 		if errors.IsNotFound(err) {
-			klog.InfoS("Pod matched WorkloadSpread Not Found", "namespace", pod.Namespace, "name", pod.Name, "workloadSpread", injectWS.Name)
+			logger.Info("Pod matched WorkloadSpread Not Found", "namespace", pod.Namespace, "name", pod.Name, "workloadSpread", injectWS.Name)
 			return nil
 		}
-		klog.ErrorS(err, "get pod matched workloadSpread failed", "namespace", pod.Namespace, "name", pod.Name, "workloadSpread", injectWS.Name)
+		logger.Error(err, "get pod matched workloadSpread failed", "namespace", pod.Namespace, "name", pod.Name, "workloadSpread", injectWS.Name)
 		return err
 	}
 
 	defer func() {
-		klog.V(3).InfoS("Cost of handling pod deletion by WorkloadSpread",
+		logger.V(3).Info("Cost of handling pod deletion by WorkloadSpread",
 			"namespace", matchedWS.Namespace, "name", matchedWS.Name, "cost", time.Since(start))
 	}()
 
-	return h.mutatingPod(matchedWS, pod, injectWS, operation)
+	return h.mutatingPod(ctx, matchedWS, pod, injectWS, operation)
 }
 
-func (h *Handler) mutatingPod(matchedWS *appsv1alpha1.WorkloadSpread,
+func (h *Handler) mutatingPod(ctx context.Context, matchedWS *appsv1alpha1.WorkloadSpread,
 	pod *corev1.Pod,
 	injectWS *InjectWorkloadSpread,
 	operation Operation) error {
+	logger := util.FromLogContext(ctx)
 	podName := pod.Name
 	if podName == "" {
 		podName = pod.GetGenerateName()
 	}
 
-	klog.V(3).InfoS("Operation Pod matched WorkloadSpread", "operation", operation, "podNs", pod.Namespace, "podName", podName, "wsNs", matchedWS.Namespace, "wsName", matchedWS.Name)
+	logger.V(3).Info("Operation Pod matched WorkloadSpread", "operation", operation, "podNs", pod.Namespace, "podName", podName, "wsNs", matchedWS.Namespace, "wsName", matchedWS.Name)
 
-	suitableSubsetName, generatedUID, err := h.acquireSuitableSubset(matchedWS, pod, injectWS, operation)
+	suitableSubsetName, generatedUID, err := h.acquireSuitableSubset(ctx, matchedWS, pod, injectWS, operation)
 	if err != nil {
 		return err
 	}
@@ -319,25 +321,26 @@
 	var injectErr error
 	// if create pod, inject affinity、toleration、metadata in pod object
 	if operation == CreateOperation && len(suitableSubsetName) > 0 {
-		if _, injectErr = injectWorkloadSpreadIntoPod(matchedWS, pod, suitableSubsetName, generatedUID); injectErr != nil {
-			klog.InfoS("failed to inject Pod subset data for WorkloadSpread",
+		if _, injectErr = injectWorkloadSpreadIntoPod(ctx, matchedWS, pod, suitableSubsetName, generatedUID); injectErr != nil {
+			logger.Info("failed to inject Pod subset data for WorkloadSpread",
 				"podNs", pod.Namespace, "podName", podName, "suitableSubsetName", suitableSubsetName, "wsNs", matchedWS.Namespace, "wsName", matchedWS.Name)
 			return injectErr
 		}
-		klog.V(3).InfoS("inject Pod subset data for WorkloadSpread",
+		logger.V(3).Info("inject Pod subset data for WorkloadSpread",
 			"podNs", pod.Namespace, "podName", podName, "suitableSubsetName", suitableSubsetName, "wsNs", matchedWS.Namespace, "wsName", matchedWS.Name)
 	}
 
-	klog.V(3).InfoS("handler operation Pod generatedUID for WorkloadSpread done",
+	logger.V(3).Info("handler operation Pod generatedUID for WorkloadSpread done",
 		"operation", operation, "podNs", pod.Namespace, "podName", podName, "generatedUID", generatedUID, "wsNs", matchedWS.Namespace, "wsName", matchedWS.Name)
 
 	return injectErr
 }
 
-func (h *Handler) acquireSuitableSubset(matchedWS *appsv1alpha1.WorkloadSpread,
+func (h *Handler) acquireSuitableSubset(ctx context.Context, matchedWS *appsv1alpha1.WorkloadSpread,
 	pod *corev1.Pod,
 	injectWS *InjectWorkloadSpread,
 	operation Operation) (string, string, error) {
+	logger := util.FromLogContext(ctx)
 	if len(matchedWS.Spec.Subsets) == 1 &&
 		matchedWS.Spec.Subsets[0].MaxReplicas == nil {
 		return matchedWS.Spec.Subsets[0].Name, "", nil
@@ -394,7 +397,7 @@
 			// 1. get two matchedWS, one is cached by this webhook process,
 			// another is cached by informer, compare and get the newer one;
 			// 2. if 1. failed, directly fetch matchedWS from APIServer;
-			wsClone, err = h.tryToGetTheLatestMatchedWS(matchedWS, refresh)
+			wsClone, err = h.tryToGetTheLatestMatchedWS(ctx, matchedWS, refresh)
 			costOfGet += time.Since(startGet)
 			if err != nil {
 				return err
@@ -405,7 +408,7 @@
 			// check whether WorkloadSpread has suitable subset for the pod
 			// 1. changed indicates whether workloadSpread status changed
 			// 2. suitableSubset is matched subset for the pod
-			changed, suitableSubset, generatedUID, err = h.updateSubsetForPod(wsClone, pod, injectWS, operation)
+			changed, suitableSubset, generatedUID, err = h.updateSubsetForPod(ctx, wsClone, pod, injectWS, operation)
 			if !changed || err != nil {
 				return err
 			}
@@ -416,17 +419,17 @@
 				refresh = true
 				conflictTimes++
 			} else {
-				klog.V(3).InfoS("update WorkloadSpread success",
+				logger.V(3).Info("update WorkloadSpread success",
 					"namespace", wsClone.Namespace, "name", wsClone.Name, "subsetStatus", suitableSubset.Name,
 					"missingReplicas", suitableSubset.MissingReplicas, "creatingPods", len(suitableSubset.CreatingPods), "deletingPods", len(suitableSubset.DeletingPods))
 				if cacheErr := util.GlobalCache.Add(wsClone); cacheErr != nil {
-					klog.ErrorS(cacheErr, "Failed to update workloadSpread cache after update status", "namespace", wsClone.Namespace, "name", wsClone.Name)
+					logger.Error(cacheErr, "Failed to update workloadSpread cache after update status", "namespace", wsClone.Namespace, "name", wsClone.Name)
 				}
 			}
 			costOfUpdate += time.Since(start)
 			return err
 		}); err != nil {
-			klog.ErrorS(err, "update WorkloadSpread error", "namespace", matchedWS.Namespace, "name", matchedWS.Name)
+			logger.Error(err, "update WorkloadSpread error", "namespace", matchedWS.Namespace, "name", matchedWS.Name)
 			return "", "", err
 		}
 	}
@@ -435,17 +438,17 @@
 		suitableSubsetName = suitableSubset.Name
 	}
 
-	klog.V(5).InfoS("Cost of assigning suitable subset of WorkloadSpread for pod",
+	logger.V(5).Info("Cost of assigning suitable subset of WorkloadSpread for pod",
 		"namespace", matchedWS.Namespace, "name", matchedWS.Name, "conflictTimes", conflictTimes, "costOfGet", costOfGet, "costOfUpdate", costOfUpdate)
 
 	return suitableSubsetName, generatedUID, nil
 }
 
-func (h *Handler) tryToGetTheLatestMatchedWS(matchedWS *appsv1alpha1.WorkloadSpread, refresh bool) (
+func (h *Handler) tryToGetTheLatestMatchedWS(ctx context.Context, matchedWS *appsv1alpha1.WorkloadSpread, refresh bool) (
 	*appsv1alpha1.WorkloadSpread, error) {
 	var err error
 	var wsClone *appsv1alpha1.WorkloadSpread
-
+	logger := util.FromLogContext(ctx)
 	if refresh {
 		// TODO: shall we set metav1.GetOptions{resourceVersion="0"} so that we get the cached object in apiServer memory instead of etcd?
 		wsClone, err = kubeClient.GetGenericClient().KruiseClient.AppsV1alpha1().
@@ -454,13 +457,13 @@ func (h *Handler) tryToGetTheLatestMatchedWS(matchedWS *appsv1alpha1.WorkloadSpr if errors.IsNotFound(err) { return nil, nil } - klog.ErrorS(err, "error getting updated WorkloadSpread from APIServer", "namespace", matchedWS.Namespace, "name", matchedWS.Name) + logger.Error(err, "error getting updated WorkloadSpread from APIServer", "namespace", matchedWS.Namespace, "name", matchedWS.Name) return nil, err } } else { item, _, cacheErr := util.GlobalCache.Get(matchedWS) if cacheErr != nil { - klog.ErrorS(cacheErr, "Failed to get cached WorkloadSpread from GlobalCache", "namespace", matchedWS.Namespace, "name", matchedWS.Name) + logger.Error(cacheErr, "Failed to get cached WorkloadSpread from GlobalCache", "namespace", matchedWS.Namespace, "name", matchedWS.Name) } if localCachedWS, ok := item.(*appsv1alpha1.WorkloadSpread); ok { wsClone = localCachedWS.DeepCopy() @@ -489,7 +492,7 @@ func (h *Handler) tryToGetTheLatestMatchedWS(matchedWS *appsv1alpha1.WorkloadSpr // 1. changed(bool) indicates if workloadSpread.Status has changed // 2. suitableSubset(*struct{}) indicates which workloadSpread.Subset does this pod match // 3. generatedUID(types.UID) indicates which workloadSpread generate a UID for identifying Pod without a full name. -func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, +func (h *Handler) updateSubsetForPod(ctx context.Context, ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, injectWS *InjectWorkloadSpread, operation Operation) ( bool, *appsv1alpha1.WorkloadSpreadSubsetStatus, string, error) { var suitableSubset *appsv1alpha1.WorkloadSpreadSubsetStatus @@ -497,10 +500,11 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, // We only care about the corresponding versioned subset status. 
var err error + logger := util.FromLogContext(ctx) version := GetPodVersion(pod) subsetStatuses := ws.Status.VersionedSubsetStatuses[version] if len(subsetStatuses) == 0 { - subsetStatuses, err = h.initializedSubsetStatuses(ws) + subsetStatuses, err = h.initializedSubsetStatuses(ctx, ws) if err != nil { return false, nil, "", err } @@ -521,7 +525,7 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, suitableSubset = h.getSuitableSubset(subsetStatuses) if suitableSubset == nil { - klog.InfoS("WorkloadSpread doesn't have a suitable subset for Pod when creating", + logger.Info("WorkloadSpread doesn't have a suitable subset for Pod when creating", "namespace", ws.Namespace, "wsName", ws.Name, "podName", pod.GetGenerateName()) return false, nil, "", nil } @@ -551,7 +555,7 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, suitableSubset = getSpecificSubset(subsetStatuses, injectWS.Subset) if suitableSubset == nil { - klog.V(5).InfoS("Pod matched WorkloadSpread not found Subset when deleting", + logger.V(5).Info("Pod matched WorkloadSpread not found Subset when deleting", "namespace", ws.Namespace, "podName", pod.Name, "wsName", ws.Name, "subset", injectWS.Subset) return false, nil, "", nil } @@ -594,7 +598,8 @@ func isPodRecordedInSubset(subsetStatuses []appsv1alpha1.WorkloadSpreadSubsetSta return false, nil } -func injectWorkloadSpreadIntoPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetName string, generatedUID string) (bool, error) { +func injectWorkloadSpreadIntoPod(ctx context.Context, ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetName string, generatedUID string) (bool, error) { + logger := util.FromLogContext(ctx) var subset *appsv1alpha1.WorkloadSpreadSubset for _, object := range ws.Spec.Subsets { if subsetName == object.Name { @@ -640,12 +645,12 @@ func injectWorkloadSpreadIntoPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Po cloneBytes, _ := json.Marshal(pod) modified, err := strategicpatch.StrategicMergePatch(cloneBytes, subset.Patch.Raw, &corev1.Pod{}) if err != nil { - klog.ErrorS(err, "failed to merge patch raw", "raw", subset.Patch.Raw) + logger.Error(err, "failed to merge patch raw", "raw", subset.Patch.Raw) return false, err } newPod := &corev1.Pod{} if err = json.Unmarshal(modified, newPod); err != nil { - klog.ErrorS(err, "failed to unmarshal to Pod", "pod", modified) + logger.Error(err, "failed to unmarshal to Pod", "pod", modified) return false, err } *pod = *newPod @@ -702,20 +707,21 @@ func (h *Handler) getSuitableSubset(subsetStatuses []appsv1alpha1.WorkloadSpread return nil } -func (h *Handler) isReferenceEqual(target *appsv1alpha1.TargetReference, owner *metav1.OwnerReference, namespace string) (bool, error) { +func (h *Handler) isReferenceEqual(ctx context.Context, target *appsv1alpha1.TargetReference, owner *metav1.OwnerReference, namespace string) (bool, error) { + logger := util.FromLogContext(ctx) if owner == nil { return false, nil } targetGv, err := schema.ParseGroupVersion(target.APIVersion) if err != nil { - klog.ErrorS(err, "parse TargetReference apiVersion failed", "apiVersion", target.APIVersion) + logger.Error(err, "parse TargetReference apiVersion failed", "apiVersion", target.APIVersion) return false, err } ownerGv, err := schema.ParseGroupVersion(owner.APIVersion) if err != nil { - klog.ErrorS(err, "parse OwnerReference apiVersion failed", "apiVersion", owner.APIVersion) + logger.Error(err, "parse OwnerReference apiVersion failed", "apiVersion", owner.APIVersion) return false, err } @@ 
-732,7 +738,7 @@ func (h *Handler) isReferenceEqual(target *appsv1alpha1.TargetReference, owner * return false, err } - return h.isReferenceEqual(target, metav1.GetControllerOfNoCopy(ownerObject), namespace) + return h.isReferenceEqual(ctx, target, metav1.GetControllerOfNoCopy(ownerObject), namespace) } // statefulPodRegex is a regular expression that extracts the parent StatefulSet and ordinal from the Name of a Pod @@ -804,9 +810,10 @@ func initializeWorkloadsInWhiteList(c client.Client) { workloadsInWhiteListInitialized = true } -func (h *Handler) initializedSubsetStatuses(ws *appsv1alpha1.WorkloadSpread) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, error) { - replicas, err := h.getWorkloadReplicas(ws) - klog.V(5).InfoS("get workload replicas", "replicas", replicas, "err", err, "workloadSpread", klog.KObj(ws)) +func (h *Handler) initializedSubsetStatuses(ctx context.Context, ws *appsv1alpha1.WorkloadSpread) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, error) { + logger := util.FromLogContext(ctx) + replicas, err := h.getWorkloadReplicas(ctx, ws) + logger.V(5).Info("get workload replicas", "replicas", replicas, "err", err, "workloadSpread", klog.KObj(ws)) if err != nil { return nil, err } @@ -825,7 +832,7 @@ func (h *Handler) initializedSubsetStatuses(ws *appsv1alpha1.WorkloadSpread) ([] return subsetStatuses, nil } -func (h *Handler) getWorkloadReplicas(ws *appsv1alpha1.WorkloadSpread) (int32, error) { +func (h *Handler) getWorkloadReplicas(ctx context.Context, ws *appsv1alpha1.WorkloadSpread) (int32, error) { if ws.Spec.TargetReference == nil || !hasPercentSubset(ws) { return 0, nil } @@ -857,7 +864,7 @@ func (h *Handler) getWorkloadReplicas(ws *appsv1alpha1.WorkloadSpread) (int32, e case *appsv1beta1.StatefulSet: return *o.Spec.Replicas, nil case *unstructured.Unstructured: - return GetReplicasFromCustomWorkload(h.Client, o), nil + return GetReplicasFromCustomWorkload(ctx, h.Client, o), nil } return 0, fmt.Errorf("got unexpected workload type for workloadspread %s/%s", ws.Namespace, ws.Name) } @@ -901,13 +908,14 @@ func GetReplicasFromObject(object *unstructured.Unstructured, replicasPath strin return int32(replicas), nil } -func GetReplicasFromCustomWorkload(reader client.Reader, object *unstructured.Unstructured) int32 { +func GetReplicasFromCustomWorkload(ctx context.Context, reader client.Reader, object *unstructured.Unstructured) int32 { if object == nil { return 0 } + logger := util.FromLogContext(ctx) whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(reader) if err != nil { - klog.Error("Failed to get workloadSpread custom workload white list from kruise config map") + logger.Error(err, "Failed to get workloadSpread custom workload white list from kruise config map") return 0 } @@ -918,7 +926,7 @@ func GetReplicasFromCustomWorkload(reader client.Reader, object *unstructured.Un } replicas, err := GetReplicasFromObject(object, wl.ReplicasPath) if err != nil { - klog.ErrorS(err, "Failed to get replicas from custom workload", "gvk", gvk, "object", klog.KObj(object), "replicasPath", wl.ReplicasPath) + logger.Error(err, "Failed to get replicas from custom workload", "gvk", gvk, "object", klog.KObj(object), "replicasPath", wl.ReplicasPath) } return replicas } diff --git a/pkg/util/workloadspread/workloadspread_test.go b/pkg/util/workloadspread/workloadspread_test.go index 81399693a3..6a5da34451 100644 --- a/pkg/util/workloadspread/workloadspread_test.go +++ b/pkg/util/workloadspread/workloadspread_test.go @@ -474,7 +474,7 @@ func 
TestWorkloadSpreadCreatePodWithoutFullName(t *testing.T) { ws.Status.VersionedSubsetStatuses[VersionIgnored] = ws.Status.SubsetStatuses pod := podDemo.DeepCopy() pod.Name = "" - _, suitableSubset, generatedUID, _ := handler.updateSubsetForPod(ws, pod, nil, CreateOperation) + _, suitableSubset, generatedUID, _ := handler.updateSubsetForPod(context.Background(), ws, pod, nil, CreateOperation) if generatedUID == "" { t.Fatalf("generate id failed") } @@ -1135,11 +1135,11 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { var err error switch cs.getOperation() { case CreateOperation: - _, err = handler.HandlePodCreation(podIn) + _, err = handler.HandlePodCreation(context.Background(), podIn) case DeleteOperation: - err = handler.HandlePodDeletion(podIn, DeleteOperation) + err = handler.HandlePodDeletion(context.Background(), podIn, DeleteOperation) case EvictionOperation: - err = handler.HandlePodDeletion(podIn, EvictionOperation) + err = handler.HandlePodDeletion(context.Background(), podIn, EvictionOperation) } errHandler := cs.errorHandler if errHandler == nil { @@ -1153,6 +1153,8 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { if !reflect.DeepEqual(podInBy, expectPodBy) { t.Logf("actual annotations: %+v", podIn.Annotations) t.Logf("expect annotations: %+v", podExpect.Annotations) + t.Logf("actual json: %s", podInBy) + t.Logf("expect json: %s", expectPodBy) t.Fatalf("pod DeepEqual failed") } latestWS, err := getLatestWorkloadSpread(fakeClient, workloadSpreadIn) @@ -1293,7 +1295,7 @@ func TestGetWorkloadReplicas(t *testing.T) { ws.Spec.Subsets = append(ws.Spec.Subsets, appsv1alpha1.WorkloadSpreadSubset{ MaxReplicas: &percent, }) - replicas, err := h.getWorkloadReplicas(ws) + replicas, err := h.getWorkloadReplicas(context.Background(), ws) if cs.wantErr != (err != nil) { t.Fatalf("wantErr: %v, but got: %v", cs.wantErr, err) } @@ -1481,7 +1483,7 @@ func TestIsReferenceEqual(t *testing.T) { for _, cs := range cases { t.Run(cs.name, func(t *testing.T) { h := Handler{fake.NewClientBuilder().Build()} - ok, err := h.isReferenceEqual(cs.getTargetRef(), cs.getOwnerRef(), "") + ok, err := h.isReferenceEqual(context.Background(), cs.getTargetRef(), cs.getOwnerRef(), "") if ok != cs.expectEqual { t.Fatalf("isReferenceEqual failed") } @@ -1625,7 +1627,7 @@ func TestIsReferenceEqual2(t *testing.T) { handler := &Handler{Client: cli} workloadsInWhiteListInitialized = false initializeWorkloadsInWhiteList(cli) - result, _ := handler.isReferenceEqual(&ref, metav1.GetControllerOf(pod), pod.GetNamespace()) + result, _ := handler.isReferenceEqual(context.Background(), &ref, metav1.GetControllerOf(pod), pod.GetNamespace()) if result != cs.Expect { t.Fatalf("got unexpected result") } @@ -1890,7 +1892,7 @@ func TestInitializedSubsetStatuses(t *testing.T) { } spread := cs.spread() handler := &Handler{builder.Build()} - result, err := handler.initializedSubsetStatuses(spread) + result, err := handler.initializedSubsetStatuses(context.Background(), spread) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/webhook/pod/mutating/pod_create_update_handler.go b/pkg/webhook/pod/mutating/pod_create_update_handler.go index c9cd6d6270..ef31b894d5 100644 --- a/pkg/webhook/pod/mutating/pod_create_update_handler.go +++ b/pkg/webhook/pod/mutating/pod_create_update_handler.go @@ -21,6 +21,7 @@ import ( "encoding/json" "net/http" + "github.com/openkruise/kruise/pkg/util" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -46,7 +47,7 @@ var 
_ admission.Handler = &PodCreateHandler{} // Handle handles admission requests. func (h *PodCreateHandler) Handle(ctx context.Context, req admission.Request) admission.Response { obj := &corev1.Pod{} - + ctx = util.NewLogContextWithId(ctx, string(req.UID)) err := h.Decoder.Decode(req, obj) if err != nil { return admission.Errored(http.StatusBadRequest, err) diff --git a/pkg/webhook/pod/mutating/workloadspread.go b/pkg/webhook/pod/mutating/workloadspread.go index 48b574d793..df79687b8e 100644 --- a/pkg/webhook/pod/mutating/workloadspread.go +++ b/pkg/webhook/pod/mutating/workloadspread.go @@ -19,16 +19,17 @@ package mutating import ( "context" + "github.com/openkruise/kruise/pkg/util" wsutil "github.com/openkruise/kruise/pkg/util/workloadspread" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/util/dryrun" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) func (h *PodCreateHandler) workloadSpreadMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) (skip bool, err error) { + logger := util.FromLogContext(ctx) if len(req.AdmissionRequest.SubResource) > 0 || req.AdmissionRequest.Resource.Resource != "pods" { return true, nil @@ -47,10 +48,11 @@ func (h *PodCreateHandler) workloadSpreadMutatingPod(ctx context.Context, req ad // check dry run dryRun = dryrun.IsDryRun(options.DryRun) if dryRun { - klog.V(5).InfoS("Operation is a dry run, then admit", "operation", req.AdmissionRequest.Operation, "namespace", pod.Namespace, "podName", pod.Name) + logger.V(5).WithValues("operation", req.AdmissionRequest.Operation, "namespace", pod.Namespace, "podName", pod.Name). + Info("Operation is a dry run, then admit") return true, nil } - return workloadSpreadHandler.HandlePodCreation(pod) + return workloadSpreadHandler.HandlePodCreation(ctx, pod) default: return true, nil } diff --git a/pkg/webhook/pod/validating/workloadspread.go b/pkg/webhook/pod/validating/workloadspread.go index 217a239e43..55384f0890 100644 --- a/pkg/webhook/pod/validating/workloadspread.go +++ b/pkg/webhook/pod/validating/workloadspread.go @@ -19,12 +19,12 @@ package validating import ( "context" + "github.com/openkruise/kruise/pkg/util" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/util/dryrun" - "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/policy" wsutil "github.com/openkruise/kruise/pkg/util/workloadspread" @@ -38,16 +38,19 @@ import ( // 3. 
err(error) func (p *PodCreateHandler) workloadSpreadValidatingPod(ctx context.Context, req admission.Request) (bool, string, error) { pod := &corev1.Pod{} + ctx = util.NewLogContext(ctx) + logger := util.FromLogContext(ctx) + var dryRun bool var err error workloadSpreadHandler := wsutil.NewWorkloadSpreadHandler(p.Client) - klog.V(6).InfoS("workloadSpread validate Operation", "operation", req.Operation, "namespace", req.Namespace, "name", req.Name) + logger.V(6).Info("workloadSpread validate Operation", "operation", req.Operation, "namespace", req.Namespace, "name", req.Name) switch req.AdmissionRequest.Operation { case admissionv1.Delete: if req.AdmissionRequest.SubResource != "" { - klog.V(6).InfoS("Pod AdmissionRequest operation(DELETE) subResource, then admit", "namespace", req.Namespace, "name", req.Name, "subResource", req.SubResource) + logger.V(6).Info("Pod AdmissionRequest operation(DELETE) subResource, then admit", "namespace", req.Namespace, "name", req.Name, "subResource", req.SubResource) return true, "", nil } @@ -64,18 +67,18 @@ func (p *PodCreateHandler) workloadSpreadValidatingPod(ctx context.Context, req } dryRun = dryrun.IsDryRun(deletion.DryRun) if dryRun { - klog.V(5).InfoS("Operation is a dry run, then admit", "operation", req.AdmissionRequest.Operation, "namespace", pod.Namespace, "name", pod.Name) + logger.V(5).Info("Operation is a dry run, then admit", "operation", req.AdmissionRequest.Operation, "namespace", pod.Namespace, "name", pod.Name) return true, "", err } - err = workloadSpreadHandler.HandlePodDeletion(pod, wsutil.DeleteOperation) + err = workloadSpreadHandler.HandlePodDeletion(ctx, pod, wsutil.DeleteOperation) if err != nil { return false, "", err } case admissionv1.Create: // ignore create operation other than subresource eviction if req.AdmissionRequest.SubResource != "eviction" { - klog.V(6).InfoS("Pod AdmissionRequest operation(CREATE) Resource and subResource, then admit", "namespace", req.Namespace, "name", req.Name, "resource", req.Resource, "subResource", req.SubResource) + logger.V(6).Info("Pod AdmissionRequest operation(CREATE) Resource and subResource, then admit", "namespace", req.Namespace, "name", req.Name, "resource", req.Resource, "subResource", req.SubResource) return true, "", nil } @@ -88,7 +91,7 @@ func (p *PodCreateHandler) workloadSpreadValidatingPod(ctx context.Context, req if eviction.DeleteOptions != nil { dryRun = dryrun.IsDryRun(eviction.DeleteOptions.DryRun) if dryRun { - klog.V(5).InfoS("Operation[Eviction] is a dry run, then admit", "namespace", req.AdmissionRequest.Namespace, "name", req.AdmissionRequest.Name) + logger.V(5).Info("Operation[Eviction] is a dry run, then admit", "namespace", req.AdmissionRequest.Namespace, "name", req.AdmissionRequest.Name) return true, "", nil } } @@ -102,7 +105,7 @@ func (p *PodCreateHandler) workloadSpreadValidatingPod(ctx context.Context, req return false, "", err } - err = workloadSpreadHandler.HandlePodDeletion(pod, wsutil.EvictionOperation) + err = workloadSpreadHandler.HandlePodDeletion(ctx, pod, wsutil.EvictionOperation) if err != nil { return false, "", err } diff --git a/pkg/webhook/workloadspread/validating/workloadspread_create_update_handler.go b/pkg/webhook/workloadspread/validating/workloadspread_create_update_handler.go index 08e53769f0..24013b0a97 100644 --- a/pkg/webhook/workloadspread/validating/workloadspread_create_update_handler.go +++ b/pkg/webhook/workloadspread/validating/workloadspread_create_update_handler.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" + 
"github.com/openkruise/kruise/pkg/util" admissionv1 "k8s.io/api/admission/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -46,6 +47,7 @@ var _ admission.Handler = &WorkloadSpreadCreateUpdateHandler{} // Handle handles admission requests. func (h *WorkloadSpreadCreateUpdateHandler) Handle(ctx context.Context, req admission.Request) admission.Response { + ctx = util.NewLogContextWithId(ctx, string(req.UID)) obj := &appsv1alpha1.WorkloadSpread{} oldObj := &appsv1alpha1.WorkloadSpread{} if !utilfeature.DefaultFeatureGate.Enabled(features.WorkloadSpread) { @@ -56,7 +58,7 @@ func (h *WorkloadSpreadCreateUpdateHandler) Handle(ctx context.Context, req admi if err := h.Decoder.Decode(req, obj); err != nil { return admission.Errored(http.StatusBadRequest, err) } - if allErrs := h.validatingWorkloadSpreadFn(obj); len(allErrs) > 0 { + if allErrs := h.validatingWorkloadSpreadFn(ctx, obj); len(allErrs) > 0 { return admission.Errored(http.StatusBadRequest, allErrs.ToAggregate()) } case admissionv1.Update: @@ -67,7 +69,7 @@ func (h *WorkloadSpreadCreateUpdateHandler) Handle(ctx context.Context, req admi return admission.Errored(http.StatusBadRequest, err) } - validationErrorList := h.validatingWorkloadSpreadFn(obj) + validationErrorList := h.validatingWorkloadSpreadFn(ctx, obj) updateErrorList := validateWorkloadSpreadUpdate(obj, oldObj) if allErrs := append(validationErrorList, updateErrorList...); len(allErrs) > 0 { return admission.Errored(http.StatusBadRequest, allErrs.ToAggregate()) diff --git a/pkg/webhook/workloadspread/validating/workloadspread_validation.go b/pkg/webhook/workloadspread/validating/workloadspread_validation.go index 374901e814..ad3b4a95e5 100644 --- a/pkg/webhook/workloadspread/validating/workloadspread_validation.go +++ b/pkg/webhook/workloadspread/validating/workloadspread_validation.go @@ -23,6 +23,7 @@ import ( "math" "time" + "github.com/openkruise/kruise/pkg/util" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -35,7 +36,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/core" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" corevalidation "k8s.io/kubernetes/pkg/apis/core/validation" @@ -60,10 +60,11 @@ var ( controllerKruiseKindAlphaSts = appsv1alpha1.SchemeGroupVersion.WithKind("StatefulSet") ) -func verifyGroupKind(ref *appsv1alpha1.TargetReference, expectedKind string, expectedGroups []string) (bool, error) { +func verifyGroupKind(ctx context.Context, ref *appsv1alpha1.TargetReference, expectedKind string, expectedGroups []string) (bool, error) { + logger := util.FromLogContext(ctx) gv, err := schema.ParseGroupVersion(ref.APIVersion) if err != nil { - klog.ErrorS(err, "failed to parse GroupVersion for apiVersion", "apiVersion", ref.APIVersion) + logger.Error(err, "failed to parse GroupVersion for apiVersion", "apiVersion", ref.APIVersion) return false, err } @@ -80,9 +81,9 @@ func verifyGroupKind(ref *appsv1alpha1.TargetReference, expectedKind string, exp return false, nil } -func (h *WorkloadSpreadCreateUpdateHandler) validatingWorkloadSpreadFn(obj *appsv1alpha1.WorkloadSpread) field.ErrorList { +func (h *WorkloadSpreadCreateUpdateHandler) validatingWorkloadSpreadFn(ctx context.Context, obj *appsv1alpha1.WorkloadSpread) field.ErrorList { // validate ws.spec. 
- allErrs := validateWorkloadSpreadSpec(h, obj, field.NewPath("spec")) + allErrs := validateWorkloadSpreadSpec(ctx, h, obj, field.NewPath("spec")) // validate whether ws.spec.targetRef is in conflict with others. wsList := &appsv1alpha1.WorkloadSpreadList{} @@ -95,7 +96,7 @@ func (h *WorkloadSpreadCreateUpdateHandler) validatingWorkloadSpreadFn(obj *apps return allErrs } -func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv1alpha1.WorkloadSpread, fldPath *field.Path) field.ErrorList { +func validateWorkloadSpreadSpec(ctx context.Context, h *WorkloadSpreadCreateUpdateHandler, obj *appsv1alpha1.WorkloadSpread, fldPath *field.Path) field.ErrorList { spec := &obj.Spec allErrs := field.ErrorList{} var workloadTemplate client.Object @@ -109,7 +110,7 @@ func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv } else { switch spec.TargetReference.Kind { case controllerKruiseKindCS.Kind: - ok, err := verifyGroupKind(spec.TargetReference, controllerKruiseKindCS.Kind, []string{controllerKruiseKindCS.Group}) + ok, err := verifyGroupKind(ctx, spec.TargetReference, controllerKruiseKindCS.Kind, []string{controllerKruiseKindCS.Group}) if !ok || err != nil { allErrs = append(allErrs, field.Invalid(fldPath.Child("targetRef"), spec.TargetReference, "TargetReference is not valid for CloneSet.")) } else { @@ -119,7 +120,7 @@ func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv } } case controllerKindDep.Kind: - ok, err := verifyGroupKind(spec.TargetReference, controllerKindDep.Kind, []string{controllerKindDep.Group}) + ok, err := verifyGroupKind(ctx, spec.TargetReference, controllerKindDep.Kind, []string{controllerKindDep.Group}) if !ok || err != nil { allErrs = append(allErrs, field.Invalid(fldPath.Child("targetRef"), spec.TargetReference, "TargetReference is not valid for Deployment.")) } else { @@ -129,7 +130,7 @@ func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv } } case controllerKindRS.Kind: - ok, err := verifyGroupKind(spec.TargetReference, controllerKindRS.Kind, []string{controllerKindRS.Group}) + ok, err := verifyGroupKind(ctx, spec.TargetReference, controllerKindRS.Kind, []string{controllerKindRS.Group}) if !ok || err != nil { allErrs = append(allErrs, field.Invalid(fldPath.Child("targetRef"), spec.TargetReference, "TargetReference is not valid for ReplicaSet.")) } else { @@ -139,7 +140,7 @@ func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv } } case controllerKindJob.Kind: - ok, err := verifyGroupKind(spec.TargetReference, controllerKindJob.Kind, []string{controllerKindJob.Group}) + ok, err := verifyGroupKind(ctx, spec.TargetReference, controllerKindJob.Kind, []string{controllerKindJob.Group}) if !ok || err != nil { allErrs = append(allErrs, field.Invalid(fldPath.Child("targetRef"), spec.TargetReference, "TargetReference is not valid for Job.")) } else { @@ -149,7 +150,7 @@ func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv } } case controllerKindSts.Kind: - ok, err := verifyGroupKind(spec.TargetReference, controllerKindSts.Kind, []string{controllerKindSts.Group, controllerKruiseKindAlphaSts.Group, controllerKruiseKindBetaSts.Group}) + ok, err := verifyGroupKind(ctx, spec.TargetReference, controllerKindSts.Kind, []string{controllerKindSts.Group, controllerKruiseKindAlphaSts.Group, controllerKruiseKindBetaSts.Group}) if !ok || err != nil { allErrs = append(allErrs, field.Invalid(fldPath.Child("targetRef"), spec.TargetReference, 
"TargetReference is not valid for StatefulSet.")) } else { @@ -166,7 +167,7 @@ func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv } matched := false for _, wl := range whiteList.Workloads { - if ok, _ := verifyGroupKind(spec.TargetReference, wl.Kind, []string{wl.Group}); ok { + if ok, _ := verifyGroupKind(ctx, spec.TargetReference, wl.Kind, []string{wl.Group}); ok { matched = true break } diff --git a/pkg/webhook/workloadspread/validating/workloadspread_validation_test.go b/pkg/webhook/workloadspread/validating/workloadspread_validation_test.go index 17a461a94e..73d6ee6cbe 100644 --- a/pkg/webhook/workloadspread/validating/workloadspread_validation_test.go +++ b/pkg/webhook/workloadspread/validating/workloadspread_validation_test.go @@ -16,6 +16,7 @@ limitations under the License. package validating import ( + "context" "strconv" "testing" @@ -463,7 +464,7 @@ func TestValidateWorkloadSpreadCreate(t *testing.T) { t.Run("success case "+strconv.Itoa(i), func(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(workloadSpreadDemo).Build() handler.Client = fakeClient - if errs := handler.validatingWorkloadSpreadFn(&successCase); len(errs) != 0 { + if errs := handler.validatingWorkloadSpreadFn(context.Background(), &successCase); len(errs) != 0 { t.Errorf("expected success: %v", errs) } }) @@ -811,7 +812,7 @@ func TestValidateWorkloadSpreadCreate(t *testing.T) { t.Run(errorCase.name, func(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(workloadSpreadDemo).Build() handler.Client = fakeClient - errs := handler.validatingWorkloadSpreadFn(errorCase.getWorkloadSpread()) + errs := handler.validatingWorkloadSpreadFn(context.Background(), errorCase.getWorkloadSpread()) if len(errs) == 0 { t.Errorf("expected failure for %s", errorCase.name) }