Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor aws dynamodb scaler #5961

Merged
merged 5 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 34 additions & 96 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
Expand All @@ -28,18 +27,18 @@ type awsDynamoDBScaler struct {
}

type awsDynamoDBMetadata struct {
tableName string
awsRegion string
awsEndpoint string
keyConditionExpression string
expressionAttributeNames map[string]string
expressionAttributeValues map[string]types.AttributeValue
indexName string
targetValue int64
activationTargetValue int64
awsAuthorization awsutils.AuthorizationMetadata
expressionAttributeValues map[string]types.AttributeValue
expressionAttributeNames map[string]string
triggerIndex int
metricName string
TableName string `keda:"name=tableName, order=triggerMetadata"`
AwsRegion string `keda:"name=awsRegion, order=triggerMetadata"`
AwsEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"`
KeyConditionExpression string `keda:"name=keyConditionExpression, order=triggerMetadata"`
IndexName string `keda:"name=indexName, order=triggerMetadata, optional"`
TargetValue int64 `keda:"name=targetValue, order=triggerMetadata, optional, default=-1"`
ActivationTargetValue int64 `keda:"name=activationTargetValue, order=triggerMetadata, default=0"`
}

func NewAwsDynamoDBScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
Expand All @@ -65,63 +64,24 @@ func NewAwsDynamoDBScaler(ctx context.Context, config *scalersconfig.ScalerConfi
}

var (
// ErrAwsDynamoNoTableName is returned when "tableName" is missing from the config.
ErrAwsDynamoNoTableName = errors.New("no tableName given")

// ErrAwsDynamoNoAwsRegion is returned when "awsRegion" is missing from the config.
ErrAwsDynamoNoAwsRegion = errors.New("no awsRegion given")

// ErrAwsDynamoNoKeyConditionExpression is returned when "keyConditionExpression" is missing from the config.
ErrAwsDynamoNoKeyConditionExpression = errors.New("no keyConditionExpression given")

// ErrAwsDynamoEmptyExpressionAttributeNames is returned when "expressionAttributeNames" is empty.
ErrAwsDynamoEmptyExpressionAttributeNames = errors.New("empty map")

// ErrAwsDynamoInvalidExpressionAttributeNames is returned when "expressionAttributeNames" is an invalid JSON.
ErrAwsDynamoInvalidExpressionAttributeNames = errors.New("invalid expressionAttributeNames")

// ErrAwsDynamoNoExpressionAttributeNames is returned when "expressionAttributeNames" is missing from the config.
ErrAwsDynamoNoExpressionAttributeNames = errors.New("no expressionAttributeNames given")

ErrAwsDynamoNoTargetValue = errors.New("no targetValue given")
// ErrAwsDynamoInvalidExpressionAttributeValues is returned when "expressionAttributeNames" is missing an invalid JSON.
ErrAwsDynamoInvalidExpressionAttributeValues = errors.New("invalid expressionAttributeValues")

// ErrAwsDynamoNoExpressionAttributeValues is returned when "expressionAttributeValues" is missing from the config.
ErrAwsDynamoNoExpressionAttributeValues = errors.New("no expressionAttributeValues given")

// ErrAwsDynamoNoTargetValue is returned when "targetValue" is missing from the config.
ErrAwsDynamoNoTargetValue = errors.New("no targetValue given")
// ErrAwsDynamoInvalidExpressionAttributeNames is returned when "expressionAttributeNames" is an invalid JSON.
ErrAwsDynamoInvalidExpressionAttributeNames = errors.New("invalid expressionAttributeNames")
// ErrAwsDynamoEmptyExpressionAttributeNames is returned when "expressionAttributeNames" is empty.
ErrAwsDynamoEmptyExpressionAttributeNames = errors.New("empty map")
// ErrAwsDynamoNoExpressionAttributeNames is returned when "expressionAttributeNames" is missing from the config.
ErrAwsDynamoNoExpressionAttributeNames = errors.New("no expressionAttributeNames given")
)

