Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add sqs scaler #191

Merged
merged 6 commits into from
May 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,7 @@
version = "0.2.0"



[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.19.27"
40 changes: 40 additions & 0 deletions examples/awssqsqueue_scaledobject.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
labels:
app: nginx
spec:
replicas: 0
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80
---
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: aws-sqs-queue-scaledobject
namespace: default
labels:
deploymentName: nginx-deployment
test: nginx-deployment
spec:
scaleTargetRef:
deploymentName: nginx-deployment
triggers:
- type: aws-sqs-queue
metadata:
# Required: queueURL
queueURL: https://sqs.eu-west-1.amazonaws.com/<acccount_id>/testQueue
# Optional
queueLength: "5" # default 5
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,8 @@ func (h *ScaleHandler) getScaler(trigger keda_v1alpha1.ScaleTriggers, resolvedEn
return scalers.NewAzureQueueScaler(resolvedEnv, trigger.Metadata)
case "azure-servicebus":
return scalers.NewAzureServiceBusScaler(resolvedEnv, trigger.Metadata)
case "aws-sqs-queue":
return scalers.NewAwsSqsQueueScaler(resolvedEnv, trigger.Metadata)
case "kafka":
return scalers.NewKafkaScaler(resolvedEnv, trigger.Metadata)
case "rabbitmq":
Expand Down
37 changes: 37 additions & 0 deletions pkg/scalers/aws_sqs_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package scalers

import (
"context"
"errors"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)

func GetAwsSqsQueueLength(ctx context.Context, queueURL string) (int32, error) {
sess, err := session.NewSession(aws.NewConfig().WithRegion("eu-west-1"))
sqsClient := sqs.New(sess)

if len(queueURL) == 0 {
return -1, errors.New("empty queueURL is not valid")
}

input := &sqs.GetQueueAttributesInput{
AttributeNames: aws.StringSlice([]string{"All"}),
QueueUrl: aws.String(queueURL),
}

output, err := sqsClient.GetQueueAttributes(input)
if err != nil {
return -1, nil
}

approximateNumberOfMessages, err := strconv.Atoi(*output.Attributes["ApproximateNumberOfMessages"])
if err != nil {
return -1, nil
}

return int32(approximateNumberOfMessages), nil
}
107 changes: 107 additions & 0 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package scalers

import (
"context"
"errors"
"fmt"
"strconv"

log "github.com/Sirupsen/logrus"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
)

const (
awsSqsQueueMetricName = "ApproximateNumberOfMessages"
)

type awsSqsQueueScaler struct {
metadata *awsSqsQueueMetadata
}

type awsSqsQueueMetadata struct {
targetQueueLength int
queueURL string
}

// NewawsSqsQueueScaler creates a new awsSqsQueueScaler
func NewAwsSqsQueueScaler(resolvedEnv, metadata map[string]string) (Scaler, error) {
meta, err := parseAwsSqsQueueMetadata(metadata, resolvedEnv)
if err != nil {
return nil, fmt.Errorf("Error parsing SQS queue metadata: %s", err)
}

return &awsSqsQueueScaler{
metadata: meta,
}, nil
}

func parseAwsSqsQueueMetadata(metadata, resolvedEnv map[string]string) (*awsSqsQueueMetadata, error) {
meta := awsSqsQueueMetadata{}
meta.targetQueueLength = defaultTargetQueueLength

if val, ok := metadata["queueLength"]; ok {
queueLength, err := strconv.Atoi(val)
if err != nil {
log.Errorf("Error parsing SQS queue metadata %s: %s", "queueLength", err)
} else {
meta.targetQueueLength = queueLength
}
}

if val, ok := metadata["queueURL"]; ok {
queueURL := val

if len(val) <= 0 {
log.Errorf("Error parsing SQS queue metadata %s: %s", "queueURL", errors.New("Empty queueURL is not valid"))
} else {
meta.queueURL = queueURL
}
}

return &meta, nil
}

// GetScaleDecision is a func
func (s *awsSqsQueueScaler) IsActive(ctx context.Context) (bool, error) {
length, err := GetAwsSqsQueueLength(ctx, s.metadata.queueURL)

if err != nil {
log.Errorf("Error %s", err)
return false, err
}

return length > 0, nil
}

func (s *awsSqsQueueScaler) Close() error {
return nil
}

func (s *awsSqsQueueScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueLength), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{MetricName: awsSqsQueueMetricName, TargetAverageValue: targetQueueLengthQty}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

//GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *awsSqsQueueScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
queuelen, err := GetAwsSqsQueueLength(ctx, s.metadata.queueURL)

if err != nil {
log.Errorf("Error getting queue length %s", err)
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(queuelen), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
26 changes: 26 additions & 0 deletions pkg/scalers/aws_sqs_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package scalers

import (
"context"
"os"
"testing"
)

func TestGetQueueLengthReal(t *testing.T) {
queueURL := os.Getenv("AWS_SQS_QUEUE_URL")

t.Log("This test will use the environment variable AWS_SQS_QUEUE_URL if it is set")
t.Log("Ensure that AWS credentials are configured to be able to access the queue")
t.Log("If set, it will connect to the specified SQS Queue & check:")
t.Logf("\tQueue '%s' has 0 message\n", queueURL)

length, err := GetAwsSqsQueueLength(context.TODO(), queueURL)
if err != nil {
t.Error(err)
}
t.Log("QueueLength = ", length)

if length != 0 {
t.Error("Expected length to be 0, but got", length)
}
}
Loading