Skip to content

Commit

Permalink
Support custom AWS endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Phan Duc <[email protected]>
  • Loading branch information
yuyuvn committed Oct 28, 2022
1 parent 83f882e commit 4306cfa
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 13 deletions.
14 changes: 11 additions & 3 deletions pkg/scalers/aws_cloudwatch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type awsCloudwatchMetadata struct {
metricStatPeriod int64
metricEndTimeOffset int64

awsRegion string
awsRegion string
awsEndpoint string

awsAuthorization awsAuthorizationMetadata

Expand Down Expand Up @@ -113,7 +114,8 @@ func getFloatMetadataValue(metadata map[string]string, key string, required bool

func createCloudwatchClient(metadata *awsCloudwatchMetadata) *cloudwatch.CloudWatch {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
}))

var cloudwatchClient *cloudwatch.CloudWatch
Expand All @@ -126,11 +128,13 @@ func createCloudwatchClient(metadata *awsCloudwatchMetadata) *cloudwatch.CloudWa

cloudwatchClient = cloudwatch.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
Credentials: creds,
})
} else {
cloudwatchClient = cloudwatch.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
})
}

Expand Down Expand Up @@ -234,6 +238,10 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

meta.awsAuthorization, err = getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/scalers/aws_cloudwatch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{
"metricCollectionTime": "300",
"metricStat": "Average",
"metricStatPeriod": "300",
"awsRegion": "eu-west-1"},
"awsRegion": "eu-west-1",
"awsEndpoint": "http://localhost:4566"},
testAWSAuthentication, false,
"Properly formed cloudwatch query with optional parameters"},
// properly formed cloudwatch query but Region is empty
Expand Down
12 changes: 10 additions & 2 deletions pkg/scalers/aws_dynamodb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type awsDynamoDBScaler struct {
type awsDynamoDBMetadata struct {
tableName string
awsRegion string
awsEndpoint string
keyConditionExpression string
expressionAttributeNames map[string]*string
expressionAttributeValues map[string]*dynamodb.AttributeValue
Expand Down Expand Up @@ -76,6 +77,10 @@ func parseAwsDynamoDBMetadata(config *ScalerConfig) (*awsDynamoDBMetadata, error
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

if val, ok := config.TriggerMetadata["keyConditionExpression"]; ok && val != "" {
meta.keyConditionExpression = val
} else {
Expand Down Expand Up @@ -144,14 +149,16 @@ func parseAwsDynamoDBMetadata(config *ScalerConfig) (*awsDynamoDBMetadata, error

func createDynamoDBClient(meta *awsDynamoDBMetadata) *dynamodb.DynamoDB {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(meta.awsRegion),
Region: aws.String(meta.awsRegion),
Endpoint: aws.String(meta.awsEndpoint),
}))

var dbClient *dynamodb.DynamoDB

if !meta.awsAuthorization.podIdentityOwner {
dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(meta.awsRegion),
Region: aws.String(meta.awsRegion),
Endpoint: aws.String(meta.awsEndpoint),
})

return dbClient
Expand All @@ -165,6 +172,7 @@ func createDynamoDBClient(meta *awsDynamoDBMetadata) *dynamodb.DynamoDB {

dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(meta.awsRegion),
Endpoint: aws.String(meta.awsEndpoint),
Credentials: creds,
})

Expand Down
30 changes: 30 additions & 0 deletions pkg/scalers/aws_dynamodb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,36 @@ var dynamoTestCases = []parseDynamoDBMetadataTestData{
},
},
},
{
name: "properly formed dynamo name and region with custom endpoint",
metadata: map[string]string{
"tableName": "test",
"awsRegion": "eu-west-1",
"awsEndpoint": "http://localhost:4566",
"keyConditionExpression": "#yr = :yyyy",
"expressionAttributeNames": "{ \"#yr\" : \"year\" }",
"expressionAttributeValues": "{\":yyyy\": {\"N\": \"1994\"}}",
"targetValue": "3",
},
authParams: testAWSDynamoAuthentication,
expectedError: nil,
expectedMetadata: &awsDynamoDBMetadata{
tableName: "test",
awsRegion: "eu-west-1",
awsEndpoint: "http://localhost:4566",
keyConditionExpression: "#yr = :yyyy",
expressionAttributeNames: map[string]*string{"#yr": &year},
expressionAttributeValues: map[string]*dynamodb.AttributeValue{":yyyy": &yearAttr},
targetValue: 3,
scalerIndex: 1,
metricName: "s1-aws-dynamodb-test",
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: "none",
awsSecretAccessKey: "none",
podIdentityOwner: true,
},
},
},
}