func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBMetadata, error) {
meta := awsDynamoDBMetadata{}

if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" {
meta.tableName = val
} else {
return nil, ErrAwsDynamoNoTableName
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return nil, ErrAwsDynamoNoAwsRegion
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

if val, ok := config.TriggerMetadata["indexName"]; ok {
meta.indexName = val
}

if val, ok := config.TriggerMetadata["keyConditionExpression"]; ok && val != "" {
meta.keyConditionExpression = val
} else {
return nil, ErrAwsDynamoNoKeyConditionExpression
meta := &awsDynamoDBMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing DynamoDb metadata: %w", err)
}

if val, ok := config.TriggerMetadata["expressionAttributeNames"]; ok && val != "" {
names, err := json2Map(val)

Expand All @@ -133,7 +93,6 @@ func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBM
} else {
return nil, ErrAwsDynamoNoExpressionAttributeNames
}

if val, ok := config.TriggerMetadata["expressionAttributeValues"]; ok && val != "" {
values, err := json2DynamoMap(val)

Expand All @@ -145,31 +104,10 @@ func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBM
} else {
return nil, ErrAwsDynamoNoExpressionAttributeValues
}

if val, ok := config.TriggerMetadata["targetValue"]; ok && val != "" {
n, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing metadata targetValue: %w", err)
}

meta.targetValue = n
} else {
if config.AsMetricSource {
meta.targetValue = 0
} else {
return nil, ErrAwsDynamoNoTargetValue
}
}

if val, ok := config.TriggerMetadata["activationTargetValue"]; ok && val != "" {
n, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing metadata activationTargetValue: %w", err)
}

meta.activationTargetValue = n
} else {
meta.activationTargetValue = 0
if meta.TargetValue == -1 && config.AsMetricSource {
meta.TargetValue = 0
} else if meta.TargetValue == -1 && !config.AsMetricSource {
return nil, ErrAwsDynamoNoTargetValue
}

auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv)
Expand All @@ -181,20 +119,20 @@ func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBM
meta.triggerIndex = config.TriggerIndex

meta.metricName = GenerateMetricNameWithIndex(config.TriggerIndex,
kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-%s", meta.tableName)))
kedautil.NormalizeString(fmt.Sprintf("aws-dynamodb-%s", meta.TableName)))

return &meta, nil
return meta, nil
}

func createDynamoDBClient(ctx context.Context, metadata *awsDynamoDBMetadata) (*dynamodb.Client, error) {
cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization)
cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization)
if err != nil {
return nil, err
}

return dynamodb.NewFromConfig(*cfg, func(options *dynamodb.Options) {
if metadata.awsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.awsEndpoint)
if metadata.AwsEndpoint != "" {
options.BaseEndpoint = aws.String(metadata.AwsEndpoint)
}
}), nil
}
Expand All @@ -208,15 +146,15 @@ func (s *awsDynamoDBScaler) GetMetricsAndActivity(ctx context.Context, metricNam

metric := GenerateMetricInMili(metricName, metricValue)

return []external_metrics.ExternalMetricValue{metric}, metricValue > float64(s.metadata.activationTargetValue), nil
return []external_metrics.ExternalMetricValue{metric}, metricValue > float64(s.metadata.ActivationTargetValue), nil
}

func (s *awsDynamoDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: GetMetricTarget(s.metricType, s.metadata.targetValue),
Target: GetMetricTarget(s.metricType, s.metadata.TargetValue),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}

Expand All @@ -232,14 +170,14 @@ func (s *awsDynamoDBScaler) Close(context.Context) error {

func (s *awsDynamoDBScaler) GetQueryMetrics(ctx context.Context) (float64, error) {
dimensions := dynamodb.QueryInput{
TableName: aws.String(s.metadata.tableName),
KeyConditionExpression: aws.String(s.metadata.keyConditionExpression),
TableName: aws.String(s.metadata.TableName),
KeyConditionExpression: aws.String(s.metadata.KeyConditionExpression),
ExpressionAttributeNames: s.metadata.expressionAttributeNames,
ExpressionAttributeValues: s.metadata.expressionAttributeValues,
}

if s.metadata.indexName != "" {
dimensions.IndexName = aws.String(s.metadata.indexName)
if s.metadata.IndexName != "" {
dimensions.IndexName = aws.String(s.metadata.IndexName)
}

res, err := s.dbClient.Query(ctx, &dimensions)
Expand Down
Loading
Loading