diff --git a/CHANGELOG.md b/CHANGELOG.md index 274f4e45080..1c5161e71b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### Fixes +- **AWS SQS Scaler**: Respect `scaleOnDelayed` value ([#4377](https://github.com/kedacore/keda/issue/4377)) - **AWS SQS Scaler**: Respect `scaleOnInFlight` value ([#4276](https://github.com/kedacore/keda/issue/4276)) ### Deprecations diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index be1c50cb87e..da99be9ddc0 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -21,14 +21,10 @@ const ( targetQueueLengthDefault = 5 activationTargetQueueLengthDefault = 0 defaultScaleOnInFlight = true + defaultScaleOnDelayed = false ) -var awsSqsQueueMetricNamesForScalingInFlight = []string{ - "ApproximateNumberOfMessages", - "ApproximateNumberOfMessagesNotVisible", -} - -var awsSqsQueueMetricNamesForNotScalingInFlight = []string{ +var awsSqsQueueMetricNamesForScaling = []string{ "ApproximateNumberOfMessages", } @@ -49,6 +45,7 @@ type awsSqsQueueMetadata struct { awsAuthorization awsAuthorizationMetadata scalerIndex int scaleOnInFlight bool + scaleOnDelayed bool awsSqsQueueMetricNames []string } @@ -78,6 +75,7 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs meta := awsSqsQueueMetadata{} meta.targetQueueLength = defaultTargetQueueLength meta.scaleOnInFlight = defaultScaleOnInFlight + meta.scaleOnDelayed = defaultScaleOnDelayed if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" { queueLength, err := strconv.ParseInt(val, 10, 64) @@ -109,10 +107,22 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs } } + if val, ok := config.TriggerMetadata["scaleOnDelayed"]; ok && val != "" { + scaleOnDelayed, err := strconv.ParseBool(val) + if err != nil { + meta.scaleOnDelayed = defaultScaleOnDelayed + logger.Error(err, "Error parsing SQS queue metadata scaleOnDelayed, using default %n", defaultScaleOnDelayed) + } else { + meta.scaleOnDelayed = scaleOnDelayed + } + } + + meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScaling if meta.scaleOnInFlight { - meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScalingInFlight - } else { - meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForNotScalingInFlight + meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesNotVisible") + } + if meta.scaleOnDelayed { + meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesDelayed") } if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" { diff --git a/pkg/scalers/aws_sqs_queue_scaler_test.go b/pkg/scalers/aws_sqs_queue_scaler_test.go index fc7e4cdd971..ba5b7b566f0 100644 --- a/pkg/scalers/aws_sqs_queue_scaler_test.go +++ b/pkg/scalers/aws_sqs_queue_scaler_test.go @@ -65,6 +65,7 @@ func (m *mockSqs) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G Attributes: map[string]*string{ "ApproximateNumberOfMessages": aws.String("NotInt"), "ApproximateNumberOfMessagesNotVisible": aws.String("NotInt"), + "ApproximateNumberOfMessagesDelayed": aws.String("NotInt"), }, }, nil } @@ -73,6 +74,7 @@ func (m *mockSqs) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G Attributes: map[string]*string{ "ApproximateNumberOfMessages": aws.String("200"), "ApproximateNumberOfMessagesNotVisible": aws.String("100"), + "ApproximateNumberOfMessagesDelayed": aws.String("400"), }, }, nil } @@ -275,6 +277,24 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{ testAWSSQSEmptyResolvedEnv, false, "properly formed queue and region"}, + {map[string]string{ + "queueURL": testAWSSimpleQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnDelayed": "true"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "properly formed queue and region"}, + {map[string]string{ + "queueURL": testAWSSimpleQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnDelayed": "false"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "properly formed queue and region"}, {map[string]string{ "queueURLFromEnv": "QUEUE_URL", "queueLength": "1", @@ -326,6 +346,24 @@ var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{ testAWSSQSEmptyResolvedEnv, false, "not error with scaleOnInFlight enabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnDelayed": "false"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaleOnDelayed disabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnDelayed": "true"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaleOnDelayed enabled"}, {map[string]string{ "queueURL": testAWSSQSErrorQueueURL, "queueLength": "1", @@ -390,9 +428,13 @@ func TestAWSSQSScalerGetMetrics(t *testing.T) { case testAWSSQSBadDataQueueURL: assert.Error(t, err, "expect error because of bad data return from sqs") default: - if meta.scaleOnInFlight { + if meta.scaleOnInFlight && meta.scaleOnDelayed { + assert.EqualValues(t, int64(700.0), value[0].Value.Value()) + } else if meta.scaleOnInFlight && !meta.scaleOnDelayed { assert.EqualValues(t, int64(300.0), value[0].Value.Value()) - } else { + } else if !meta.scaleOnInFlight && meta.scaleOnDelayed { + assert.EqualValues(t, int64(600.0), value[0].Value.Value()) + } else if !meta.scaleOnInFlight && !meta.scaleOnDelayed { assert.EqualValues(t, int64(200.0), value[0].Value.Value()) } }