diff --git a/CHANGELOG.md b/CHANGELOG.md index 963665fd4b7..4712c1720cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New +- **General**: Add Fallback option `behavior` for dynamic fallback calculation ([#6450](https://github.com/kedacore/keda/issues/6450)) - **General**: Enable OpenSSF Scorecard to enhance security practices across the project ([#5913](https://github.com/kedacore/keda/issues/5913)) - **General**: Introduce new NSQ scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) - **General**: Operator flag to control patching of webhook resources certificates ([#6184](https://github.com/kedacore/keda/issues/6184)) diff --git a/apis/keda/v1alpha1/scaledobject_types.go b/apis/keda/v1alpha1/scaledobject_types.go index 2ffbcc4932e..4d45fd2789a 100644 --- a/apis/keda/v1alpha1/scaledobject_types.go +++ b/apis/keda/v1alpha1/scaledobject_types.go @@ -56,6 +56,9 @@ const ScaledObjectTransferHpaOwnershipAnnotation = "scaledobject.keda.sh/transfe const ValidationsHpaOwnershipAnnotation = "validations.keda.sh/hpa-ownership" const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas" const PausedAnnotation = "autoscaling.keda.sh/paused" +const FallbackBehaviorStatic = "Static" +const FallbackBehaviorCurrentReplicasIfHigher = "CurrentReplicasIfHigher" +const FallbackBehaviorCurrentReplicasIfLower = "CurrentReplicasIfLower" // HealthStatus is the status for a ScaledObject's health type HealthStatus struct { @@ -109,6 +112,10 @@ type ScaledObjectSpec struct { type Fallback struct { FailureThreshold int32 `json:"failureThreshold"` Replicas int32 `json:"replicas"` + // +optional + // +kubebuilder:default=Static + // +kubebuilder:validation:Enum=Static;CurrentReplicasIfHigher;CurrentReplicasIfLower + Behavior string `json:"behavior,omitempty"` } // AdvancedConfig specifies advance scaling options diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml index 5d8c9ae234d..c25e2361763 100644 --- a/config/crd/bases/keda.sh_scaledobjects.yaml +++ b/config/crd/bases/keda.sh_scaledobjects.yaml @@ -225,6 +225,13 @@ spec: fallback: description: Fallback is the spec for fallback options properties: + behavior: + default: Static + enum: + - Static + - CurrentReplicasIfHigher + - CurrentReplicasIfLower + type: string failureThreshold: format: int32 type: integer diff --git a/pkg/fallback/fallback.go b/pkg/fallback/fallback.go index 74469f45c2d..05f31299717 100644 --- a/pkg/fallback/fallback.go +++ b/pkg/fallback/fallback.go @@ -24,11 +24,13 @@ import ( v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/scale" "k8s.io/metrics/pkg/apis/external_metrics" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/scaling/resolver" ) var log = logf.Log.WithName("fallback") @@ -46,7 +48,7 @@ func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.Me return true } -func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, metrics []external_metrics.ExternalMetricValue, suppressedError error, metricName string, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec) ([]external_metrics.ExternalMetricValue, bool, error) { +func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, scaleClient scale.ScalesGetter, metrics []external_metrics.ExternalMetricValue, suppressedError error, metricName string, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec) ([]external_metrics.ExternalMetricValue, bool, error) { status := scaledObject.Status.DeepCopy() initHealthStatus(status) @@ -76,7 +78,11 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, me log.Info("Failed to validate ScaledObject Spec. Please check that parameters are positive integers", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) return nil, false, suppressedError case *healthStatus.NumberOfFailures > scaledObject.Spec.Fallback.FailureThreshold: - return doFallback(scaledObject, metricSpec, metricName, suppressedError), true, nil + currentReplicas, err := resolver.GetCurrentReplicas(ctx, client, scaleClient, scaledObject) + if err != nil { + return nil, false, suppressedError + } + return doFallback(scaledObject, metricSpec, metricName, currentReplicas, suppressedError), true, nil default: return nil, false, suppressedError } @@ -103,8 +109,32 @@ func HasValidFallback(scaledObject *kedav1alpha1.ScaledObject) bool { modifierChecking } -func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec, metricName string, suppressedError error) []external_metrics.ExternalMetricValue { - replicas := int64(scaledObject.Spec.Fallback.Replicas) +func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec, metricName string, currentReplicas int32, suppressedError error) []external_metrics.ExternalMetricValue { + fallbackBehavior := scaledObject.Spec.Fallback.Behavior + fallbackReplicas := int64(scaledObject.Spec.Fallback.Replicas) + var replicas int64 + + switch fallbackBehavior { + case kedav1alpha1.FallbackBehaviorStatic: + replicas = fallbackReplicas + case kedav1alpha1.FallbackBehaviorCurrentReplicasIfHigher: + currentReplicasCount := int64(currentReplicas) + if currentReplicasCount > fallbackReplicas { + replicas = currentReplicasCount + } else { + replicas = fallbackReplicas + } + case kedav1alpha1.FallbackBehaviorCurrentReplicasIfLower: + currentReplicasCount := int64(currentReplicas) + if currentReplicasCount < fallbackReplicas { + replicas = currentReplicasCount + } else { + replicas = fallbackReplicas + } + default: + replicas = fallbackReplicas + } + var normalisationValue int64 if !scaledObject.IsUsingModifiers() { normalisationValue = int64(metricSpec.External.Target.AverageValue.AsApproximateFloat64()) @@ -121,7 +151,13 @@ func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpe } fallbackMetrics := []external_metrics.ExternalMetricValue{metric} - log.Info("Suppressing error, falling back to fallback.replicas", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "suppressedError", suppressedError, "fallback.replicas", replicas) + log.Info("Suppressing error, using fallback metrics", + "scaledObject.Namespace", scaledObject.Namespace, + "scaledObject.Name", scaledObject.Name, + "suppressedError", suppressedError, + "fallback.behavior", fallbackBehavior, + "fallback.replicas", fallbackReplicas, + "workload.currentReplicas", currentReplicas) return fallbackMetrics } diff --git a/pkg/fallback/fallback_test.go b/pkg/fallback/fallback_test.go index e111c75dca3..6c923465d1e 100644 --- a/pkg/fallback/fallback_test.go +++ b/pkg/fallback/fallback_test.go @@ -26,13 +26,17 @@ import ( . "github.com/onsi/gomega" "github.com/onsi/gomega/types" "go.uber.org/mock/gomock" + appsv1 "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/metrics/pkg/apis/external_metrics" + "k8s.io/utils/ptr" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/mock/mock_client" + "github.com/kedacore/keda/v2/pkg/mock/mock_scale" mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" ) @@ -40,21 +44,22 @@ const metricName = "some_metric_name" func TestFallback(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Controller Suite") } var _ = Describe("fallback", func() { var ( - client *mock_client.MockClient - scaler *mock_scalers.MockScaler - ctrl *gomock.Controller + client *mock_client.MockClient + scaler *mock_scalers.MockScaler + scaleClient *mock_scale.MockScalesGetter + ctrl *gomock.Controller ) BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) client = mock_client.NewMockClient(ctrl) scaler = mock_scalers.NewMockScaler(ctrl) + scaleClient = mock_scale.NewMockScalesGetter(ctrl) }) AfterEach(func() { @@ -62,14 +67,13 @@ var _ = Describe("fallback", func() { }) It("should return the expected metric when fallback is disabled", func() { - expectedMetricValue := float64(5) primeGetMetrics(scaler, expectedMetricValue) so := buildScaledObject(nil, nil) metricSpec := createMetricSpec(3) metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - metrics, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + metrics, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value := metrics[0].Value.AsApproximateFloat64() @@ -100,7 +104,7 @@ var _ = Describe("fallback", func() { expectStatusPatch(ctrl, client) metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - metrics, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + metrics, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value := metrics[0].Value.AsApproximateFloat64() @@ -129,7 +133,7 @@ var _ = Describe("fallback", func() { expectNoStatusPatch(ctrl) metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - metrics, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + metrics, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value := metrics[0].Value.AsApproximateFloat64() @@ -145,7 +149,7 @@ var _ = Describe("fallback", func() { expectNoStatusPatch(ctrl) metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - _, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + _, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("some error")) @@ -174,7 +178,7 @@ var _ = Describe("fallback", func() { expectStatusPatch(ctrl, client) metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - _, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + _, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("some error")) @@ -203,8 +207,10 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) + mockScaleAndDeployment(ctrl, client, scaleClient, 5) + metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - metrics, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + metrics, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value := metrics[0].Value.AsApproximateFloat64() @@ -259,8 +265,10 @@ var _ = Describe("fallback", func() { statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("some error")) client.EXPECT().Status().Return(statusWriter) + mockScaleAndDeployment(ctrl, client, scaleClient, 5) + metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - metrics, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + metrics, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) Expect(err).ToNot(HaveOccurred()) value := metrics[0].Value.AsApproximateFloat64() @@ -289,7 +297,7 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - _, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + _, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("some error")) @@ -322,8 +330,11 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) expectStatusPatch(ctrl, client) + mockScaleAndDeployment(ctrl, client, scaleClient, 5) + metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - _, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + _, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) + Expect(err).ToNot(HaveOccurred()) condition := so.Status.Conditions.GetFallbackCondition() Expect(condition.IsTrue()).Should(BeTrue()) @@ -356,14 +367,123 @@ var _ = Describe("fallback", func() { metricSpec := createMetricSpec(10) metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) - _, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec) + _, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) + Expect(err).ShouldNot(BeNil()) Expect(err.Error()).Should(Equal("some error")) condition := so.Status.Conditions.GetFallbackCondition() Expect(condition.IsTrue()).Should(BeFalse()) }) + + It("should use fallback replicas when current replicas is lower when behavior is 'CurrentReplicasIfHigher'", func() { + scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("some error")) + startingNumberOfFailures := int32(3) + behavior := "CurrentReplicasIfHigher" + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + Behavior: behavior, + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + expectStatusPatch(ctrl, client) + + mockScaleAndDeployment(ctrl, client, scaleClient, 5) + + metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) + metrics, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value := metrics[0].Value.AsApproximateFloat64() + expectedValue := float64(100) // 10 replicas * 10 target value + Expect(value).Should(Equal(expectedValue)) + }) + + It("should ignore current replicas when behavior is 'Static'", func() { + scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("some error")) + startingNumberOfFailures := int32(3) + behavior := "Static" + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + Behavior: behavior, + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + expectStatusPatch(ctrl, client) + + mockScaleAndDeployment(ctrl, client, scaleClient, 15) + + metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName) + metrics, _, err = GetMetricsWithFallback(context.Background(), client, scaleClient, metrics, err, metricName, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value := metrics[0].Value.AsApproximateFloat64() + expectedValue := float64(100) // 10 replicas * 10 target value, ignoring current 15 + Expect(value).Should(Equal(expectedValue)) + }) }) +// Helper functions +func mockScaleAndDeployment( + ctrl *gomock.Controller, + client *mock_client.MockClient, + scaleClient *mock_scale.MockScalesGetter, + replicas int32, +) { + // Mock Scale interface + mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl) + scale := &autoscalingv1.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myapp", + Namespace: "default", + }, + Spec: autoscalingv1.ScaleSpec{ + Replicas: replicas, + }, + } + + scaleClient.EXPECT().Scales(gomock.Eq("default")).Return(mockScaleInterface).MinTimes(1).MaxTimes(2) + mockScaleInterface.EXPECT().Get( + gomock.Any(), + gomock.Eq("apps"), + gomock.Eq("deployment"), + gomock.Eq("myapp"), + ).Return(scale, nil).MinTimes(1).MaxTimes(2) + + // Mock Deployment lookup + deployment := &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Replicas: ptr.To[int32](replicas), + }, + } + client.EXPECT().Get( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil).SetArg(2, *deployment) +} + func haveFailureAndStatus(numberOfFailures int, status kedav1alpha1.HealthStatusType) types.GomegaMatcher { return &healthStatusMatcher{numberOfFailures: numberOfFailures, status: status} } @@ -431,10 +551,19 @@ func buildScaledObject(fallbackConfig *kedav1alpha1.Fallback, status *kedav1alph }, Fallback: fallbackConfig, }, + Status: kedav1alpha1.ScaledObjectStatus{ + ScaleTargetGVKR: &kedav1alpha1.GroupVersionKindResource{ + Group: "apps", + Kind: "Deployment", + }, + }, } if status != nil { + // Preserve ScaleTargetGVKR when overwriting status + gvkr := scaledObject.Status.ScaleTargetGVKR scaledObject.Status = *status + scaledObject.Status.ScaleTargetGVKR = gvkr } scaledObject.Status.Conditions = *kedav1alpha1.GetInitializedConditions() diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index 8d8f803ddea..cdef0844e68 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -23,14 +23,13 @@ import ( "time" "github.com/go-logr/logr" - appsv1 "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventreason" + "github.com/kedacore/keda/v2/pkg/scaling/resolver" kedastatus "github.com/kedacore/keda/v2/pkg/status" ) @@ -38,37 +37,13 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al logger := e.logger.WithValues("scaledobject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace, "scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name) - // Get the current replica count. As a special case, Deployments and StatefulSets fetch directly from the object so they can use the informer cache - // to reduce API calls. Everything else uses the scale subresource. var currentScale *autoscalingv1.Scale var currentReplicas int32 - targetName := scaledObject.Spec.ScaleTargetRef.Name - targetGVKR := scaledObject.Status.ScaleTargetGVKR - switch { - case targetGVKR.Group == "apps" && targetGVKR.Kind == "Deployment": - deployment := &appsv1.Deployment{} - err := e.client.Get(ctx, client.ObjectKey{Name: targetName, Namespace: scaledObject.Namespace}, deployment) - if err != nil { - logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget") - return - } - currentReplicas = *deployment.Spec.Replicas - case targetGVKR.Group == "apps" && targetGVKR.Kind == "StatefulSet": - statefulSet := &appsv1.StatefulSet{} - err := e.client.Get(ctx, client.ObjectKey{Name: targetName, Namespace: scaledObject.Namespace}, statefulSet) - if err != nil { - logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget") - return - } - currentReplicas = *statefulSet.Spec.Replicas - default: - var err error - currentScale, err = e.getScaleTargetScale(ctx, scaledObject) - if err != nil { - logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget") - return - } - currentReplicas = currentScale.Spec.Replicas + // Get the current replica count + currentReplicas, err := resolver.GetCurrentReplicas(ctx, e.client, e.scaleClient, scaledObject) + if err != nil { + logger.Error(err, "Error getting information on the current Scale") + return } // if the ScaledObject's triggers aren't in the error state, // but ScaledObject.Status.ReadyCondition is set not set to 'true' -> set it back to 'true' @@ -231,11 +206,25 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al } func (e *scaleExecutor) doFallbackScaling(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, currentScale *autoscalingv1.Scale, logger logr.Logger, currentReplicas int32) { - _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, scaledObject.Spec.Fallback.Replicas) + fallbackBehavior := scaledObject.Spec.Fallback.Behavior + fallbackReplicas := scaledObject.Spec.Fallback.Replicas + + switch fallbackBehavior { + case kedav1alpha1.FallbackBehaviorStatic: + // no specifc action needed + case kedav1alpha1.FallbackBehaviorCurrentReplicasIfHigher: + if currentReplicas > fallbackReplicas { + fallbackReplicas = currentReplicas + } + case kedav1alpha1.FallbackBehaviorCurrentReplicasIfLower: + if currentReplicas < fallbackReplicas { + fallbackReplicas = currentReplicas + } + } + + _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, fallbackReplicas) if err == nil { - logger.Info("Successfully set ScaleTarget replicas count to ScaledObject fallback.replicas", - "Original Replicas Count", currentReplicas, - "New Replicas Count", scaledObject.Spec.Fallback.Replicas) + logger.Info("Successfully set ScaleTarget replicas count to calculated fallback replicas", "Original Replicas Count", currentReplicas, "New Replicas Count", fallbackReplicas, "Behavior", fallbackBehavior) } if e := e.setFallbackCondition(ctx, logger, scaledObject, metav1.ConditionTrue, "FallbackExists", "At least one trigger is falling back on this scaled object"); e != nil { logger.Error(e, "Error setting fallback condition") diff --git a/pkg/scaling/executor/scale_scaledobjects_test.go b/pkg/scaling/executor/scale_scaledobjects_test.go index 1a426c9cde7..b3fc4607c1f 100644 --- a/pkg/scaling/executor/scale_scaledobjects_test.go +++ b/pkg/scaling/executor/scale_scaledobjects_test.go @@ -522,3 +522,271 @@ func TestEventWitTriggerInfo(t *testing.T) { eventstring := <-recorder.Events assert.Equal(t, "Normal KEDAScaleTargetActivated Scaled namespace/name from 2 to 5, triggered by testTrigger", eventstring) } + +// Behavior is 'CurrentReplicasIfHigher' and current replicas is higher than fallback replicas +func TestBehaviorCurrentReplicasIfHigherWithCurrentReplicasIsHigher(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + mockScaleClient := mock_scale.NewMockScalesGetter(ctrl) + mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl) + statusWriter := mock_client.NewMockStatusWriter(ctrl) + + scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder) + + behavior := "CurrentReplicasIfHigher" + scaledObject := v1alpha1.ScaledObject{ + ObjectMeta: v1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: v1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &v1alpha1.ScaleTarget{ + Name: "name", + }, + Fallback: &v1alpha1.Fallback{ + FailureThreshold: 3, + Replicas: 5, + Behavior: behavior, + }, + }, + Status: v1alpha1.ScaledObjectStatus{ + ScaleTargetGVKR: &v1alpha1.GroupVersionKindResource{ + Group: "apps", + Kind: "Deployment", + }, + }, + } + + scaledObject.Status.Conditions = *v1alpha1.GetInitializedConditions() + + // Current replicas is higher than fallback replicas + currentReplicas := int32(8) + + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Replicas: ¤tReplicas, + }, + }) + + scale := &autoscalingv1.Scale{ + Spec: autoscalingv1.ScaleSpec{ + Replicas: currentReplicas, + }, + } + + mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2) + mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil) + mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any()) + + client.EXPECT().Status().Times(2).Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2) + + scaleExecutor.RequestScale(context.Background(), &scaledObject, false, true, &ScaleExecutorOptions{}) + + // Should use current replicas as it's higher than fallback replicas + assert.Equal(t, int32(8), scale.Spec.Replicas) + condition := scaledObject.Status.Conditions.GetFallbackCondition() + assert.Equal(t, true, condition.IsTrue()) +} + +// Behavior is 'CurrentReplicasIfLower' and current replicas is higher than fallback replicas +func TestBehaviorCurrentReplicasIfLowerWithCurrentReplicasIsHigher(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + mockScaleClient := mock_scale.NewMockScalesGetter(ctrl) + mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl) + statusWriter := mock_client.NewMockStatusWriter(ctrl) + + scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder) + + behavior := "CurrentReplicasIfLower" + scaledObject := v1alpha1.ScaledObject{ + ObjectMeta: v1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: v1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &v1alpha1.ScaleTarget{ + Name: "name", + }, + Fallback: &v1alpha1.Fallback{ + FailureThreshold: 3, + Replicas: 5, + Behavior: behavior, + }, + }, + Status: v1alpha1.ScaledObjectStatus{ + ScaleTargetGVKR: &v1alpha1.GroupVersionKindResource{ + Group: "apps", + Kind: "Deployment", + }, + }, + } + + scaledObject.Status.Conditions = *v1alpha1.GetInitializedConditions() + + // Current replicas is higher than fallback replicas + currentReplicas := int32(8) + + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Replicas: ¤tReplicas, + }, + }) + + scale := &autoscalingv1.Scale{ + Spec: autoscalingv1.ScaleSpec{ + Replicas: currentReplicas, + }, + } + + mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2) + mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil) + mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any()) + + client.EXPECT().Status().Times(2).Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2) + + scaleExecutor.RequestScale(context.Background(), &scaledObject, false, true, &ScaleExecutorOptions{}) + + // Should use fallback replicas as it's higher than current replicas + assert.Equal(t, int32(5), scale.Spec.Replicas) + condition := scaledObject.Status.Conditions.GetFallbackCondition() + assert.Equal(t, true, condition.IsTrue()) +} + +// Behavior is 'CurrentReplicasIfHigher' and current replicas is lower than fallback replicas +func TestBehaviorCurrentReplicasIfHigherWithCurrentReplicasisLower(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + mockScaleClient := mock_scale.NewMockScalesGetter(ctrl) + mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl) + statusWriter := mock_client.NewMockStatusWriter(ctrl) + + scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder) + + behavior := "CurrentReplicasIfHigher" + scaledObject := v1alpha1.ScaledObject{ + ObjectMeta: v1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: v1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &v1alpha1.ScaleTarget{ + Name: "name", + }, + Fallback: &v1alpha1.Fallback{ + FailureThreshold: 3, + Replicas: 5, + Behavior: behavior, + }, + }, + Status: v1alpha1.ScaledObjectStatus{ + ScaleTargetGVKR: &v1alpha1.GroupVersionKindResource{ + Group: "apps", + Kind: "Deployment", + }, + }, + } + + scaledObject.Status.Conditions = *v1alpha1.GetInitializedConditions() + + // Current replicas is lower than fallback replicas + currentReplicas := int32(2) + + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Replicas: ¤tReplicas, + }, + }) + + scale := &autoscalingv1.Scale{ + Spec: autoscalingv1.ScaleSpec{ + Replicas: currentReplicas, + }, + } + + mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2) + mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil) + mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any()) + + client.EXPECT().Status().Times(2).Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2) + + scaleExecutor.RequestScale(context.Background(), &scaledObject, false, true, &ScaleExecutorOptions{}) + + // Should use fallback replicas as it's higher than current replicas + assert.Equal(t, int32(5), scale.Spec.Replicas) + condition := scaledObject.Status.Conditions.GetFallbackCondition() + assert.Equal(t, true, condition.IsTrue()) +} + +// Behavior is Static and current replicas is higher than fallback replicas +func TestBehaviorStaticWithCurrentReplicasisHigher(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + mockScaleClient := mock_scale.NewMockScalesGetter(ctrl) + mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl) + statusWriter := mock_client.NewMockStatusWriter(ctrl) + + scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder) + + behavior := "Static" + scaledObject := v1alpha1.ScaledObject{ + ObjectMeta: v1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + }, + Spec: v1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &v1alpha1.ScaleTarget{ + Name: "name", + }, + Fallback: &v1alpha1.Fallback{ + FailureThreshold: 3, + Replicas: 5, + Behavior: behavior, + }, + }, + Status: v1alpha1.ScaledObjectStatus{ + ScaleTargetGVKR: &v1alpha1.GroupVersionKindResource{ + Group: "apps", + Kind: "Deployment", + }, + }, + } + + scaledObject.Status.Conditions = *v1alpha1.GetInitializedConditions() + + // Current replicas is higher than fallback replicas + currentReplicas := int32(8) + + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Replicas: ¤tReplicas, + }, + }) + + scale := &autoscalingv1.Scale{ + Spec: autoscalingv1.ScaleSpec{ + Replicas: currentReplicas, + }, + } + + mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2) + mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil) + mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any()) + + client.EXPECT().Status().Times(2).Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2) + + scaleExecutor.RequestScale(context.Background(), &scaledObject, false, true, &ScaleExecutorOptions{}) + + // Should use fallback replicas even though current replicas is higher + assert.Equal(t, int32(5), scale.Spec.Replicas) + condition := scaledObject.Status.Conditions.GetFallbackCondition() + assert.Equal(t, true, condition.IsTrue()) +} diff --git a/pkg/scaling/resolver/scale_resolvers.go b/pkg/scaling/resolver/scale_resolvers.go index 6ca40672fc6..4a57002c4fa 100644 --- a/pkg/scaling/resolver/scale_resolvers.go +++ b/pkg/scaling/resolver/scale_resolvers.go @@ -26,9 +26,11 @@ import ( "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/scale" "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,6 +47,9 @@ const ( boolTrue = true boolFalse = false defaultServiceAccount = "default" + appsGroup = "apps" + deploymentKind = "Deployment" + statefulSetKind = "StatefulSet" ) var ( @@ -615,3 +620,38 @@ func resolveServiceAccountAnnotation(ctx context.Context, client client.Client, } return value, nil } + +// GetCurrentReplicas returns the current replica count for a ScaledObject +func GetCurrentReplicas(ctx context.Context, client client.Client, scaleClient scale.ScalesGetter, scaledObject *kedav1alpha1.ScaledObject) (int32, error) { + targetName := scaledObject.Spec.ScaleTargetRef.Name + targetGVKR := scaledObject.Status.ScaleTargetGVKR + + logger := log.WithValues("scaledObject.Namespace", scaledObject.Namespace, + "scaledObject.Name", scaledObject.Name, + "resource", fmt.Sprintf("%s/%s", targetGVKR.Group, targetGVKR.Kind), + "name", targetName) + + switch { + case targetGVKR.Group == appsGroup && targetGVKR.Kind == deploymentKind: + deployment := &appsv1.Deployment{} + if err := client.Get(ctx, types.NamespacedName{Name: targetName, Namespace: scaledObject.Namespace}, deployment); err != nil { + logger.Error(err, "target deployment doesn't exist") + return 0, err + } + return *deployment.Spec.Replicas, nil + case targetGVKR.Group == appsGroup && targetGVKR.Kind == statefulSetKind: + statefulSet := &appsv1.StatefulSet{} + if err := client.Get(ctx, types.NamespacedName{Name: targetName, Namespace: scaledObject.Namespace}, statefulSet); err != nil { + logger.Error(err, "target statefulset doesn't exist") + return 0, err + } + return *statefulSet.Spec.Replicas, nil + default: + scale, err := scaleClient.Scales(scaledObject.Namespace).Get(ctx, targetGVKR.GroupResource(), targetName, metav1.GetOptions{}) + if err != nil { + logger.Error(err, "error getting scale subresource") + return 0, err + } + return scale.Spec.Replicas, nil + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index a8c50a057d0..f67496b288c 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -66,6 +66,7 @@ type ScaleHandler interface { type scaleHandler struct { client client.Client + scaleClient scale.ScalesGetter scaleLoopContexts *sync.Map scaleExecutor executor.ScaleExecutor globalHTTPTimeout time.Duration @@ -547,7 +548,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN metricTriggerPairList[key] = value } // check if we need to set a fallback - metrics, fallbackActive, err := fallback.GetMetricsWithFallback(ctx, h.client, result.metrics, result.err, result.metricName, scaledObject, result.metricSpec) + metrics, fallbackActive, err := fallback.GetMetricsWithFallback(ctx, h.client, h.scaleClient, result.metrics, result.err, result.metricName, scaledObject, result.metricSpec) if err != nil { isScalerError = true logger.Error(err, "error getting metric for trigger", "trigger", result.triggerName)