func TestParseDynamoMetadata(t *testing.T) {
Expand Down
16 changes: 13 additions & 3 deletions pkg/scalers/aws_dynamodb_streams_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type awsDynamoDBStreamsMetadata struct {
activationTargetShardCount int64
tableName string
awsRegion string
awsEndpoint string
awsAuthorization awsAuthorizationMetadata
scalerIndex int
}
Expand Down Expand Up @@ -85,6 +86,10 @@ func parseAwsDynamoDBStreamsMetadata(config *ScalerConfig, logger logr.Logger) (
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

if val, ok := config.TriggerMetadata["tableName"]; ok && val != "" {
meta.tableName = val
} else {
Expand Down Expand Up @@ -123,7 +128,8 @@ func parseAwsDynamoDBStreamsMetadata(config *ScalerConfig, logger logr.Logger) (

func createClientsForDynamoDBStreamsScaler(metadata *awsDynamoDBStreamsMetadata) (*dynamodb.DynamoDB, *dynamodbstreams.DynamoDBStreams) {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
}))

var dbClient *dynamodb.DynamoDB
Expand All @@ -136,18 +142,22 @@ func createClientsForDynamoDBStreamsScaler(metadata *awsDynamoDBStreamsMetadata)
}
dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
Credentials: creds,
})
dbStreamClient = dynamodbstreams.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
Credentials: creds,
})
} else {
dbClient = dynamodb.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
})
dbStreamClient = dynamodbstreams.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
})
}
return dbClient, dbStreamClient
Expand Down
26 changes: 26 additions & 0 deletions pkg/scalers/aws_dynamodb_streams_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
testAWSDynamoDBStreamsSecretAccessKey = "none"
testAWSDynamoDBStreamsSessionToken = "none"
testAWSDynamoDBStreamsRegion = "ap-northeast-1"
testAWSDynamoDBStreamsEndpoint = "http://localhost:4566"
testAWSDynamoDBStreamsArnForSmallTable = "smallstreamarn"
testAWSDynamoDBStreamsArnForBigTable = "bigstreamarn"
testAWSDynamoDBStreamsErrorArn = "errorarn"
Expand Down Expand Up @@ -150,6 +151,31 @@ var testAwsDynamoDBStreamMetadata = []parseAwsDynamoDBStreamsMetadataTestData{
comment: "properly formed dynamodb table name and region",
scalerIndex: 0,
},
{
metadata: map[string]string{
"tableName": testAWSDynamoDBSmallTable,
"shardCount": "2",
"activationShardCount": "1",
"awsRegion": testAWSDynamoDBStreamsRegion,
"awsEndpoint": testAWSDynamoDBStreamsEndpoint},
authParams: testAWSKinesisAuthentication,
expected: &awsDynamoDBStreamsMetadata{
targetShardCount: 2,
activationTargetShardCount: 1,
tableName: testAWSDynamoDBSmallTable,
awsRegion: testAWSDynamoDBStreamsRegion,
awsEndpoint: testAWSDynamoDBStreamsEndpoint,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSDynamoDBStreamsAccessKeyID,
awsSecretAccessKey: testAWSDynamoDBStreamsSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 0,
},
isError: false,
comment: "properly formed dynamodb table name and region",
scalerIndex: 0,
},
{
metadata: map[string]string{
"tableName": "",
Expand Down
12 changes: 10 additions & 2 deletions pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type awsKinesisStreamMetadata struct {
activationTargetShardCount int64
streamName string
awsRegion string
awsEndpoint string
awsAuthorization awsAuthorizationMetadata
scalerIndex int
}
Expand Down Expand Up @@ -98,6 +99,10 @@ func parseAwsKinesisStreamMetadata(config *ScalerConfig, logger logr.Logger) (*a
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return nil, err
Expand All @@ -112,7 +117,8 @@ func parseAwsKinesisStreamMetadata(config *ScalerConfig, logger logr.Logger) (*a

func createKinesisClient(metadata *awsKinesisStreamMetadata) *kinesis.Kinesis {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
}))

var kinesisClinent *kinesis.Kinesis
Expand All @@ -125,11 +131,13 @@ func createKinesisClient(metadata *awsKinesisStreamMetadata) *kinesis.Kinesis {

kinesisClinent = kinesis.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
Credentials: creds,
})
} else {
kinesisClinent = kinesis.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
})
}
return kinesisClinent
Expand Down
26 changes: 26 additions & 0 deletions pkg/scalers/aws_kinesis_stream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
testAWSKinesisSessionToken = "none"
testAWSKinesisStreamName = "test"
testAWSRegion = "eu-west-1"
testAWSEndpoint = "http://localhost:4566"
testAWSKinesisErrorStream = "Error"
)

Expand Down Expand Up @@ -90,6 +91,31 @@ var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
comment: "properly formed stream name and region",
scalerIndex: 0,
},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"activationShardCount": "1",
"awsRegion": testAWSRegion,
"awsEndpoint": testAWSEndpoint},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
activationTargetShardCount: 1,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsEndpoint: testAWSEndpoint,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSKinesisAccessKeyID,
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
podIdentityOwner: true,
},
scalerIndex: 0,
},
isError: false,
comment: "properly formed stream name and region with custom endpoint",
scalerIndex: 0,
},
{
metadata: map[string]string{
"streamName": "",
Expand Down
12 changes: 10 additions & 2 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type awsSqsQueueMetadata struct {
queueURL string
queueName string
awsRegion string
awsEndpoint string
awsAuthorization awsAuthorizationMetadata
scalerIndex int
scaleOnInFlight bool
Expand Down Expand Up @@ -145,6 +146,10 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs
return nil, fmt.Errorf("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return nil, err
Expand All @@ -159,7 +164,8 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs

func createSqsClient(metadata *awsSqsQueueMetadata) *sqs.SQS {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
}))

var sqsClient *sqs.SQS
Expand All @@ -172,11 +178,13 @@ func createSqsClient(metadata *awsSqsQueueMetadata) *sqs.SQS {

sqsClient = sqs.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
Credentials: creds,
})
} else {
sqsClient = sqs.New(sess, &aws.Config{
Region: aws.String(metadata.awsRegion),
Region: aws.String(metadata.awsRegion),
Endpoint: aws.String(metadata.awsEndpoint),
})
}
return sqsClient
Expand Down
9 changes: 9 additions & 0 deletions pkg/scalers/aws_sqs_queue_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ var testAWSSQSMetadata = []parseAWSSQSMetadataTestData{
testAWSSQSEmptyResolvedEnv,
false,
"properly formed queue and region"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"awsEndpoint": "http://localhost:4566"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"properly formed queue and region with custom endpoint"},
{map[string]string{
"queueURL": testAWSSQSImproperQueueURL1,
"queueLength": "1",
Expand Down

0 comments on commit 4306cfa

Please sign in to comment.