Skip to content

Commit

Permalink
Refactor aws dynamodb scaler (kedacore#5961)
Browse files Browse the repository at this point in the history
Signed-off-by: dttung2905 <[email protected]>
Signed-off-by: Jorge Turrado <[email protected]>
  • Loading branch information
dttung2905 authored and JorTurFer committed Oct 7, 2024
1 parent 1f144f2 commit 67b4ef4
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 143 deletions.
130 changes: 34 additions & 96 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
Expand All @@ -28,18 +27,18 @@ type awsDynamoDBScaler struct {
}

type awsDynamoDBMetadata struct {
tableName string
awsRegion string
awsEndpoint string
keyConditionExpression string
expressionAttributeNames map[string]string
expressionAttributeValues map[string]types.AttributeValue
indexName string
targetValue int64
activationTargetValue int64
awsAuthorization awsutils.AuthorizationMetadata
expressionAttributeValues map[string]types.AttributeValue
expressionAttributeNames map[string]string
triggerIndex int
metricName string
TableName string `keda:"name=tableName, order=triggerMetadata"`
AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"`
AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"`
KeyConditionExpression string `keda:"name=keyConditionExpression, order=triggerMetadata"`
IndexName string `keda:"name=indexName, order=triggerMetadata, optional"`
TargetValue int64 `keda:"name=targetValue, order=triggerMetadata, optional, default=-1"`
ActivationTargetValue int64 `keda:"name=activationTargetValue, order=triggerMetadata, default=0"`
}

func NewAwsDynamoDBScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
Expand All @@ -65,63 +64,24 @@ func NewAwsDynamoDBScaler(ctx context.Context, config *scalersconfig.ScalerConfi
}

var (
// ErrAwsDynamoNoTableName is returned when "tableName" is missing from the config.
ErrAwsDynamoNoTableName = errors.New("no tableName given")

// ErrAwsDynamoNoAwsRegion is returned when "awsRegion" is missing from the config.
ErrAwsDynamoNoAwsRegion = errors.New("no awsRegion given")

// ErrAwsDynamoNoKeyConditionExpression is returned when "keyConditionExpression" is missing from the config.
ErrAwsDynamoNoKeyConditionExpression = errors.New("no keyConditionExpression given")

// ErrAwsDynamoEmptyExpressionAttributeNames is returned when "expressionAttributeNames" is empty.
ErrAwsDynamoEmptyExpressionAttributeNames = errors.New("empty map")

// ErrAwsDynamoInvalidExpressionAttributeNames is returned when "expressionAttributeNames" is an invalid JSON.
ErrAwsDynamoInvalidExpressionAttributeNames = errors.New("invalid expressionAttributeNames")

// ErrAwsDynamoNoExpressionAttributeNames is returned when "expressionAttributeNames" is missing from the config.
ErrAwsDynamoNoExpressionAttributeNames = errors.New("no expressionAttributeNames given")

ErrAwsDynamoNoTargetValue = errors.New("no targetValue given")
// ErrAwsDynamoInvalidExpressionAttributeValues is returned when "expressionAttributeNames" is missing an invalid JSON.
ErrAwsDynamoInvalidExpressionAttributeValues = errors.New("invalid expressionAttributeValues")

// ErrAwsDynamoNoExpressionAttributeValues is returned when "expressionAttributeValues" is missing from the config.
ErrAwsDynamoNoExpressionAttributeValues = errors.New("no expressionAttributeValues given")

// ErrAwsDynamoNoTargetValue is returned when "targetValue" is missing from the config.
ErrAwsDynamoNoTargetValue = errors.New("no targetValue given")
// ErrAwsDynamoInvalidExpressionAttributeNames is returned when "expressionAttributeNames" is an invalid JSON.
ErrAwsDynamoInvalidExpressionAttributeNames = errors.New("invalid expressionAttributeNames")
// ErrAwsDynamoEmptyExpressionAttributeNames is returned when "expressionAttributeNames" is empty.
ErrAwsDynamoEmptyExpressionAttributeNames = errors.New("empty map")
// ErrAwsDynamoNoExpressionAttributeNames is returned when "expressionAttributeNames" is missing from the config.
ErrAwsDynamoNoExpressionAttributeNames = errors.New("no expressionAttributeNames given")
)

func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBMetadata, error) {
meta := awsDynamoDBMetadata{}

if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" {
meta.tableName = val
} else {
return nil, ErrAwsDynamoNoTableName
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return nil, ErrAwsDynamoNoAwsRegion
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

if val, ok := config.TriggerMetadata["indexName"]; ok {
meta.indexName = val
}

if val, ok := config.TriggerMetadata["keyConditionExpression"]; ok && val != "" {
meta.keyConditionExpression = val
} else {
return nil, ErrAwsDynamoNoKeyConditionExpression
meta := &awsDynamoDBMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing DynamoDb metadata: %w", err)
}

if val, ok := config.TriggerMetadata["expressionAttributeNames"]; ok && val != "" {
names, err := json2Map(val)

Expand All @@ -133,7 +93,6 @@ func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBM
} else {
return nil, ErrAwsDynamoNoExpressionAttributeNames
}

if val, ok := config.TriggerMetadata["expressionAttributeValues"]; ok && val != "" {
values, err := json2DynamoMap(val)

Expand All @@ -145,31 +104,10 @@ func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBM
} else {
return nil, ErrAwsDynamoNoExpressionAttributeValues
}

if val, ok := config.TriggerMetadata["targetValue"]; ok && val != "" {
n, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing metadata targetValue: %w", err)
}

meta.targetValue = n
} else {
if config.AsMetricSource {
meta.targetValue = 0
} else {
return nil, ErrAwsDynamoNoTargetValue
}
}

