Commit 2cbde11 — WaitForPodsReady: Reset .status.requeueState.count once the workload is deactivated

Signed-off-by: Yuki Iwai <[email protected]>
tenzen-y committed May 25, 2024
1 parent fab6cfe commit 2cbde11
Showing 6 changed files with 252 additions and 72 deletions.
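In short: when a Workload is deactivated (.spec.active=false), the reconciler now also clears .status.requeueState, and the Evicted condition message notes when the waitForPodsReady re-queue budget was exhausted. A condensed sketch of the new decision, not part of the commit itself (helper and field shapes inferred from the diff below; package aliases as in workload_controller.go):

func exceededRequeueBudget(wl *kueue.Workload, limit *int32) bool {
	// Only relevant when waitForPodsReady evicted the workload, i.e. PodsReady=False.
	if !apimeta.IsStatusConditionFalse(wl.Status.Conditions, kueue.WorkloadPodsReady) {
		return false
	}
	if !workload.HasRequeueState(wl) {
		// Never re-queued: the budget is exhausted only if the limit is zero.
		return ptr.Equal(limit, ptr.To[int32](0))
	}
	// Re-queued before (requeueAt already consumed) and the count reached the limit.
	return wl.Status.RequeueState.RequeueAt == nil && ptr.Equal(wl.Status.RequeueState.Count, limit)
}

When this holds, the eviction message becomes "The workload is deactivated by exceeded the maximum number of re-queuing retries"; either way the controller resets .status.requeueState so a later re-activation starts from a clean slate.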
46 changes: 31 additions & 15 deletions pkg/controller/core/workload_controller.go
@@ -163,13 +163,6 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

if ptr.Deref(wl.Spec.Active, true) {
var updated bool

// If a deactivated workload is re-activated we need to reset the RequeueState.
if workload.IsEvictedByDeactivation(&wl) && wl.Status.RequeueState != nil {
wl.Status.RequeueState = nil
updated = true
}

if cond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadRequeued); cond != nil && cond.Status == metav1.ConditionFalse {
switch cond.Reason {
case kueue.WorkloadEvictedByDeactivation:
@@ -194,16 +187,39 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if updated {
return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
}
} else if !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) {
// if job is not active and does not have condition reason of WorkloadEvictedByDeactivation, update its condition
workload.SetEvictedCondition(&wl, kueue.WorkloadEvictedByDeactivation, "The workload is deactivated")
if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err)
} else {
var updated, evicted bool
if !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) {
message := "The workload is deactivated"

// The deactivation reason can be attributed to exceeding the maximum number of re-queuing retries if the workload meets all of the criteria below:
// 1. The waitForPodsReady feature is enabled, which means that the workload has a "PodsReady" condition.
// 2. The workload has already exceeded the PodsReadyTimeout.
// 3. The workload has already been re-queued previously, which means the requeueAt field is unset.
// 4. The re-queue count has already reached waitForPodsReady.requeuingBackoffLimitCount.
if apimeta.IsStatusConditionFalse(wl.Status.Conditions, kueue.WorkloadPodsReady) &&
((!workload.HasRequeueState(&wl) && ptr.Equal(r.waitForPodsReady.requeuingBackoffLimitCount, ptr.To[int32](0))) ||
(workload.HasRequeueState(&wl) && wl.Status.RequeueState.RequeueAt == nil &&
ptr.Equal(wl.Status.RequeueState.Count, r.waitForPodsReady.requeuingBackoffLimitCount))) {
message = fmt.Sprintf("%s by exceeded the maximum number of re-queuing retries", message)
}
workload.SetEvictedCondition(&wl, kueue.WorkloadEvictedByDeactivation, message)
updated = true
evicted = true
}
if wl.Status.Admission != nil {
metrics.ReportEvictedWorkloads(string(wl.Status.Admission.ClusterQueue), kueue.WorkloadEvictedByDeactivation)
if wl.Status.RequeueState != nil {
wl.Status.RequeueState = nil
updated = true
}
if updated {
if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err)
}
if evicted && wl.Status.Admission != nil {
metrics.ReportEvictedWorkloads(string(wl.Status.Admission.ClusterQueue), kueue.WorkloadEvictedByDeactivation)
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}

cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
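The new branch leans on workload.HasRequeueState; a presumed shape for that helper (assumed, not shown in this diff) is a simple nil check on the status field:

// Assumed helper from pkg/workload, inferred from its usage in Reconcile above.
func HasRequeueState(wl *kueue.Workload) bool {
	return wl.Status.RequeueState != nil
}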
106 changes: 85 additions & 21 deletions pkg/controller/core/workload_controller_test.go
@@ -307,7 +307,7 @@ var (
workloadCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(
kueue.Workload{}, "TypeMeta", "ObjectMeta.ResourceVersion", "Status.RequeueState.RequeueAt",
kueue.Workload{}, "TypeMeta", "ObjectMeta.ResourceVersion",
),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"),
@@ -646,8 +646,6 @@ func TestReconcile(t *testing.T) {
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated",
}).
// The fake test not allow to save state with nil values when updating by Patch/Apply. So we are skipping this case.
// RequeueState(ptr.To[int32](4), ptr.To(metav1.NewTime(testStartTime.Truncate(time.Second)))).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
@@ -700,7 +698,7 @@ func TestReconcile(t *testing.T) {
Reason: kueue.WorkloadBackoffFinished,
Message: "The workload backoff was finished",
}).
RequeueState(ptr.To[int32](1), nil).
RequeueState(ptr.To[int32](1), ptr.To(metav1.NewTime(testStartTime.Truncate(time.Second)))).
Obj(),
},
"shouldn't set the WorkloadRequeued condition when backoff expires and workload finished": {
@@ -791,7 +789,7 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
"should set the Evicted condition with InactiveWorkload reason when the .spec.active=False and Admitted when the Workload has Evicted=False condition": {
"should set the Evicted condition with InactiveWorkload reason when the .spec.active is False, Admitted, and the Workload has Evicted=False condition": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
@@ -815,7 +813,88 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
"should keep the previous eviction reason when the Workload is already evicted by other reason even thou the Workload is deactivated.": {
"[backoffLimitCount: 0] should set the Evicted condition with InactiveWorkload reason, exceeded the maximum number of requeue retries" +
"when the .spec.active is False, Admitted, the Workload has Evicted=False and PodsReady=False condition": {
reconcilerOpts: []Option{
WithWaitForPodsReady(&waitForPodsReadyConfig{
timeout: 3 * time.Second,
requeuingBackoffLimitCount: ptr.To[int32](0),
requeuingBackoffBaseSeconds: 10,
requeuingBackoffJitter: 0,
}),
},
workload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: "PodsReady",
Message: "Not all pods are ready or succeeded",
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: "PodsReady",
Message: "Not all pods are ready or succeeded",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}).
Obj(),
},
"[backoffLimitCount: 100] should set the Evicted condition with InactiveWorkload reason, exceeded the maximum number of requeue retries" +
"when the .spec.active is False, Admitted, the Workload has Evicted=False and PodsReady=False condition, and the requeueState.count equals to backoffLimitCount": {
reconcilerOpts: []Option{
WithWaitForPodsReady(&waitForPodsReadyConfig{
timeout: 3 * time.Second,
requeuingBackoffLimitCount: ptr.To[int32](100),
requeuingBackoffBaseSeconds: 10,
requeuingBackoffJitter: 0,
}),
},
workload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: "PodsReady",
Message: "Not all pods are ready or succeeded",
}).
RequeueState(ptr.To[int32](100), nil).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: "PodsReady",
Message: "Not all pods are ready or succeeded",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}).
// The requeueState should be reset in the real cluster, but the fake client doesn't allow us to do it.
RequeueState(ptr.To[int32](100), nil).
Obj(),
},
"should keep the previous eviction reason when the Workload is already evicted by other reason even though the Workload is deactivated.": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(false).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
@@ -882,24 +961,9 @@ func TestReconcile(t *testing.T) {
}
gotWorkload = nil
}

if diff := cmp.Diff(tc.wantWorkload, gotWorkload, workloadCmpOpts...); diff != "" {
t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff)
}

if tc.wantWorkload != nil {
if requeueState := tc.wantWorkload.Status.RequeueState; requeueState != nil && requeueState.RequeueAt != nil {
gotRequeueState := gotWorkload.Status.RequeueState
if gotRequeueState != nil && gotRequeueState.RequeueAt != nil {
if !gotRequeueState.RequeueAt.Equal(requeueState.RequeueAt) {
t.Errorf("Unexpected requeueState.requeueAt; gotRequeueAt %v needs to be after requeueAt %v", requeueState.RequeueAt, gotRequeueState.RequeueAt)
}
} else {
t.Errorf("Unexpected nil requeueState.requeuAt; requeueState.requeueAt shouldn't be nil")
}
}
}

if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
53 changes: 51 additions & 2 deletions test/integration/controller/jobs/job/job_controller_test.go
@@ -1892,6 +1892,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
var _ = ginkgo.Describe("Job controller interacting with Workload controller when waitForPodsReady with requeuing strategy is enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
backoffBaseSeconds int32
backLimitCount *int32
ns *corev1.Namespace
fl *kueue.ResourceFlavor
cq *kueue.ClusterQueue
@@ -1910,6 +1911,7 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
RequeuingStrategy: &configapi.RequeuingStrategy{
Timestamp: ptr.To(configapi.EvictionTimestamp),
BackoffBaseSeconds: ptr.To[int32](backoffBaseSeconds),
BackoffLimitCount: backLimitCount,
},
}
ctx, k8sClient = fwk.RunManager(cfg, managerAndControllersSetup(
@@ -2000,8 +2002,8 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
ginkgo.When("short backoffBaseSeconds", func() {
ginkgo.BeforeEach(func() {
backoffBaseSeconds = 1
backLimitCount = ptr.To[int32](1)
})

ginkgo.It("should re-queue a workload evicted due to PodsReady timeout after the backoff elapses", func() {
ginkgo.By("creating job")
job := testingjob.MakeJob("job", ns.Name).Queue(lq.Name).Request(corev1.ResourceCPU, "2").Obj()
Expand All @@ -2011,9 +2013,10 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
wlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: job.Namespace}

ginkgo.By("admit the workload, it gets evicted due to PodsReadyTimeout and re-queued")
var admission *kueue.Admission
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
admission := testing.MakeAdmission(cq.Name).
admission = testing.MakeAdmission(cq.Name).
Assignment(corev1.ResourceCPU, "on-demand", "1m").
AssignmentPodCount(wl.Spec.PodSets[0].Count).
Obj()
@@ -2060,6 +2063,52 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe
}, util.IgnoreConditionTimestampsAndObservedGeneration),
))
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("re-admit the workload to exceed the backoffLimitCount", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
g.Expect(util.SetQuotaReservation(ctx, k8sClient, wl, admission)).Should(gomega.Succeed())
util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, wl)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("checking the workload is evicted by deactivated due to PodsReadyTimeout")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
g.Expect(wl.Status.RequeueState).Should(gomega.BeNil())
g.Expect(wl.Status.Conditions).To(gomega.ContainElements(
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Status: metav1.ConditionFalse,
Reason: kueue.WorkloadPodsReady,
Message: "Not all pods are ready or succeeded",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionFalse,
Reason: "NoReservation",
Message: "The workload has no reservation",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadRequeued,
Status: metav1.ConditionFalse,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})
})
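The re-queue budget exercised by this test comes from the manager configuration assembled in the BeforeEach above; a minimal sketch of that wiring (values chosen for illustration only):

// Hypothetical values mirroring the test setup: one re-queue attempt,
// then the next PodsReady timeout deactivates the workload.
requeuingStrategy := &configapi.RequeuingStrategy{
	Timestamp:          ptr.To(configapi.EvictionTimestamp),
	BackoffBaseSeconds: ptr.To[int32](1),
	BackoffLimitCount:  ptr.To[int32](1),
}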
10 changes: 4 additions & 6 deletions test/integration/controller/jobs/pod/pod_controller_test.go
@@ -1795,9 +1795,7 @@ var _ = ginkgo.Describe("Pod controller interacting with Workload controller whe
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
g.Expect(ptr.Deref(wl.Spec.Active, true)).Should(gomega.BeFalse())
g.Expect(wl.Status.RequeueState).ShouldNot(gomega.BeNil())
g.Expect(wl.Status.RequeueState.Count).Should(gomega.Equal(ptr.To[int32](1)))
g.Expect(wl.Status.RequeueState.RequeueAt).Should(gomega.BeNil())
g.Expect(wl.Status.RequeueState).Should(gomega.BeNil())
g.Expect(wl.Status.Conditions).To(gomega.ContainElements(
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadPodsReady,
Expand All @@ -1809,13 +1807,13 @@ var _ = ginkgo.Describe("Pod controller interacting with Workload controller whe
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "The workload is deactivated",
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated",
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
gomega.BeComparableTo(metav1.Condition{
Type: kueue.WorkloadAdmitted,
Expand All @@ -1827,7 +1825,7 @@ var _ = ginkgo.Describe("Pod controller interacting with Workload controller whe
Type: podcontroller.WorkloadWaitingForReplacementPods,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "The workload is deactivated",
Message: "The workload is deactivated by exceeded the maximum number of re-queuing retries",
}, util.IgnoreConditionTimestampsAndObservedGeneration),
))
}, util.Timeout, util.Interval).Should(gomega.Succeed())