Skip to content

Commit

Permalink
add kinesis stream scaler
Browse files Browse the repository at this point in the history
  • Loading branch information
msfuko authored and chloel committed Jan 7, 2020
1 parent 61d7959 commit de0cc5e
Show file tree
Hide file tree
Showing 4 changed files with 329 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewAwsSqsQueueScaler(resolvedEnv, triggerMetadata, authParams)
case "aws-cloudwatch":
return scalers.NewAwsCloudwatchScaler(resolvedEnv, triggerMetadata, authParams)
case "aws-kinesis-stream":
return scalers.NewAwsKinesisStreamScaler(resolvedEnv, triggerMetadata, authParams)
case "kafka":
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams)
case "rabbitmq":
Expand Down
7 changes: 5 additions & 2 deletions pkg/scalers/aws_iam_authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ func getAwsAuthorization(authParams, metadata, resolvedEnv map[string]string) (a

if authParams["awsRoleArn"] != "" {
meta.awsRoleArn = authParams["awsRoleArn"]
} else if authParams["awsAccessKeyId"] != "" && authParams["awsSecretAccessKey"] != "" {
meta.awsAccessKeyID = authParams["awsAccessKeyId"]
} else if (authParams["awsAccessKeyID"] != "" || authParams["awsAccessKeyId"] != "") && authParams["awsSecretAccessKey"] != "" {
meta.awsAccessKeyID = authParams["awsAccessKeyID"]
if meta.awsAccessKeyID == "" {
meta.awsAccessKeyID = authParams["awsAccessKeyId"]
}
meta.awsSecretAccessKey = authParams["awsSecretAccessKey"]
} else {
var keyName string
Expand Down
155 changes: 155 additions & 0 deletions pkg/scalers/aws_kinesis_stream_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package scalers

import (
"context"
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
awsKinesisStreamMetricName = "ShardCount"
targetShardCountDefault = 2
)

type awsKinesisStreamScaler struct {
metadata *awsKinesisStreamMetadata
}

type awsKinesisStreamMetadata struct {
targetShardCount int
streamName string
awsRegion string
awsAuthorization awsAuthorizationMetadata
}

var kinesisStreamLog = logf.Log.WithName("aws_kinesis_stream_scaler")

// NewAwsKinesisStreamScaler creates a new awsKinesisStreamScaler
func NewAwsKinesisStreamScaler(resolvedEnv, metadata map[string]string, authParams map[string]string) (Scaler, error) {
meta, err := parseAwsKinesisStreamMetadata(metadata, resolvedEnv, authParams)
if err != nil {
return nil, fmt.Errorf("Error parsing Kinesis stream metadata: %s", err)
}

return &awsKinesisStreamScaler{
metadata: meta,
}, nil
}

func parseAwsKinesisStreamMetadata(metadata, resolvedEnv, authParams map[string]string) (*awsKinesisStreamMetadata, error) {
meta := awsKinesisStreamMetadata{}
meta.targetShardCount = targetShardCountDefault

if val, ok := metadata["shardCount"]; ok && val != "" {
shardCount, err := strconv.Atoi(val)
if err != nil {
meta.targetShardCount = targetShardCountDefault
kinesisStreamLog.Error(err, "Error parsing Kinesis stream metadata shardCount, using default %n", targetShardCountDefault)
} else {
meta.targetShardCount = shardCount
}
}

if val, ok := metadata["streamName"]; ok && val != "" {
meta.streamName = val
} else {
return nil, fmt.Errorf("no streamName given")
}

if val, ok := metadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return nil, fmt.Errorf("no awsRegion given")
}

auth, err := getAwsAuthorization(authParams, metadata, resolvedEnv)
if err != nil {
return nil, err
}

meta.awsAuthorization = auth

return &meta, nil
}

// IsActive determines if we need to scale from zero
func (s *awsKinesisStreamScaler) IsActive(ctx context.Context) (bool, error) {
count, err := s.GetAwsKinesisOpenShardCount()

if err != nil {
return false, err
}

return count > 0, nil
}

func (s *awsKinesisStreamScaler) Close() error {
return nil
}

func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetShardCountQty := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{MetricName: fmt.Sprintf("%s-%s-%s", "AWS-Kinesis-Stream", awsKinesisStreamMetricName, s.metadata.streamName),
TargetAverageValue: targetShardCountQty}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

//GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *awsKinesisStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
shardCount, err := s.GetAwsKinesisOpenShardCount()

if err != nil {
kinesisStreamLog.Error(err, "Error getting shard count")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(shardCount), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Get Kinesis open shard count
func (s *awsKinesisStreamScaler) GetAwsKinesisOpenShardCount() (int64, error) {
input := &kinesis.DescribeStreamSummaryInput{
StreamName: &s.metadata.streamName,
}

sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(s.metadata.awsRegion),
}))
creds := credentials.NewStaticCredentials(s.metadata.awsAuthorization.awsAccessKeyID, s.metadata.awsAuthorization.awsSecretAccessKey, "")