if val, ok := config.TriggerMetadata["activationTargetValue"]; ok && val != "" {
n, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing metadata activationTargetValue: %w", err)
}

meta.activationTargetValue = n
} else {
meta.activationTargetValue = 0
if meta.TargetValue == -1 && config.AsMetricSource {
meta.TargetValue = 0
} else if meta.TargetValue == -1 && !config.AsMetricSource {
return nil, ErrAwsDynamoNoTargetValue
}

auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
Expand All @@ -181,20 +119,20 @@ func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBM
meta.triggerIndex = config.TriggerIndex

meta.metricName = GenerateMetricNameWithIndex(config.TriggerIndex,
kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-%s", meta.tableName)))
kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-%s", meta.TableName)))

return &meta, nil
return meta, nil
}

func createDynamoDBClient(ctx context.Context, metadata *awsDynamoDBMetadata) (*dynamodb.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)
if err != nil {
return nil, err
}

return dynamodb.NewFromConfig(*cfg, func(options *dynamodb.Options) {
if metadata.awsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.awsEndpoint)
if metadata.AwsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.AwsEndpoint)
}
}), nil
}
Expand All @@ -208,15 +146,15 @@ func (s *awsDynamoDBScaler) GetMetricsAndActivity(ctx context.Context, metricNam

metric := GenerateMetricInMili(metricName, metricValue)

return []external_metrics.ExternalMetricValue{metric}, metricValue > float64(s.metadata.activationTargetValue), nil
return []external_metrics.ExternalMetricValue{metric}, metricValue > float64(s.metadata.ActivationTargetValue), nil
}

func (s *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: GetMetricTarget(s.metricType, s.metadata.targetValue),
Target: GetMetricTarget(s.metricType, s.metadata.TargetValue),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}

Expand All @@ -232,14 +170,14 @@ func (s *awsDynamoDBScaler) Close(context.Context) error {

func (s *awsDynamoDBScaler) GetQueryMetrics(ctx context.Context) (float64, error) {
dimensions := dynamodb.QueryInput{
TableName: aws.String(s.metadata.tableName),
KeyConditionExpression: aws.String(s.metadata.keyConditionExpression),
TableName: aws.String(s.metadata.TableName),
KeyConditionExpression: aws.String(s.metadata.KeyConditionExpression),
ExpressionAttributeNames: s.metadata.expressionAttributeNames,
ExpressionAttributeValues: s.metadata.expressionAttributeValues,
}

if s.metadata.indexName != "" {
dimensions.IndexName = aws.String(s.metadata.indexName)
if s.metadata.IndexName != "" {
dimensions.IndexName = aws.String(s.metadata.IndexName)
}

res, err := s.dbClient.Query(ctx, &dimensions)
Expand Down
Loading

0 comments on commit 67b4ef4

Please sign in to comment.