diff --git a/pkg/handler/scale_handler.go b/pkg/handler/scale_handler.go index 66715dc37ed..eb718b44698 100644 --- a/pkg/handler/scale_handler.go +++ b/pkg/handler/scale_handler.go @@ -272,6 +272,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn return scalers.NewAwsSqsQueueScaler(resolvedEnv, triggerMetadata, authParams) case "aws-cloudwatch": return scalers.NewAwsCloudwatchScaler(resolvedEnv, triggerMetadata, authParams) + case "aws-kinesis-stream": + return scalers.NewAwsKinesisStreamScaler(resolvedEnv, triggerMetadata, authParams) case "kafka": return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams) case "rabbitmq": diff --git a/pkg/scalers/aws_iam_authorization.go b/pkg/scalers/aws_iam_authorization.go index 9e8449fb56d..624c50700e1 100644 --- a/pkg/scalers/aws_iam_authorization.go +++ b/pkg/scalers/aws_iam_authorization.go @@ -21,8 +21,11 @@ func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (a if authParams["awsRoleArn"] != "" { meta.awsRoleArn = authParams["awsRoleArn"] - } else if authParams["awsAccessKeyId"] != "" && authParams["awsSecretAccessKey"] != "" { - meta.awsAccessKeyID = authParams["awsAccessKeyId"] + } else if (authParams["awsAccessKeyID"] != "" || authParams["awsAccessKeyId"] != "") && authParams["awsSecretAccessKey"] != "" { + meta.awsAccessKeyID = authParams["awsAccessKeyID"] + if meta.awsAccessKeyID == "" { + meta.awsAccessKeyID = authParams["awsAccessKeyId"] + } meta.awsSecretAccessKey = authParams["awsSecretAccessKey"] } else { var keyName string diff --git a/pkg/scalers/aws_kinesis_stream_scaler.go b/pkg/scalers/aws_kinesis_stream_scaler.go new file mode 100644 index 00000000000..846d4ccac1d --- /dev/null +++ b/pkg/scalers/aws_kinesis_stream_scaler.go @@ -0,0 +1,155 @@ +package scalers + +import ( + "context" + "fmt" + "strconv" + + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + v2beta1 "k8s.io/api/autoscaling/v2beta1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + awsKinesisStreamMetricName = "ShardCount" + targetShardCountDefault = 2 +) + +type awsKinesisStreamScaler struct { + metadata *awsKinesisStreamMetadata +} + +type awsKinesisStreamMetadata struct { + targetShardCount int + streamName string + awsRegion string + awsAuthorization awsAuthorizationMetadata +} + +var kinesisStreamLog = logf.Log.WithName("aws_kinesis_stream_scaler") + +// NewAwsKinesisStreamScaler creates a new awsKinesisStreamScaler +func NewAwsKinesisStreamScaler(resolvedEnv, metadata map[string]string, authParams map[string]string) (Scaler, error) { + meta, err := parseAwsKinesisStreamMetadata(metadata, resolvedEnv, authParams) + if err != nil { + return nil, fmt.Errorf("Error parsing Kinesis stream metadata: %s", err) + } + + return &awsKinesisStreamScaler{ + metadata: meta, + }, nil +} + +func parseAwsKinesisStreamMetadata(metadata, resolvedEnv, authParams map[string]string) (*awsKinesisStreamMetadata, error) { + meta := awsKinesisStreamMetadata{} + meta.targetShardCount = targetShardCountDefault + + if val, ok := metadata["shardCount"]; ok && val != "" { + shardCount, err := strconv.Atoi(val) + if err != nil { + meta.targetShardCount = targetShardCountDefault + kinesisStreamLog.Error(err, "Error parsing Kinesis stream metadata shardCount, using default %n", targetShardCountDefault) + } else { + meta.targetShardCount = shardCount + } + } + + if val, ok := metadata["streamName"]; ok && val != "" { + meta.streamName = val + } else { + return nil, fmt.Errorf("no streamName given") + } + + if val, ok := metadata["awsRegion"]; ok && val != "" { + meta.awsRegion = val + } else { + return nil, fmt.Errorf("no awsRegion given") + } + + auth, err := getAwsAuthorization(authParams, metadata, resolvedEnv) + if err != nil { + return nil, err + } + + meta.awsAuthorization = auth + + return &meta, nil +} + +// IsActive determines if we need to scale from zero +func (s *awsKinesisStreamScaler) IsActive(ctx context.Context) (bool, error) { + count, err := s.GetAwsKinesisOpenShardCount() + + if err != nil { + return false, err + } + + return count > 0, nil +} + +func (s *awsKinesisStreamScaler) Close() error { + return nil +} + +func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { + targetShardCountQty := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI) + externalMetric := &v2beta1.ExternalMetricSource{MetricName: fmt.Sprintf("%s-%s-%s", "AWS-Kinesis-Stream", awsKinesisStreamMetricName, s.metadata.streamName), + TargetAverageValue: targetShardCountQty} + metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta1.MetricSpec{metricSpec} +} + +//GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *awsKinesisStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + shardCount, err := s.GetAwsKinesisOpenShardCount() + + if err != nil { + kinesisStreamLog.Error(err, "Error getting shard count") + return []external_metrics.ExternalMetricValue{}, err + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(shardCount), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// Get Kinesis open shard count +func (s *awsKinesisStreamScaler) GetAwsKinesisOpenShardCount() (int64, error) { + input := &kinesis.DescribeStreamSummaryInput{ + StreamName: &s.metadata.streamName, + } + + sess := session.Must(session.NewSession(&aws.Config{ + Region: aws.String(s.metadata.awsRegion), + })) + creds := credentials.NewStaticCredentials(s.metadata.awsAuthorization.awsAccessKeyID, s.metadata.awsAuthorization.awsSecretAccessKey, "") + + if s.metadata.awsAuthorization.awsRoleArn != "" { + creds = stscreds.NewCredentials(sess, s.metadata.awsAuthorization.awsRoleArn) + } + + kinesisClinent := kinesis.New(sess, &aws.Config{ + Region: aws.String(s.metadata.awsRegion), + Credentials: creds, + }) + + output, err := kinesisClinent.DescribeStreamSummary(input) + if err != nil { + return -1, err + } + + return *output.StreamDescriptionSummary.OpenShardCount, nil +} diff --git a/pkg/scalers/aws_kinesis_stream_test.go b/pkg/scalers/aws_kinesis_stream_test.go new file mode 100644 index 00000000000..eb62bc99c67 --- /dev/null +++ b/pkg/scalers/aws_kinesis_stream_test.go @@ -0,0 +1,167 @@ +package scalers + +import ( + "reflect" + "testing" +) + +const ( + testAWSKinesisRoleArn = "none" + testAWSKinesisAccessKeyID = "none" + testAWSKinesisSecretAccessKey = "none" + testAWSKinesisStreamName = "test" + testAWSRegion = "eu-west-1" +) + +var testAWSKinesisResolvedEnv = map[string]string{ + "AWS_ACCESS_KEY": "none", + "AWS_SECRET_ACCESS_KEY": "none", +} + +var testAWSKinesisAuthentication = map[string]string{ + "awsAccessKeyID": testAWSKinesisAccessKeyID, + "awsSecretAccessKey": testAWSKinesisSecretAccessKey, +} + +type parseAWSKinesisMetadataTestData struct { + metadata map[string]string + expected *awsKinesisStreamMetadata + authParams map[string]string + isError bool + comment string +} + +var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{ + { + metadata: map[string]string{}, + authParams: testAWSKinesisAuthentication, + expected: &awsKinesisStreamMetadata{}, + isError: true, + comment: "metadata empty"}, + { + metadata: map[string]string{ + "streamName": testAWSKinesisStreamName, + "shardCount": "2", + "awsRegion": testAWSRegion}, + authParams: testAWSKinesisAuthentication, + expected: &awsKinesisStreamMetadata{ + targetShardCount: 2, + streamName: testAWSKinesisStreamName, + awsRegion: testAWSRegion, + awsAuthorization: awsAuthorizationMetadata{ + awsAccessKeyID: testAWSKinesisAccessKeyID, + awsSecretAccessKey: testAWSKinesisSecretAccessKey, + }, + }, + isError: false, + comment: "properly formed stream name and region"}, + { + metadata: map[string]string{ + "streamName": "", + "shardCount": "2", + "awsRegion": testAWSRegion}, + authParams: testAWSKinesisAuthentication, + expected: &awsKinesisStreamMetadata{}, + isError: true, + comment: "missing stream name"}, + { + metadata: map[string]string{ + "streamName": testAWSKinesisStreamName, + "shardCount": "2", + "awsRegion": ""}, + authParams: testAWSKinesisAuthentication, + expected: &awsKinesisStreamMetadata{}, + isError: true, + comment: "properly formed stream name, empty region"}, + { + metadata: map[string]string{ + "streamName": testAWSKinesisStreamName, + "shardCount": "", + "awsRegion": testAWSRegion}, + authParams: testAWSKinesisAuthentication, + expected: &awsKinesisStreamMetadata{ + targetShardCount: 2, + streamName: testAWSKinesisStreamName, + awsRegion: testAWSRegion, + awsAuthorization: awsAuthorizationMetadata{ + awsAccessKeyID: testAWSKinesisAccessKeyID, + awsSecretAccessKey: testAWSKinesisSecretAccessKey, + }, + }, + isError: false, + comment: "properly formed stream name and region, empty shard count"}, + { + metadata: map[string]string{ + "streamName": testAWSKinesisStreamName, + "shardCount": "a", + "awsRegion": testAWSRegion}, + authParams: testAWSKinesisAuthentication, + expected: &awsKinesisStreamMetadata{ + targetShardCount: 2, + streamName: testAWSKinesisStreamName, + awsRegion: testAWSRegion, + awsAuthorization: awsAuthorizationMetadata{ + awsAccessKeyID: testAWSKinesisAccessKeyID, + awsSecretAccessKey: testAWSKinesisSecretAccessKey, + }, + }, + isError: false, + comment: "properly formed stream name and region, wrong shard count"}, + + { + metadata: map[string]string{ + "streamName": testAWSKinesisStreamName, + "shardCount": "2", + "awsRegion": testAWSRegion}, + authParams: map[string]string{ + "awsAccessKeyID": "", + "awsSecretAccessKey": testAWSKinesisSecretAccessKey, + }, + expected: &awsKinesisStreamMetadata{}, + isError: true, + comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id"}, + {metadata: map[string]string{ + "streamName": testAWSKinesisStreamName, + "shardCount": "2", + "awsRegion": testAWSRegion}, + authParams: map[string]string{ + "awsAccessKeyID": testAWSKinesisAccessKeyID, + "awsSecretAccessKey": "", + }, + expected: &awsKinesisStreamMetadata{}, + isError: true, + comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key"}, + {metadata: map[string]string{ + "streamName": testAWSKinesisStreamName, + "shardCount": "2", + "awsRegion": testAWSRegion}, + authParams: map[string]string{ + "awsRoleArn": testAWSKinesisRoleArn, + }, + expected: &awsKinesisStreamMetadata{ + targetShardCount: 2, + streamName: testAWSKinesisStreamName, + awsRegion: testAWSRegion, + awsAuthorization: awsAuthorizationMetadata{ + awsRoleArn: testAWSKinesisRoleArn, + }, + }, + isError: false, + comment: "with AWS Role from TriggerAuthentication"}, +} + +func TestKinesisParseMetadata(t *testing.T) { + for _, testData := range testAWSKinesisMetadata { + result, err := parseAwsKinesisStreamMetadata(testData.metadata, testAWSKinesisAuthentication, testData.authParams) + if err != nil && !testData.isError { + t.Errorf("Expected success because %s got error, %s", testData.comment, err) + } + if testData.isError && err == nil { + t.Errorf("Expected error because %s but got success, %#v", testData.comment, testData) + } + + if !testData.isError && !reflect.DeepEqual(testData.expected, result) { + t.Fatalf("Expected %#v but got %+#v", testData.expected, result) + } + } +}