From df16ac1531f9ff1508ea26c045641f8d35a6220f Mon Sep 17 00:00:00 2001 From: nappelson Date: Fri, 22 Dec 2023 04:50:53 -0500 Subject: [PATCH] Fix issue where paused annotation being set to false still leads to scaled objects/jobs being paused (#5257) Signed-off-by: Nate Appelson --- CHANGELOG.md | 1 + apis/keda/v1alpha1/scaledobject_types.go | 18 +- controllers/keda/scaledjob_controller.go | 12 +- controllers/keda/scaledjob_controller_test.go | 181 ++++++++++++++++++ controllers/keda/scaledobject_controller.go | 10 +- .../keda/scaledobject_controller_test.go | 118 ++++++++++++ controllers/keda/suite_test.go | 7 + .../pause_scaledjob/pause_scaledjob_test.go | 16 ++ .../pause_scaledobject_explicitly_test.go | 49 +++-- 9 files changed, 383 insertions(+), 29 deletions(-) create mode 100644 controllers/keda/scaledjob_controller_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a8b98d4d97..b3d6ffadd00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features: - **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962)) - **General**: Add validations for replica counts when creating ScaledObjects ([#5288](https://github.com/kedacore/keda/issues/5288)) - **General**: Bubble up AuthRef TriggerAuthentication errors as ScaledObject events ([#5190](https://github.com/kedacore/keda/issues/5190)) +- **General**: Fix issue where paused annotation being set to false still leads to scaled objects/jobs being paused ([#5215](https://github.com/kedacore/keda/issues/5215)) - **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830)) - **General**: Use client-side round-robin load balancing for grpc calls ([#5224](https://github.com/kedacore/keda/issues/5224)) - **GCP pubsub scaler**: Support distribution-valued metrics and metrics from topics ([#5070](https://github.com/kedacore/keda/issues/5070)) diff --git a/apis/keda/v1alpha1/scaledobject_types.go b/apis/keda/v1alpha1/scaledobject_types.go index a7357f5f804..0e8ddf614fe 100644 --- a/apis/keda/v1alpha1/scaledobject_types.go +++ b/apis/keda/v1alpha1/scaledobject_types.go @@ -19,6 +19,7 @@ package v1alpha1 import ( "fmt" "reflect" + "strconv" autoscalingv2 "k8s.io/api/autoscaling/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -193,6 +194,11 @@ func (so *ScaledObject) GenerateIdentifier() string { return GenerateIdentifier("ScaledObject", so.Namespace, so.Name) } +func (so *ScaledObject) HasPausedReplicaAnnotation() bool { + _, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation] + return pausedReplicasAnnotationFound +} + // HasPausedAnnotition returns whether this ScaledObject has PausedAnnotation or PausedReplicasAnnotation func (so *ScaledObject) HasPausedAnnotation() bool { _, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation] @@ -207,8 +213,16 @@ func (so *ScaledObject) NeedToBePausedByAnnotation() bool { return so.Status.PausedReplicaCount != nil } - _, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation] - return pausedAnnotationFound + pausedAnnotationValue, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation] + if !pausedAnnotationFound { + return false + } + shouldPause, err := strconv.ParseBool(pausedAnnotationValue) + if err != nil { + // if annotation value is not a boolean, we assume user wants to pause the ScaledObject + return true + } + return shouldPause } // IsUsingModifiers determines whether scalingModifiers are defined or not diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 52e0b6e2eb8..1adde787874 100755 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -19,6 +19,7 @@ package keda import ( "context" "fmt" + "strconv" "sync" "time" @@ -216,9 +217,17 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log // checkIfPaused checks the presence of "autoscaling.keda.sh/paused" annotation on the scaledJob and stop the scale loop. func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) { - _, pausedAnnotation := scaledJob.GetAnnotations()[kedav1alpha1.PausedAnnotation] + pausedAnnotationValue, pausedAnnotation := scaledJob.GetAnnotations()[kedav1alpha1.PausedAnnotation] pausedStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue + shouldPause := false if pausedAnnotation { + var err error + shouldPause, err = strconv.ParseBool(pausedAnnotationValue) + if err != nil { + shouldPause = true + } + } + if shouldPause { if !pausedStatus { logger.Info("ScaledJob is paused, stopping scaling loop.") msg := kedav1alpha1.ScaledJobConditionPausedMessage @@ -286,7 +295,6 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context // requestScaleLoop request ScaleLoop handler for the respective ScaledJob func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error { logger.V(1).Info("Starting a new ScaleLoop") - key, err := cache.MetaNamespaceKeyFunc(scaledJob) if err != nil { logger.Error(err, "Error getting key for scaledJob") diff --git a/controllers/keda/scaledjob_controller_test.go b/controllers/keda/scaledjob_controller_test.go new file mode 100644 index 00000000000..eb60bbd3b2c --- /dev/null +++ b/controllers/keda/scaledjob_controller_test.go @@ -0,0 +1,181 @@ +package keda + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" +) + +var _ = Describe("ScaledJobController", func() { + + var ( + testLogger = zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter)) + ) + + Describe("functional tests", func() { + It("scaledjob paused condition status changes to true on annotation", func() { + jobName := "toggled-to-paused-annotation-name" + sjName := "sj-" + jobName + + sj := &kedav1alpha1.ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: sjName, + Namespace: "default", + }, + Spec: kedav1alpha1.ScaledJobSpec{ + JobTargetRef: generateJobSpec(jobName), + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "UTC", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + }, + } + pollingInterval := int32(5) + sj.Spec.PollingInterval = &pollingInterval + err := k8sClient.Create(context.Background(), sj) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() metav1.ConditionStatus { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + if err != nil { + return metav1.ConditionTrue + } + return sj.Status.Conditions.GetPausedCondition().Status + }, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown))) + + // set annotation + Eventually(func() error { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + Expect(err).ToNot(HaveOccurred()) + annotations := make(map[string]string) + annotations[kedav1alpha1.PausedAnnotation] = "true" + sj.SetAnnotations(annotations) + pollingInterval := int32(6) + sj.Spec.PollingInterval = &pollingInterval + return k8sClient.Update(context.Background(), sj) + }).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).ShouldNot(HaveOccurred()) + testLogger.Info("annotation is set") + + // validate annotation is set correctly + Eventually(func() bool { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + Expect(err).ToNot(HaveOccurred()) + _, hasAnnotation := sj.GetAnnotations()[kedav1alpha1.PausedAnnotation] + return hasAnnotation + }).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue()) + + Eventually(func() metav1.ConditionStatus { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + if err != nil { + return metav1.ConditionUnknown + } + return sj.Status.Conditions.GetPausedCondition().Status + }).WithTimeout(2 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionTrue)) + }) + It("scaledjob paused status stays false when annotation is set to false", func() { + jobName := "turn-off-paused-annotation-name" + sjName := "sj-" + jobName + // create object already paused + sj := &kedav1alpha1.ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: sjName, + Namespace: "default", + }, + Spec: kedav1alpha1.ScaledJobSpec{ + JobTargetRef: generateJobSpec(jobName), + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "UTC", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + }, + } + pollingInterval := int32(5) + sj.Spec.PollingInterval = &pollingInterval + err := k8sClient.Create(context.Background(), sj) + Expect(err).ToNot(HaveOccurred()) + falseAnnotationValue := "false" + // set annotation + Eventually(func() error { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + Expect(err).ToNot(HaveOccurred()) + annotations := make(map[string]string) + annotations[kedav1alpha1.PausedAnnotation] = falseAnnotationValue + sj.SetAnnotations(annotations) + pollingInterval := int32(6) + sj.Spec.PollingInterval = &pollingInterval + return k8sClient.Update(context.Background(), sj) + }).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).ShouldNot(HaveOccurred()) + testLogger.Info("annotation is set") + + // validate annotation is set correctly + Eventually(func() bool { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + Expect(err).ToNot(HaveOccurred()) + value, hasPausedAnnotation := sj.GetAnnotations()[kedav1alpha1.PausedAnnotation] + if !hasPausedAnnotation { + return false + } + return value == falseAnnotationValue + }).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue()) + + // TODO(nappelson) - update assertion to be ConditionFalse + // https://github.com/kedacore/keda/issues/5251 prevents Condition from updating appropriately + Eventually(func() metav1.ConditionStatus { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + if err != nil { + return metav1.ConditionUnknown + } + return sj.Status.Conditions.GetPausedCondition().Status + }).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionUnknown)) + }) + + }) +}) + +func generateJobSpec(name string) *batchv1.JobSpec { + return &batchv1.JobSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": name, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": name, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: name, + Image: name, + }, + }, + }, + }, + } +} diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index 3b6ff37ff05..55d83a03b8f 100755 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -216,10 +216,10 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request // reconcileScaledObject implements reconciler logic for ScaledObject func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions) (string, error) { - // Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledObject (since the presence of this annotation will pause + // Check the presence of "autoscaling.keda.sh/paused" annotation on the scaledObject (since the presence of this annotation will pause // autoscaling no matter what number of replicas is provided), and if so, stop the scale loop and delete the HPA on the scaled object. - pausedAnnotationFound := scaledObject.HasPausedAnnotation() - if pausedAnnotationFound { + needsToPause := scaledObject.NeedToBePausedByAnnotation() + if needsToPause { scaledToPausedCount := true if conditions.GetPausedCondition().Status == metav1.ConditionTrue { // If scaledobject is in paused condition but replica count is not equal to paused replica count, the following scaling logic needs to be trigger again. @@ -228,7 +228,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil } } - if scaledObject.NeedToBePausedByAnnotation() && scaledToPausedCount { + if scaledToPausedCount { msg := kedav1alpha1.ScaledObjectConditionPausedMessage if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil { msg = "failed to stop the scale loop for paused ScaledObject" @@ -298,7 +298,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg } logger.Info("Initializing Scaling logic according to ScaledObject Specification") } - if pausedAnnotationFound && conditions.GetPausedCondition().Status != metav1.ConditionTrue { + if scaledObject.HasPausedReplicaAnnotation() && conditions.GetPausedCondition().Status != metav1.ConditionTrue { return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled") } return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go index c346b59f724..fd6e3cfe98e 100644 --- a/controllers/keda/scaledobject_controller_test.go +++ b/controllers/keda/scaledobject_controller_test.go @@ -934,6 +934,124 @@ var _ = Describe("ScaledObjectController", func() { return so.Status.Conditions.GetPausedCondition().Status }).WithTimeout(2 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionTrue)) }) + It("scaledObject paused status switches to false when annotation is set to false", func() { + // Create the scaling target. + deploymentName := "toggled-to-paused-annotation-false-name" + soName := "so-" + deploymentName + err := k8sClient.Create(context.Background(), generateDeployment(deploymentName)) + Expect(err).ToNot(HaveOccurred()) + + // Create the ScaledObject without specifying name. + so := &kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Name: soName, + Namespace: "default", + }, + Spec: kedav1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &kedav1alpha1.ScaleTarget{ + Name: deploymentName, + }, + Advanced: &kedav1alpha1.AdvancedConfig{ + HorizontalPodAutoscalerConfig: &kedav1alpha1.HorizontalPodAutoscalerConfig{}, + }, + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "UTC", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + }, + } + pollingInterval := int32(5) + so.Spec.PollingInterval = &pollingInterval + err = k8sClient.Create(context.Background(), so) + Expect(err).ToNot(HaveOccurred()) + + // And validate that hpa is created. + hpa := &autoscalingv2.HorizontalPodAutoscaler{} + Eventually(func() error { + return k8sClient.Get(context.Background(), types.NamespacedName{Name: fmt.Sprintf("keda-hpa-%s", soName), Namespace: "default"}, hpa) + }).ShouldNot(HaveOccurred()) + + // wait so's ready condition Ready + Eventually(func() metav1.ConditionStatus { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + if err != nil { + return metav1.ConditionUnknown + } + return so.Status.Conditions.GetReadyCondition().Status + }).Should(Equal(metav1.ConditionTrue)) + + Eventually(func() metav1.ConditionStatus { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + if err != nil { + return metav1.ConditionTrue + } + return so.Status.Conditions.GetPausedCondition().Status + }, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown))) + + // set annotation to true at first + Eventually(func() error { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + Expect(err).ToNot(HaveOccurred()) + annotations := make(map[string]string) + annotations[kedav1alpha1.PausedAnnotation] = "true" + so.SetAnnotations(annotations) + pollingInterval := int32(6) + so.Spec.PollingInterval = &pollingInterval + return k8sClient.Update(context.Background(), so) + }).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).ShouldNot(HaveOccurred()) + testLogger.Info("annotation is set") + + // validate annotation is set correctly + Eventually(func() bool { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + Expect(err).ToNot(HaveOccurred()) + return so.HasPausedAnnotation() + }).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue()) + + Eventually(func() metav1.ConditionStatus { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + if err != nil { + return metav1.ConditionUnknown + } + return so.Status.Conditions.GetPausedCondition().Status + }).WithTimeout(2 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionTrue)) + + // set annotation to false and confirm that ScaledObject is no longer paused + Eventually(func() error { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + Expect(err).ToNot(HaveOccurred()) + annotations := make(map[string]string) + annotations[kedav1alpha1.PausedAnnotation] = "false" + so.SetAnnotations(annotations) + pollingInterval := int32(6) + so.Spec.PollingInterval = &pollingInterval + return k8sClient.Update(context.Background(), so) + }).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).ShouldNot(HaveOccurred()) + testLogger.Info("annotation is set") + + // validate annotation is set correctly + Eventually(func() bool { + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + Expect(err).ToNot(HaveOccurred()) + return so.HasPausedAnnotation() + }).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue()) + + // ensure object is no longer paused + Eventually(func() metav1.ConditionStatus { + err := k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so) + if err != nil { + return metav1.ConditionUnknown + } + return so.Status.Conditions.GetPausedCondition().Status + }).WithTimeout(2 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionFalse)) + }) // Fix issue 4253 It("deletes hpa when scaledobject has pause annotation", func() { diff --git a/controllers/keda/suite_test.go b/controllers/keda/suite_test.go index 799cd46cacc..55dba8db166 100644 --- a/controllers/keda/suite_test.go +++ b/controllers/keda/suite_test.go @@ -101,6 +101,13 @@ var _ = BeforeSuite(func() { }).SetupWithManager(k8sManager, controller.Options{}) Expect(err).ToNot(HaveOccurred()) + err = (&ScaledJobReconciler{ + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + Recorder: k8sManager.GetEventRecorderFor("keda-operator"), + }).SetupWithManager(k8sManager, controller.Options{}) + Expect(err).ToNot(HaveOccurred()) + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) Expect(err).ToNot(HaveOccurred()) Expect(k8sClient).ToNot(BeNil()) diff --git a/tests/internals/pause_scaledjob/pause_scaledjob_test.go b/tests/internals/pause_scaledjob/pause_scaledjob_test.go index 1cdacacc2ea..4a3f77ba3bc 100644 --- a/tests/internals/pause_scaledjob/pause_scaledjob_test.go +++ b/tests/internals/pause_scaledjob/pause_scaledjob_test.go @@ -165,6 +165,9 @@ func TestScaler(t *testing.T) { testPause(t, kc, listOptions) testUnpause(t, kc, data, listOptions) + testPause(t, kc, listOptions) + testUnpauseWithBool(t, kc, data, listOptions) + // cleanup DeleteKubernetesResources(t, testNamespace, data, templates) } @@ -211,3 +214,16 @@ func testUnpause(t *testing.T, kc *kubernetes.Clientset, data templateData, list assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1, listOptions), "job count should be %d after %d iterations", expectedTarget, iterationCountLatter) } + +func testUnpauseWithBool(t *testing.T, kc *kubernetes.Clientset, data templateData, listOptions metav1.ListOptions) { + t.Log("--- test setting Paused annotation to false ---") + + _, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused=false --namespace %s --overwrite=true", scaledJobName, testNamespace)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) + + t.Log("job count increases from zero as job is no longer paused") + + expectedTarget := data.MetricThreshold + assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1, listOptions), + "job count should be %d after %d iterations", expectedTarget, iterationCountLatter) +} diff --git a/tests/internals/pause_scaledobject_explicitly/pause_scaledobject_explicitly_test.go b/tests/internals/pause_scaledobject_explicitly/pause_scaledobject_explicitly_test.go index a1e7523c5f9..cb5462c718f 100644 --- a/tests/internals/pause_scaledobject_explicitly/pause_scaledobject_explicitly_test.go +++ b/tests/internals/pause_scaledobject_explicitly/pause_scaledobject_explicitly_test.go @@ -113,21 +113,24 @@ func TestScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - CreateKubernetesResources(t, kc, testNamespace, data, templates) - - // scaling to paused replica count - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), - "replica count should be 0 after 1 minute") - - // test scaling - testPauseWhenScaleOut(t, kc) - testScaleOut(t, kc) - testPauseWhenScaleIn(t, kc) - testScaleIn(t, kc) - testBothPauseAnnotationActive(t, kc) - - // cleanup - DeleteKubernetesResources(t, testNamespace, data, templates) + unpausedMethods := [](func(assert.TestingT)){removeScaledObjectPausedAnnotation, setScaledObjectPausedAnnotationFalse} + + for _, unpauseMethod := range unpausedMethods { + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + // scaling to paused replica count + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") + // test scaling + testPauseWhenScaleOut(t, kc) + testScaleOut(t, kc, unpauseMethod) + testPauseWhenScaleIn(t, kc) + testScaleIn(t, kc, unpauseMethod) + testBothPauseAnnotationActive(t, kc) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) + } } func getTemplateData() (templateData, []Template) { @@ -144,7 +147,7 @@ func getTemplateData() (templateData, []Template) { } func upsertScaledObjectPausedAnnotation(t assert.TestingT) { - _, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledobject/%s -n %s autoscaling.keda.sh/paused='true' --overwrite", scaledObjectName, testNamespace)) + _, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledobject/%s -n %s autoscaling.keda.sh/paused=true --overwrite", scaledObjectName, testNamespace)) assert.NoErrorf(t, err, "cannot execute command - %s", err) } @@ -152,6 +155,12 @@ func removeScaledObjectPausedAnnotation(t assert.TestingT) { _, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledobject/%s -n %s autoscaling.keda.sh/paused- --overwrite", scaledObjectName, testNamespace)) assert.NoErrorf(t, err, "cannot execute command - %s", err) } + +func setScaledObjectPausedAnnotationFalse(t assert.TestingT) { + _, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledobject/%s -n %s autoscaling.keda.sh/paused=false --overwrite", scaledObjectName, testNamespace)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + func upsertScaledObjectPausedReplicasAnnotation(t assert.TestingT, value int) { _, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledobject/%s -n %s autoscaling.keda.sh/paused-replicas=%d --overwrite", scaledObjectName, testNamespace, value)) assert.NoErrorf(t, err, "cannot execute command - %s", err) @@ -178,10 +187,10 @@ func testPauseWhenScaleOut(t *testing.T, kc *kubernetes.Clientset) { AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) } -func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, unpauseMethod func(assert.TestingT)) { t.Log("--- testing scale out ---") - removeScaledObjectPausedAnnotation(t) + unpauseMethod(t) KubernetesScaleDeployment(t, kc, monitoredDeploymentName, 5, testNamespace) assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, monitoredDeploymentName, testNamespace, 5, 60, testScaleOutWaitMin), @@ -209,10 +218,10 @@ func testPauseWhenScaleIn(t *testing.T, kc *kubernetes.Clientset) { "replica count should be 5 after %d minute(s)", testPauseAtNWaitMin) } -func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { +func testScaleIn(t *testing.T, kc *kubernetes.Clientset, unpauseMethod func(assert.TestingT)) { t.Log("--- testing scale in ---") - removeScaledObjectPausedAnnotation(t) + unpauseMethod(t) assert.Truef(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, testScaleInWaitMin), "replica count should be 0 after %d minutes", testScaleInWaitMin) }