if s.metadata.awsAuthorization.awsRoleArn != "" {
creds = stscreds.NewCredentials(sess, s.metadata.awsAuthorization.awsRoleArn)
}

kinesisClinent := kinesis.New(sess, &aws.Config{
Region: aws.String(s.metadata.awsRegion),
Credentials: creds,
})

output, err := kinesisClinent.DescribeStreamSummary(input)
if err != nil {
return -1, err
}

return *output.StreamDescriptionSummary.OpenShardCount, nil
}
167 changes: 167 additions & 0 deletions pkg/scalers/aws_kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package scalers

import (
"reflect"
"testing"
)

const (
testAWSKinesisRoleArn = "none"
testAWSKinesisAccessKeyID = "none"
testAWSKinesisSecretAccessKey = "none"
testAWSKinesisStreamName = "test"
testAWSRegion = "eu-west-1"
)

var testAWSKinesisResolvedEnv = map[string]string{
"AWS_ACCESS_KEY": "none",
"AWS_SECRET_ACCESS_KEY": "none",
}

var testAWSKinesisAuthentication = map[string]string{
"awsAccessKeyID": testAWSKinesisAccessKeyID,
"awsSecretAccessKey": testAWSKinesisSecretAccessKey,
}

type parseAWSKinesisMetadataTestData struct {
metadata map[string]string
expected *awsKinesisStreamMetadata
authParams map[string]string
isError bool
comment string
}

var testAWSKinesisMetadata = []parseAWSKinesisMetadataTestData{
{
metadata: map[string]string{},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "metadata empty"},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSKinesisAccessKeyID,
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
},
},
isError: false,
comment: "properly formed stream name and region"},
{
metadata: map[string]string{
"streamName": "",
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "missing stream name"},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": ""},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "properly formed stream name, empty region"},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSKinesisAccessKeyID,
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
},
},
isError: false,
comment: "properly formed stream name and region, empty shard count"},
{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "a",
"awsRegion": testAWSRegion},
authParams: testAWSKinesisAuthentication,
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsAuthorization: awsAuthorizationMetadata{
awsAccessKeyID: testAWSKinesisAccessKeyID,
awsSecretAccessKey: testAWSKinesisSecretAccessKey,
},
},
isError: false,
comment: "properly formed stream name and region, wrong shard count"},

{
metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: map[string]string{
"awsAccessKeyID": "",
"awsSecretAccessKey": testAWSKinesisSecretAccessKey,
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Access Key Id"},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: map[string]string{
"awsAccessKeyID": testAWSKinesisAccessKeyID,
"awsSecretAccessKey": "",
},
expected: &awsKinesisStreamMetadata{},
isError: true,
comment: "with AWS Credentials from TriggerAuthentication, missing Secret Access Key"},
{metadata: map[string]string{
"streamName": testAWSKinesisStreamName,
"shardCount": "2",
"awsRegion": testAWSRegion},
authParams: map[string]string{
"awsRoleArn": testAWSKinesisRoleArn,
},
expected: &awsKinesisStreamMetadata{
targetShardCount: 2,
streamName: testAWSKinesisStreamName,
awsRegion: testAWSRegion,
awsAuthorization: awsAuthorizationMetadata{
awsRoleArn: testAWSKinesisRoleArn,
},
},
isError: false,
comment: "with AWS Role from TriggerAuthentication"},
}

func TestKinesisParseMetadata(t *testing.T) {
for _, testData := range testAWSKinesisMetadata {
result, err := parseAwsKinesisStreamMetadata(testData.metadata, testAWSKinesisAuthentication, testData.authParams)
if err != nil && !testData.isError {
t.Errorf("Expected success because %s got error, %s", testData.comment, err)
}
if testData.isError && err == nil {
t.Errorf("Expected error because %s but got success, %#v", testData.comment, testData)
}

if !testData.isError && !reflect.DeepEqual(testData.expected, result) {
t.Fatalf("Expected %#v but got %+#v", testData.expected, result)
}
}
}

0 comments on commit de0cc5e

Please sign in to comment.