Skip to content

Commit

Permalink
Replace time.Now() with clock.Clock.Now() in pkg/workload/workloa…
Browse files Browse the repository at this point in the history
…d.go (#3870)

* add clock argument to UpdateStatus

* use realClock in integration tests

* add clock argument to UpdateRequeState and use it

* fix lint errors

* use realClock instead of fakeClock in util.go
  • Loading branch information
TusharMohapatra07 authored Dec 31, 2024
1 parent 2a2486c commit 7eb9830
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 16 deletions.
8 changes: 4 additions & 4 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
if !success {
reason = kueue.WorkloadFinishedReasonFailed
}
err := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, reason, message, constants.JobControllerName)
err := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, reason, message, constants.JobControllerName, r.clock)
if err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -465,7 +465,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
if !apimeta.IsStatusConditionPresentAndEqual(wl.Status.Conditions, condition.Type, condition.Status) {
log.V(3).Info(fmt.Sprintf("Updating the PodsReady condition with status: %v", condition.Status))
apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName, r.clock)
if err != nil {
log.Error(err, "Updating workload status")
}
Expand Down Expand Up @@ -504,7 +504,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
log.Error(err, "Unsuspending job")
if podset.IsPermanent(err) {
// Mark the workload as finished with failure since the is no point to retry.
errUpdateStatus := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, FailedToStartFinishedReason, err.Error(), constants.JobControllerName)
errUpdateStatus := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, FailedToStartFinishedReason, err.Error(), constants.JobControllerName, r.clock)
if errUpdateStatus != nil {
log.Error(errUpdateStatus, "Updating workload status, on start failure", "err", err)
}
Expand Down Expand Up @@ -756,7 +756,7 @@ func (r *JobReconciler) ensurePrebuiltWorkloadInSync(ctx context.Context, wl *ku
metav1.ConditionTrue,
kueue.WorkloadFinishedReasonOutOfSync,
"The prebuilt workload is out of sync with its user job",
constants.JobControllerName)
constants.JobControllerName, r.clock)
return false, err
}
return true, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueueS
PodSetAssignments: e.assignment.ToAPI(),
}

