diff --git a/pkg/scalers/aws_dynamodb_streams_scaler.go b/pkg/scalers/aws_dynamodb_streams_scaler.go index a8448a46408..cdcd8548320 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" @@ -31,11 +30,11 @@ type awsDynamoDBStreamsScaler struct { } type awsDynamoDBStreamsMetadata struct { - targetShardCount int64 - activationTargetShardCount int64 - tableName string - awsRegion string - awsEndpoint string + TargetShardCount int64 `keda:"name=shardCount, order=triggerMetadata, default=2"` + ActivationTargetShardCount int64 `keda:"name=activationShardCount, order=triggerMetadata, default=0"` + TableName string `keda:"name=tableName, order=triggerMetadata"` + AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"` + AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"` awsAuthorization awsutils.AuthorizationMetadata triggerIndex int } @@ -49,7 +48,7 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal logger := InitializeLogger(config, "aws_dynamodb_streams_scaler") - meta, err := parseAwsDynamoDBStreamsMetadata(config, logger) + meta, err := parseAwsDynamoDBStreamsMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing dynamodb stream metadata: %w", err) } @@ -58,7 +57,7 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal if err != nil { return nil, fmt.Errorf("error when creating dynamodbstream client: %w", err) } - streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.tableName) + streamArn, err := getDynamoDBStreamsArn(ctx, dbClient, &meta.TableName) if err != nil { return nil, fmt.Errorf("error dynamodb stream arn: %w", err) } @@ -74,43 +73,11 @@ func NewAwsDynamoDBStreamsScaler(ctx context.Context, config *scalersconfig.Scal }, nil } -func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*awsDynamoDBStreamsMetadata, error) { +func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBStreamsMetadata, error) { meta := awsDynamoDBStreamsMetadata{} - meta.targetShardCount = defaultTargetDBStreamsShardCount - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return nil, fmt.Errorf("no awsRegion given") - } - - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val - } - - if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" { - meta.tableName = val - } else { - return nil, fmt.Errorf("no tableName given") - } - - if val, ok := config.TriggerMetadata["shardCount"]; ok && val != "" { - shardCount, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.targetShardCount = defaultTargetDBStreamsShardCount - logger.Error(err, "error parsing dyanmodb stream metadata shardCount, using default %n", defaultTargetDBStreamsShardCount) - } else { - meta.targetShardCount = shardCount - } - } - if val, ok := config.TriggerMetadata["activationShardCount"]; ok && val != "" { - shardCount, err := strconv.ParseInt(val, 10, 64) - if err != nil { - meta.activationTargetShardCount = defaultActivationTargetDBStreamsShardCount - logger.Error(err, "error parsing dyanmodb stream metadata activationTargetShardCount, using default %n", defaultActivationTargetDBStreamsShardCount) - } else { - meta.activationTargetShardCount = shardCount - } + if err := config.TypedConfig(&meta); err != nil { + return nil, fmt.Errorf("error parsing dynamodb stream metadata: %w", err) } auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) @@ -125,18 +92,18 @@ func parseAwsDynamoDBStreamsMetadata(config *scalersconfig.ScalerConfig, logger } func createClientsForDynamoDBStreamsScaler(ctx context.Context, metadata *awsDynamoDBStreamsMetadata) (*dynamodb.Client, *dynamodbstreams.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) if err != nil { return nil, nil, err } dbClient := dynamodb.NewFromConfig(*cfg, func(options *dynamodb.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }) dbStreamClient := dynamodbstreams.NewFromConfig(*cfg, func(options *dynamodbstreams.Options) { - if metadata.awsEndpoint != "" { - options.BaseEndpoint = aws.String(metadata.awsEndpoint) + if metadata.AwsEndpoint != "" { + options.BaseEndpoint = aws.String(metadata.AwsEndpoint) } }) @@ -176,9 +143,9 @@ func (s *awsDynamoDBStreamsScaler) Close(_ context.Context) error { func (s *awsDynamoDBStreamsScaler) GetMetricSpecForScaling(_ context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.tableName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-streams-%s", s.metadata.TableName))), }, - Target: GetMetricTarget(s.metricType, s.metadata.targetShardCount), + Target: GetMetricTarget(s.metricType, s.metadata.TargetShardCount), } metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} return []v2.MetricSpec{metricSpec} @@ -195,7 +162,7 @@ func (s *awsDynamoDBStreamsScaler) GetMetricsAndActivity(ctx context.Context, me metric := GenerateMetricInMili(metricName, float64(shardCount)) - return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.activationTargetShardCount, nil + return []external_metrics.ExternalMetricValue{metric}, shardCount > s.metadata.ActivationTargetShardCount, nil } // GetDynamoDBStreamShardCount Get DynamoDB Stream Shard Count diff --git a/pkg/scalers/aws_dynamodb_streams_scaler_test.go b/pkg/scalers/aws_dynamodb_streams_scaler_test.go index 5c87de8d87a..ce1232413a6 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler_test.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler_test.go @@ -135,10 +135,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsRegion": testAWSDynamoDBStreamsRegion}, authParams: testAWSKinesisAuthentication, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - activationTargetShardCount: 1, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: 2, + ActivationTargetShardCount: 1, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -159,11 +159,11 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsEndpoint": testAWSDynamoDBStreamsEndpoint}, authParams: testAWSKinesisAuthentication, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - activationTargetShardCount: 1, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, - awsEndpoint: testAWSDynamoDBStreamsEndpoint, + TargetShardCount: 2, + ActivationTargetShardCount: 1, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, + AwsEndpoint: testAWSDynamoDBStreamsEndpoint, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -204,10 +204,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsRegion": testAWSDynamoDBStreamsRegion}, authParams: testAWSKinesisAuthentication, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: defaultTargetDBStreamsShardCount, - activationTargetShardCount: defaultActivationTargetDBStreamsShardCount, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: defaultTargetDBStreamsShardCount, + ActivationTargetShardCount: defaultActivationTargetDBStreamsShardCount, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -224,20 +224,10 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "tableName": testAWSDynamoDBSmallTable, "shardCount": "a", "awsRegion": testAWSDynamoDBStreamsRegion}, - authParams: testAWSKinesisAuthentication, - expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: defaultTargetDBStreamsShardCount, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, - awsAuthorization: awsutils.AuthorizationMetadata{ - AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, - AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, - PodIdentityOwner: true, - }, - triggerIndex: 4, - }, - isError: false, - comment: "properly formed table name and region, wrong shard count", + authParams: testAWSKinesisAuthentication, + expected: &awsDynamoDBStreamsMetadata{}, + isError: true, + comment: "invalid value - should cause error", triggerIndex: 4, }, { @@ -278,9 +268,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsSessionToken": testAWSDynamoDBStreamsSessionToken, }, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: 2, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID, AwsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey, @@ -330,9 +320,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "awsRoleArn": testAWSDynamoDBStreamsRoleArn, }, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: 2, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ AwsRoleArn: testAWSDynamoDBStreamsRoleArn, PodIdentityOwner: true, @@ -350,9 +340,9 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{ "identityOwner": "operator"}, authParams: map[string]string{}, expected: &awsDynamoDBStreamsMetadata{ - targetShardCount: 2, - tableName: testAWSDynamoDBSmallTable, - awsRegion: testAWSDynamoDBStreamsRegion, + TargetShardCount: 2, + TableName: testAWSDynamoDBSmallTable, + AwsRegion: testAWSDynamoDBStreamsRegion, awsAuthorization: awsutils.AuthorizationMetadata{ PodIdentityOwner: false, }, @@ -370,15 +360,15 @@ var awsDynamoDBStreamMetricIdentifiers = []awsDynamoDBStreamsMetricIdentifier{ } var awsDynamoDBStreamsGetMetricTestData = []*awsDynamoDBStreamsMetadata{ - {tableName: testAWSDynamoDBBigTable}, - {tableName: testAWSDynamoDBSmallTable}, - {tableName: testAWSDynamoDBErrorTable}, - {tableName: testAWSDynamoDBInvalidTable}, + {TableName: testAWSDynamoDBBigTable}, + {TableName: testAWSDynamoDBSmallTable}, + {TableName: testAWSDynamoDBErrorTable}, + {TableName: testAWSDynamoDBInvalidTable}, } func TestParseAwsDynamoDBStreamsMetadata(t *testing.T) { for _, testData := range testAwsDynamoDBStreamMetadata { - result, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + result, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil && !testData.isError { t.Errorf("Expected success because %s got error, %s", testData.comment, err) } @@ -395,11 +385,11 @@ func TestParseAwsDynamoDBStreamsMetadata(t *testing.T) { func TestAwsDynamoDBStreamsGetMetricSpecForScaling(t *testing.T) { for _, testData := range awsDynamoDBStreamMetricIdentifiers { ctx := context.Background() - meta, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + meta, err := parseAwsDynamoDBStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testAwsDynamoDBStreamAuthentication, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } - streamArn, err := getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err := getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err != nil { t.Fatal("Could not get dynamodb stream arn:", err) } @@ -418,12 +408,12 @@ func TestAwsDynamoDBStreamsScalerGetMetrics(t *testing.T) { var err error var streamArn *string ctx := context.Background() - streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err == nil { scaler := awsDynamoDBStreamsScaler{"", meta, streamArn, &mockAwsDynamoDBStreams{}, logr.Discard()} value, _, err = scaler.GetMetricsAndActivity(context.Background(), "MetricName") } - switch meta.tableName { + switch meta.TableName { case testAWSDynamoDBErrorTable: assert.Error(t, err, "expect error because of dynamodb stream api error") case testAWSDynamoDBInvalidTable: @@ -442,12 +432,12 @@ func TestAwsDynamoDBStreamsScalerIsActive(t *testing.T) { var err error var streamArn *string ctx := context.Background() - streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.tableName) + streamArn, err = getDynamoDBStreamsArn(ctx, &mockAwsDynamoDB{}, &meta.TableName) if err == nil { scaler := awsDynamoDBStreamsScaler{"", meta, streamArn, &mockAwsDynamoDBStreams{}, logr.Discard()} _, value, err = scaler.GetMetricsAndActivity(context.Background(), "MetricName") } - switch meta.tableName { + switch meta.TableName { case testAWSDynamoDBErrorTable: assert.Error(t, err, "expect error because of dynamodb stream api error") case testAWSDynamoDBInvalidTable: