Feature: Add context logging to WorkloadSpread #1879

Open · wants to merge 1 commit into base: master
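This PR threads a `context.Context` through the WorkloadSpread reconciler helpers and replaces direct `klog` calls with a logger obtained via `util.FromLogContext(ctx)`, so log lines can carry request-scoped values. The implementation of `FromLogContext` is not part of this diff; helpers of this kind are usually thin wrappers around logr's context support. A minimal sketch under that assumption (the fallback behavior and the `IntoLogContext` name are illustrative, not taken from the PR):

```go
// Hypothetical sketch of a context-logger helper in the spirit of
// util.FromLogContext; the real kruise implementation is not shown in this diff.
package util

import (
    "context"

    "github.com/go-logr/logr"
    "k8s.io/klog/v2"
)

// FromLogContext returns the logr.Logger stored in ctx, falling back to
// klog's global background logger when the context carries none.
func FromLogContext(ctx context.Context) logr.Logger {
    if logger, err := logr.FromContext(ctx); err == nil {
        return logger
    }
    return klog.Background()
}

// IntoLogContext (illustrative name) attaches a logger to ctx so that callees
// can retrieve it with FromLogContext.
func IntoLogContext(ctx context.Context, logger logr.Logger) context.Context {
    return logr.NewContext(ctx, logger)
}
```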
15 changes: 9 additions & 6 deletions pkg/controller/workloadspread/reschedule.go
@@ -21,6 +21,7 @@ import (
    "time"

    "github.com/openkruise/kruise/pkg/controller/util"
+   util2 "github.com/openkruise/kruise/pkg/util"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"

@@ -36,9 +37,10 @@ import (
// should be recovered schedulable status to try scheduling Pods again.
// TODO optimize the unschedulable duration of subset.
// return one parameters - unschedulable Pods belongs to this subset.
-func (r *ReconcileWorkloadSpread) rescheduleSubset(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) rescheduleSubset(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
    pods []*corev1.Pod,
    subsetStatus, oldSubsetStatus *appsv1alpha1.WorkloadSpreadSubsetStatus) []*corev1.Pod {
+   logger := util2.FromLogContext(ctx)
    scheduleFailedPods := make([]*corev1.Pod, 0)
    for i := range pods {
        if PodUnscheduledTimeout(ws, pods[i]) {
@@ -47,7 +49,7 @@ func (r *ReconcileWorkloadSpread) rescheduleSubset(ws *appsv1alpha1.WorkloadSpre
    }
    unschedulable := len(scheduleFailedPods) > 0
    if unschedulable {
-       klog.V(3).InfoS("Subset of WorkloadSpread is unschedulable", "subsetName", subsetStatus.Name, "workloadSpread", klog.KObj(ws))
+       logger.V(3).Info("Subset of WorkloadSpread is unschedulable", "subsetName", subsetStatus.Name, "workloadSpread", klog.KObj(ws))
    }

    oldCondition := GetWorkloadSpreadSubsetCondition(oldSubsetStatus, appsv1alpha1.SubsetSchedulable)
@@ -85,18 +87,19 @@ func (r *ReconcileWorkloadSpread) rescheduleSubset(ws *appsv1alpha1.WorkloadSpre
    return scheduleFailedPods
}

-func (r *ReconcileWorkloadSpread) cleanupUnscheduledPods(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) cleanupUnscheduledPods(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
    scheduleFailedPodsMap map[string][]*corev1.Pod) error {
    for subsetName, pods := range scheduleFailedPodsMap {
-       if err := r.deletePodsForSubset(ws, pods, subsetName); err != nil {
+       if err := r.deletePodsForSubset(ctx, ws, pods, subsetName); err != nil {
            return err
        }
    }
    return nil
}

-func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) deletePodsForSubset(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
    pods []*corev1.Pod, subsetName string) error {
+   logger := util2.FromLogContext(ctx)
    for _, pod := range pods {
        if err := r.Client.Delete(context.TODO(), pod); err != nil {
            r.recorder.Eventf(ws, corev1.EventTypeWarning,
@@ -105,7 +108,7 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS
                pod.Namespace, pod.Name, subsetName, ws.Namespace, ws.Name)
            return err
        }
-       klog.V(3).InfoS("WorkloadSpread deleted unschedulabe Pod in Subset successfully", "workloadSpread", klog.KObj(ws), "pod", klog.KObj(pod), "subsetName", subsetName)
+       logger.V(3).Info("WorkloadSpread deleted unschedulabe Pod in Subset successfully", "workloadSpread", klog.KObj(ws), "pod", klog.KObj(pod), "subsetName", subsetName)
    }
    return nil
}
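The diff above shows the consumers of the context logger but not where the logger is attached to the context; presumably that happens near the top of the reconcile loop. For illustration only (the values and the use of klog's context helpers are assumptions, not code from this PR), the seeding and retrieval round-trip looks like this:

```go
// Illustrative round-trip: seed a request-scoped logger into the context, then
// recover it downstream the way the helpers above do with util.FromLogContext.
package main

import (
    "context"

    "k8s.io/klog/v2"
)

func main() {
    // A reconciler would do this near the top of Reconcile, using the request's
    // namespaced name; the value here is a placeholder.
    logger := klog.Background().WithValues("workloadSpread", "default/ws-demo")
    ctx := klog.NewContext(context.Background(), logger)

    // Downstream code pulls the logger back out of the context. klog.FromContext
    // plays the same role here that util.FromLogContext plays in the diff.
    klog.FromContext(ctx).V(3).Info("Subset of WorkloadSpread is unschedulable", "subsetName", "subset-a")
}
```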
2 changes: 1 addition & 1 deletion pkg/controller/workloadspread/reschedule_test.go
@@ -284,7 +284,7 @@ func TestRescheduleSubset(t *testing.T) {
        },
    }

-   err := reconciler.syncWorkloadSpread(workloadSpread)
+   err := reconciler.syncWorkloadSpread(context.Background(), workloadSpread)
    if err != nil {
        t.Fatalf("sync WorkloadSpread failed: %s", err.Error())
    }
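The test change simply passes `context.Background()`, so the helpers fall back to whatever default `util.FromLogContext` provides. If a test ever needs to capture that output, a logger can be attached to the context instead. A hedged sketch (not part of this PR; it assumes `FromLogContext` reads the logger stored by `logr.NewContext`, and it needs the `github.com/go-logr/logr` and `github.com/go-logr/logr/funcr` imports in the test file):

```go
// Illustrative: route context logging into the test log. funcr.New builds a
// logr.Logger from a plain callback; Verbosity >= 3 lets the V(3) messages
// added in this PR through.
logger := funcr.New(func(prefix, args string) {
    t.Log(prefix, args)
}, funcr.Options{Verbosity: 3})
ctx := logr.NewContext(context.Background(), logger)

err := reconciler.syncWorkloadSpread(ctx, workloadSpread)
```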
32 changes: 18 additions & 14 deletions pkg/controller/workloadspread/update_pod_deletion_cost.go
@@ -22,6 +22,7 @@
    "sort"
    "strconv"

+   "github.com/openkruise/kruise/pkg/util"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/types"
@@ -56,33 +57,34 @@
    return wsutil.GetWorkloadVersion(r.Client, object)
}

-func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) updateDeletionCost(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
    versionedPodMap map[string]map[string][]*corev1.Pod,
    workloadReplicas int32) error {
+   logger := util.FromLogContext(ctx)
    targetRef := ws.Spec.TargetReference
    if targetRef == nil || !isEffectiveKindForDeletionCost(targetRef) {
        return nil
    }

    latestVersion, err := r.getWorkloadLatestVersion(ws)
    if err != nil {
-       klog.ErrorS(err, "Failed to get the latest version for workload in workloadSpread", "workloadSpread", klog.KObj(ws))
+       logger.Error(err, "Failed to get the latest version for workload in workloadSpread", "workloadSpread", klog.KObj(ws))
        return err
    }

    // To try our best to keep the distribution of workload description during workload rolling:
    // - to the latest version, we hope to scale down the last subset preferentially;
    // - to other old versions, we hope to scale down the first subset preferentially;
    for version, podMap := range versionedPodMap {
-       err = r.updateDeletionCostBySubset(ws, podMap, workloadReplicas, version != latestVersion)
+       err = r.updateDeletionCostBySubset(ctx, ws, podMap, workloadReplicas, version != latestVersion)
        if err != nil {
            return err
        }
    }
    return nil
}

-func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
    podMap map[string][]*corev1.Pod, workloadReplicas int32, reverseOrder bool) error {
    subsetNum := len(ws.Spec.Subsets)
    subsetIndex := func(index int) int {
@@ -93,14 +95,14 @@
    }
    // update Pod's deletion-cost annotation in each subset
    for idx, subset := range ws.Spec.Subsets {
-       if err := r.syncSubsetPodDeletionCost(ws, &subset, subsetIndex(idx), podMap[subset.Name], workloadReplicas); err != nil {
+       if err := r.syncSubsetPodDeletionCost(ctx, ws, &subset, subsetIndex(idx), podMap[subset.Name], workloadReplicas); err != nil {
            return err
        }
    }
    // update the deletion-cost annotation for such pods that do not match any real subsets.
    // these pods will have the minimum deletion-cost, and will be deleted preferentially.
    if len(podMap[FakeSubsetName]) > 0 {
-       if err := r.syncSubsetPodDeletionCost(ws, nil, len(ws.Spec.Subsets), podMap[FakeSubsetName], workloadReplicas); err != nil {
+       if err := r.syncSubsetPodDeletionCost(ctx, ws, nil, len(ws.Spec.Subsets), podMap[FakeSubsetName], workloadReplicas); err != nil {
            return err
        }
    }
@@ -123,12 +125,13 @@
// maxReplicas    10          10          nil
// pods number    20          20          20
// deletion-cost  (300,-100)  (200,-200)  100
-func (r *ReconcileWorkloadSpread) syncSubsetPodDeletionCost(
+func (r *ReconcileWorkloadSpread) syncSubsetPodDeletionCost(ctx context.Context,
    ws *appsv1alpha1.WorkloadSpread,
    subset *appsv1alpha1.WorkloadSpreadSubset,
    subsetIndex int,
    pods []*corev1.Pod,
    workloadReplicas int32) error {
+   logger := util.FromLogContext(ctx)
    var err error
    // slice that will contain all Pods that want to set deletion-cost a positive value.
    var positivePods []*corev1.Pod
@@ -154,7 +157,7 @@
    } else {
        subsetMaxReplicas, err := intstr.GetValueFromIntOrPercent(subset.MaxReplicas, int(workloadReplicas), true)
        if err != nil || subsetMaxReplicas < 0 {
-           klog.ErrorS(err, "Failed to get maxReplicas value from subset of WorkloadSpread", "subsetName", subset.Name, "workloadSpread", klog.KObj(ws))
+           logger.Error(err, "Failed to get maxReplicas value from subset of WorkloadSpread", "subsetName", subset.Name, "workloadSpread", klog.KObj(ws))
            return nil
        }

@@ -180,17 +183,17 @@
        }
    }

-   err = r.updateDeletionCostForSubsetPods(ws, subset, positivePods, strconv.Itoa(wsutil.PodDeletionCostPositive*(len(ws.Spec.Subsets)-subsetIndex)))
+   err = r.updateDeletionCostForSubsetPods(ctx, ws, subset, positivePods, strconv.Itoa(wsutil.PodDeletionCostPositive*(len(ws.Spec.Subsets)-subsetIndex)))
    if err != nil {
        return err
    }
-   return r.updateDeletionCostForSubsetPods(ws, subset, negativePods, strconv.Itoa(wsutil.PodDeletionCostNegative*(subsetIndex+1)))
+   return r.updateDeletionCostForSubsetPods(ctx, ws, subset, negativePods, strconv.Itoa(wsutil.PodDeletionCostNegative*(subsetIndex+1)))
}

-func (r *ReconcileWorkloadSpread) updateDeletionCostForSubsetPods(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) updateDeletionCostForSubsetPods(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
    subset *appsv1alpha1.WorkloadSpreadSubset, pods []*corev1.Pod, deletionCostStr string) error {
    for _, pod := range pods {
-       if err := r.patchPodDeletionCost(ws, pod, deletionCostStr); err != nil {
+       if err := r.patchPodDeletionCost(ctx, ws, pod, deletionCostStr); err != nil {
            subsetName := FakeSubsetName
            if subset != nil {
                subsetName = subset.Name
@@ -205,8 +208,9 @@
    return nil
}

-func (r *ReconcileWorkloadSpread) patchPodDeletionCost(ws *appsv1alpha1.WorkloadSpread,
+func (r *ReconcileWorkloadSpread) patchPodDeletionCost(ctx context.Context, ws *appsv1alpha1.WorkloadSpread,
    pod *corev1.Pod, deletionCostStr string) error {
+   logger := util.FromLogContext(ctx)
    clone := pod.DeepCopy()
    annotationKey := wsutil.PodDeletionCostAnnotation
    annotationValue := deletionCostStr
@@ -226,7 +230,7 @@
    if err := r.Patch(context.TODO(), clone, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
        return err
    }
-   klog.V(3).InfoS("WorkloadSpread patched deletion-cost annotation for Pod successfully", "workloadSpread", klog.KObj(ws), "deletionCost", deletionCostStr, "pod", klog.KObj(pod))
+   logger.V(3).Info("WorkloadSpread patched deletion-cost annotation for Pod successfully", "workloadSpread", klog.KObj(ws), "deletionCost", deletionCostStr, "pod", klog.KObj(pod))
    return nil
}
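As context for what `patchPodDeletionCost` actually writes: the `body` handed to `client.RawPatch` is built in lines collapsed out of this diff, and it targets the standard `controller.kubernetes.io/pod-deletion-cost` annotation with a strategic-merge patch. A plausible, self-contained sketch (illustrative, not the exact kruise code):

```go
// Illustrative sketch of the strategic-merge patch that sets the
// pod-deletion-cost annotation; the exact body construction in kruise is
// collapsed out of the diff above.
package example

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func patchPodDeletionCost(ctx context.Context, c client.Client, pod *corev1.Pod, cost string) error {
    const key = "controller.kubernetes.io/pod-deletion-cost"
    body := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, key, cost)
    return c.Patch(ctx, pod, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}
```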