From 7eb983015093f8faa6b7ba06359730c2cabd8533 Mon Sep 17 00:00:00 2001 From: Tushar Mohapatra <137442734+TusharMohapatra07@users.noreply.github.com> Date: Wed, 1 Jan 2025 02:38:12 +0530 Subject: [PATCH] Replace `time.Now()` with `clock.Clock.Now()` in pkg/workload/workload.go (#3870) * add clock argument to UpdateStatus * use realClock in integration tests * add clock argument to UpdateRequeState and use it * fix lint errors * use realClock instead of fakeClock in util.go --- pkg/controller/jobframework/reconciler.go | 8 ++--- pkg/scheduler/scheduler.go | 2 +- pkg/workload/workload.go | 11 +++--- pkg/workload/workload_test.go | 5 ++- .../jobs/job/job_controller_test.go | 2 ++ .../jobs/jobset/jobset_controller_test.go | 3 +- .../jobs/pod/pod_controller_test.go | 3 +- .../scheduler/runner/controller/controller.go | 34 +++++++++++++++++-- test/util/util.go | 3 +- 9 files changed, 55 insertions(+), 16 deletions(-) diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 152bcdef2b..2e6f0cfb91 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -403,7 +403,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques if !success { reason = kueue.WorkloadFinishedReasonFailed } - err := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, reason, message, constants.JobControllerName) + err := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, reason, message, constants.JobControllerName, r.clock) if err != nil && !apierrors.IsNotFound(err) { return ctrl.Result{}, err } @@ -465,7 +465,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques if !apimeta.IsStatusConditionPresentAndEqual(wl.Status.Conditions, condition.Type, condition.Status) { log.V(3).Info(fmt.Sprintf("Updating the PodsReady condition with status: %v", condition.Status)) apimeta.SetStatusCondition(&wl.Status.Conditions, condition) - err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName) + err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName, r.clock) if err != nil { log.Error(err, "Updating workload status") } @@ -504,7 +504,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques log.Error(err, "Unsuspending job") if podset.IsPermanent(err) { // Mark the workload as finished with failure since the is no point to retry. - errUpdateStatus := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, FailedToStartFinishedReason, err.Error(), constants.JobControllerName) + errUpdateStatus := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, FailedToStartFinishedReason, err.Error(), constants.JobControllerName, r.clock) if errUpdateStatus != nil { log.Error(errUpdateStatus, "Updating workload status, on start failure", "err", err) } @@ -756,7 +756,7 @@ func (r *JobReconciler) ensurePrebuiltWorkloadInSync(ctx context.Context, wl *ku metav1.ConditionTrue, kueue.WorkloadFinishedReasonOutOfSync, "The prebuilt workload is out of sync with its user job", - constants.JobControllerName) + constants.JobControllerName, r.clock) return false, err } return true, nil diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e39fe18ea8..41c7d45cbb 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -531,7 +531,7 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueueS PodSetAssignments: e.assignment.ToAPI(), } - workload.SetQuotaReservation(newWorkload, admission) + workload.SetQuotaReservation(newWorkload, admission, s.clock) if workload.HasAllChecks(newWorkload, workload.AdmissionChecksForWorkload(log, newWorkload, cq.AdmissionChecks)) { // sync Admitted, ignore the result since an API update is always done. _ = workload.SyncAdmittedCondition(newWorkload, s.clock.Now()) diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index f0aaf6d706..8523536629 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -453,8 +453,9 @@ func UpdateStatus(ctx context.Context, conditionType string, conditionStatus metav1.ConditionStatus, reason, message string, - managerPrefix string) error { - now := metav1.Now() + managerPrefix string, + clock clock.Clock) error { + now := metav1.NewTime(clock.Now()) condition := metav1.Condition{ Type: conditionType, Status: conditionStatus, @@ -568,7 +569,7 @@ func BaseSSAWorkload(w *kueue.Workload) *kueue.Workload { // SetQuotaReservation applies the provided admission to the workload. // The WorkloadAdmitted and WorkloadEvicted are added or updated if necessary. -func SetQuotaReservation(w *kueue.Workload, admission *kueue.Admission) { +func SetQuotaReservation(w *kueue.Workload, admission *kueue.Admission, clock clock.Clock) { w.Status.Admission = admission message := fmt.Sprintf("Quota reserved in ClusterQueue %s", w.Status.Admission.ClusterQueue) admittedCond := metav1.Condition{ @@ -585,14 +586,14 @@ func SetQuotaReservation(w *kueue.Workload, admission *kueue.Admission) { evictedCond.Status = metav1.ConditionFalse evictedCond.Reason = "QuotaReserved" evictedCond.Message = api.TruncateConditionMessage("Previously: " + evictedCond.Message) - evictedCond.LastTransitionTime = metav1.Now() + evictedCond.LastTransitionTime = metav1.NewTime(clock.Now()) } // reset Preempted condition if present. if preemptedCond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadPreempted); preemptedCond != nil { preemptedCond.Status = metav1.ConditionFalse preemptedCond.Reason = "QuotaReserved" preemptedCond.Message = api.TruncateConditionMessage("Previously: " + preemptedCond.Message) - preemptedCond.LastTransitionTime = metav1.Now() + preemptedCond.LastTransitionTime = metav1.NewTime(clock.Now()) } } diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go index 0c2b298995..e4395670cf 100644 --- a/pkg/workload/workload_test.go +++ b/pkg/workload/workload_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -364,6 +365,8 @@ func TestNewInfo(t *testing.T) { } func TestUpdateWorkloadStatus(t *testing.T) { + now := time.Now() + fakeClock := testingclock.NewFakeClock(now) cases := map[string]struct { oldStatus kueue.WorkloadStatus condType string @@ -421,7 +424,7 @@ func TestUpdateWorkloadStatus(t *testing.T) { workload.Status = tc.oldStatus cl := utiltesting.NewFakeClientSSAAsSM(workload) ctx := context.Background() - err := UpdateStatus(ctx, cl, workload, tc.condType, tc.condStatus, tc.reason, tc.message, "manager-prefix") + err := UpdateStatus(ctx, cl, workload, tc.condType, tc.condStatus, tc.reason, tc.message, "manager-prefix", fakeClock) if err != nil { t.Fatalf("Failed updating status: %v", err) } diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index 62107a5ed1..922df3cb98 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/clock" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -622,6 +623,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", + clock.RealClock{}, )).Should(gomega.Succeed()) }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) diff --git a/test/integration/controller/jobs/jobset/jobset_controller_test.go b/test/integration/controller/jobs/jobset/jobset_controller_test.go index 4e028e57f3..c235f4f613 100644 --- a/test/integration/controller/jobs/jobset/jobset_controller_test.go +++ b/test/integration/controller/jobs/jobset/jobset_controller_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/clock" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -361,7 +362,7 @@ var _ = ginkgo.Describe("JobSet controller", ginkgo.Ordered, ginkgo.ContinueOnFa ginkgo.By("preempt the workload", func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict")).To(gomega.Succeed()) + g.Expect(workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", clock.RealClock{})).To(gomega.Succeed()) }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go index 0dfa578df0..0518eae093 100644 --- a/test/integration/controller/jobs/pod/pod_controller_test.go +++ b/test/integration/controller/jobs/pod/pod_controller_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/clock" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -330,7 +331,7 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu gomega.Expect( workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, - kueue.WorkloadEvictedByPreemption, "By test", "evict"), + kueue.WorkloadEvictedByPreemption, "By test", "evict", clock.RealClock{}), ).Should(gomega.Succeed()) util.FinishEvictionForWorkloads(ctx, k8sClient, createdWorkload) diff --git a/test/performance/scheduler/runner/controller/controller.go b/test/performance/scheduler/runner/controller/controller.go index 638ef41749..9b87562fb1 100644 --- a/test/performance/scheduler/runner/controller/controller.go +++ b/test/performance/scheduler/runner/controller/controller.go @@ -20,12 +20,14 @@ import ( "context" "strconv" "sync" + "testing" "time" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -47,8 +49,13 @@ type reconciler struct { atLock sync.RWMutex admissionTime map[types.UID]time.Time recorder *recorder.Recorder + clock clock.Clock } +var ( + realClock = clock.RealClock{} +) + func (r *reconciler) getAdmittedTime(uid types.UID) (time.Time, bool) { r.atLock.RLock() defer r.atLock.RUnlock() @@ -138,7 +145,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco if remaining > 0 { return reconcile.Result{RequeueAfter: remaining}, nil } else { - err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName) + err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName, r.clock) if err == nil { log.V(5).Info("Finish Workload") } @@ -148,11 +155,34 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco return reconcile.Result{}, nil } -func NewReconciler(c client.Client, r *recorder.Recorder) *reconciler { +func NewReconciler(c client.Client, r *recorder.Recorder, opts ...Option) *reconciler { + options := defaultOptions + + for _, opt := range opts { + opt(&options) + } + return &reconciler{ client: c, admissionTime: map[types.UID]time.Time{}, recorder: r, + clock: options.clock, + } +} + +type options struct { + clock clock.Clock +} + +type Option func(*options) + +var defaultOptions = options{ + clock: realClock, +} + +func WithClock(_ testing.TB, c clock.Clock) Option { + return func(o *options) { + o.clock = c } } diff --git a/test/util/util.go b/test/util/util.go index e2f86f825e..1de8cbbbee 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -44,6 +44,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" + "k8s.io/utils/clock" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -594,7 +595,7 @@ func SetQuotaReservation(ctx context.Context, k8sClient client.Client, wl *kueue if admission == nil { workload.UnsetQuotaReservationWithCondition(wl, "EvictedByTest", "Evicted By Test", time.Now()) } else { - workload.SetQuotaReservation(wl, admission) + workload.SetQuotaReservation(wl, admission, clock.RealClock{}) } return workload.ApplyAdmissionStatus(ctx, k8sClient, wl, false) }