workload.SetQuotaReservation(newWorkload, admission)
workload.SetQuotaReservation(newWorkload, admission, s.clock)
if workload.HasAllChecks(newWorkload, workload.AdmissionChecksForWorkload(log, newWorkload, cq.AdmissionChecks)) {
// sync Admitted, ignore the result since an API update is always done.
_ = workload.SyncAdmittedCondition(newWorkload, s.clock.Now())
Expand Down
11 changes: 6 additions & 5 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,9 @@ func UpdateStatus(ctx context.Context,
conditionType string,
conditionStatus metav1.ConditionStatus,
reason, message string,
managerPrefix string) error {
now := metav1.Now()
managerPrefix string,
clock clock.Clock) error {
now := metav1.NewTime(clock.Now())
condition := metav1.Condition{
Type: conditionType,
Status: conditionStatus,
Expand Down Expand Up @@ -568,7 +569,7 @@ func BaseSSAWorkload(w *kueue.Workload) *kueue.Workload {

// SetQuotaReservation applies the provided admission to the workload.
// The WorkloadAdmitted and WorkloadEvicted are added or updated if necessary.
func SetQuotaReservation(w *kueue.Workload, admission *kueue.Admission) {
func SetQuotaReservation(w *kueue.Workload, admission *kueue.Admission, clock clock.Clock) {
w.Status.Admission = admission
message := fmt.Sprintf("Quota reserved in ClusterQueue %s", w.Status.Admission.ClusterQueue)
admittedCond := metav1.Condition{
Expand All @@ -585,14 +586,14 @@ func SetQuotaReservation(w *kueue.Workload, admission *kueue.Admission) {
evictedCond.Status = metav1.ConditionFalse
evictedCond.Reason = "QuotaReserved"
evictedCond.Message = api.TruncateConditionMessage("Previously: " + evictedCond.Message)
evictedCond.LastTransitionTime = metav1.Now()
evictedCond.LastTransitionTime = metav1.NewTime(clock.Now())
}
// reset Preempted condition if present.
if preemptedCond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadPreempted); preemptedCond != nil {
preemptedCond.Status = metav1.ConditionFalse
preemptedCond.Reason = "QuotaReserved"
preemptedCond.Message = api.TruncateConditionMessage("Previously: " + preemptedCond.Message)
preemptedCond.LastTransitionTime = metav1.Now()
preemptedCond.LastTransitionTime = metav1.NewTime(clock.Now())
}
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/workload/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -364,6 +365,8 @@ func TestNewInfo(t *testing.T) {
}

func TestUpdateWorkloadStatus(t *testing.T) {
now := time.Now()
fakeClock := testingclock.NewFakeClock(now)
cases := map[string]struct {
oldStatus kueue.WorkloadStatus
condType string
Expand Down Expand Up @@ -421,7 +424,7 @@ func TestUpdateWorkloadStatus(t *testing.T) {
workload.Status = tc.oldStatus
cl := utiltesting.NewFakeClientSSAAsSM(workload)
ctx := context.Background()
err := UpdateStatus(ctx, cl, workload, tc.condType, tc.condStatus, tc.reason, tc.message, "manager-prefix")
err := UpdateStatus(ctx, cl, workload, tc.condType, tc.condStatus, tc.reason, tc.message, "manager-prefix", fakeClock)
if err != nil {
t.Fatalf("Failed updating status: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions test/integration/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -622,6 +623,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
kueue.WorkloadEvicted,
metav1.ConditionTrue,
kueue.WorkloadEvictedByPreemption, "By test", "evict",
clock.RealClock{},
)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -361,7 +362,7 @@ var _ = ginkgo.Describe("JobSet controller", ginkgo.Ordered, ginkgo.ContinueOnFa
ginkgo.By("preempt the workload", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict")).To(gomega.Succeed())
g.Expect(workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", clock.RealClock{})).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

Expand Down
3 changes: 2 additions & 1 deletion test/integration/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -330,7 +331,7 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu

gomega.Expect(
workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue,
kueue.WorkloadEvictedByPreemption, "By test", "evict"),
kueue.WorkloadEvictedByPreemption, "By test", "evict", clock.RealClock{}),
).Should(gomega.Succeed())
util.FinishEvictionForWorkloads(ctx, k8sClient, createdWorkload)

Expand Down
34 changes: 32 additions & 2 deletions test/performance/scheduler/runner/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"strconv"
"sync"
"testing"
"time"

apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -47,8 +49,13 @@ type reconciler struct {
atLock sync.RWMutex
admissionTime map[types.UID]time.Time
recorder *recorder.Recorder
clock clock.Clock
}

var (
realClock = clock.RealClock{}
)

func (r *reconciler) getAdmittedTime(uid types.UID) (time.Time, bool) {
r.atLock.RLock()
defer r.atLock.RUnlock()
Expand Down Expand Up @@ -138,7 +145,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
if remaining > 0 {
return reconcile.Result{RequeueAfter: remaining}, nil
} else {
err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName)
err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName, r.clock)
if err == nil {
log.V(5).Info("Finish Workload")
}
Expand All @@ -148,11 +155,34 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, nil
}

func NewReconciler(c client.Client, r *recorder.Recorder) *reconciler {
func NewReconciler(c client.Client, r *recorder.Recorder, opts ...Option) *reconciler {
options := defaultOptions

for _, opt := range opts {
opt(&options)
}

return &reconciler{
client: c,
admissionTime: map[types.UID]time.Time{},
recorder: r,
clock: options.clock,
}
}

type options struct {
clock clock.Clock
}

type Option func(*options)

var defaultOptions = options{
clock: realClock,
}

func WithClock(_ testing.TB, c clock.Clock) Option {
return func(o *options) {
o.clock = c
}
}

Expand Down
3 changes: 2 additions & 1 deletion test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -594,7 +595,7 @@ func SetQuotaReservation(ctx context.Context, k8sClient client.Client, wl *kueue
if admission == nil {
workload.UnsetQuotaReservationWithCondition(wl, "EvictedByTest", "Evicted By Test", time.Now())
} else {
workload.SetQuotaReservation(wl, admission)
workload.SetQuotaReservation(wl, admission, clock.RealClock{})
}
return workload.ApplyAdmissionStatus(ctx, k8sClient, wl, false)
}
Expand Down

0 comments on commit 7eb9830

Please sign in to comment.