Skip to content

Commit

Permalink
Feature/add sqs scaler (#191)
Browse files Browse the repository at this point in the history
* Initial sqs scaler

* Update aws_sqs_queue_scaler.go

* Add AWS SQS scaler.

* Add example AwsSqsQueue Scaled Object.

* Update Aws Sqs Scaler test.

* Add vedored libraries


Former-commit-id: 33ff409
  • Loading branch information
zach-dunton-sf authored and yaron2 committed May 26, 2019
1 parent 855bab3 commit b62c4a2
Show file tree
Hide file tree
Showing 155 changed files with 31,992 additions and 0 deletions.
50 changes: 50 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,7 @@
version = "0.2.0"



[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.19.27"
40 changes: 40 additions & 0 deletions examples/awssqsqueue_scaledobject.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
labels:
app: nginx
spec:
replicas: 0
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80
---
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: aws-sqs-queue-scaledobject
namespace: default
labels:
deploymentName: nginx-deployment
test: nginx-deployment
spec:
scaleTargetRef:
deploymentName: nginx-deployment
triggers:
- type: aws-sqs-queue
metadata:
# Required: queueURL
queueURL: https://sqs.eu-west-1.amazonaws.com/<acccount_id>/testQueue
# Optional
queueLength: "5" # default 5
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,8 @@ func (h *ScaleHandler) getScaler(trigger keda_v1alpha1.ScaleTriggers, resolvedEn
return scalers.NewAzureQueueScaler(resolvedEnv, trigger.Metadata)
case "azure-servicebus":
return scalers.NewAzureServiceBusScaler(resolvedEnv, trigger.Metadata)
case "aws-sqs-queue":
return scalers.NewAwsSqsQueueScaler(resolvedEnv, trigger.Metadata)
case "kafka":
return scalers.NewKafkaScaler(resolvedEnv, trigger.Metadata)
case "rabbitmq":
Expand Down
37 changes: 37 additions & 0 deletions pkg/scalers/aws_sqs_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package scalers

import (
"context"
"errors"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)

func GetAwsSqsQueueLength(ctx context.Context, queueURL string) (int32, error) {
sess, err := session.NewSession(aws.NewConfig().WithRegion("eu-west-1"))
sqsClient := sqs.New(sess)

if len(queueURL) == 0 {
return -1, errors.New("empty queueURL is not valid")
}

input := &sqs.GetQueueAttributesInput{
AttributeNames: aws.StringSlice([]string{"All"}),
QueueUrl: aws.String(queueURL),
}

output, err := sqsClient.GetQueueAttributes(input)
if err != nil {
return -1, nil
}

approximateNumberOfMessages, err := strconv.Atoi(*output.Attributes["ApproximateNumberOfMessages"])
if err != nil {
return -1, nil
}

return int32(approximateNumberOfMessages), nil
}
107 changes: 107 additions & 0 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package scalers

import (
"context"
"errors"
"fmt"
"strconv"

log "github.com/Sirupsen/logrus"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
awsSqsQueueMetricName = "ApproximateNumberOfMessages"
)

type awsSqsQueueScaler struct {
metadata *awsSqsQueueMetadata
}

type awsSqsQueueMetadata struct {
targetQueueLength int
queueURL string
}

// NewawsSqsQueueScaler creates a new awsSqsQueueScaler
func NewAwsSqsQueueScaler(resolvedEnv, metadata map[string]string) (Scaler, error) {
meta, err := parseAwsSqsQueueMetadata(metadata, resolvedEnv)
if err != nil {
return nil, fmt.Errorf("Error parsing SQS queue metadata: %s", err)
}

return &awsSqsQueueScaler{
metadata: meta,
}, nil
}

func parseAwsSqsQueueMetadata(metadata, resolvedEnv map[string]string) (*awsSqsQueueMetadata, error) {
meta := awsSqsQueueMetadata{}
meta.targetQueueLength = defaultTargetQueueLength

if val, ok := metadata["queueLength"]; ok {
queueLength, err := strconv.Atoi(val)
if err != nil {
log.Errorf("Error parsing SQS queue metadata %s: %s", "queueLength", err)
} else {
meta.targetQueueLength = queueLength
}
}

if val, ok := metadata["queueURL"]; ok {
queueURL := val

if len(val) <= 0 {
log.Errorf("Error parsing SQS queue metadata %s: %s", "queueURL", errors.New("Empty queueURL is not valid"))
} else {
meta.queueURL = queueURL
}
}

return &meta, nil
}

// GetScaleDecision is a func
func (s *awsSqsQueueScaler) IsActive(ctx context.Context) (bool, error) {
length, err := GetAwsSqsQueueLength(ctx, s.metadata.queueURL)

if err != nil {
log.Errorf("Error %s", err)
return false, err
}

return length > 0, nil
}

func (s *awsSqsQueueScaler) Close() error {
return nil
}

func (s *awsSqsQueueScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueLength), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{MetricName: awsSqsQueueMetricName, TargetAverageValue: targetQueueLengthQty}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

//GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *awsSqsQueueScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
queuelen, err := GetAwsSqsQueueLength(ctx, s.metadata.queueURL)

if err != nil {
log.Errorf("Error getting queue length %s", err)
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(queuelen), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
26 changes: 26 additions & 0 deletions pkg/scalers/aws_sqs_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package scalers

import (
"context"
"os"
"testing"
)

func TestGetQueueLengthReal(t *testing.T) {
queueURL := os.Getenv("AWS_SQS_QUEUE_URL")

t.Log("This test will use the environment variable AWS_SQS_QUEUE_URL if it is set")
t.Log("Ensure that AWS credentials are configured to be able to access the queue")
t.Log("If set, it will connect to the specified SQS Queue & check:")
t.Logf("\tQueue '%s' has 0 message\n", queueURL)

length, err := GetAwsSqsQueueLength(context.TODO(), queueURL)
if err != nil {
t.Error(err)
}
t.Log("QueueLength = ", length)

if length != 0 {
t.Error("Expected length to be 0, but got", length)
}
}
Loading

0 comments on commit b62c4a2

Please sign in to comment.