Skip to content

Commit

Permalink
External Scaler Support for KEDA (#294)
Browse files Browse the repository at this point in the history
* update only when spec is changed


Former-commit-id: f640458

* update only when spec is changed

* remove unused script


Former-commit-id: ac7b28f

* remove unused script

* HPA update on scaled object change


Former-commit-id: 78a1d6d

* HPA update on scaled object change

* updating readme (#137)



Former-commit-id: 36eada5

* updating readme (#137)

* Update README.md

Minor replacement from scalar to scaler on readme

Former-commit-id: aa306ad

* Update README.md

Minor replacement from scalar to scaler on readme

* adding GIF to README (#140)



Former-commit-id: 82b32c1

* adding GIF to README (#140)

* Update default tag in the helm chart

Former-commit-id: 6b55d7e

* Update default tag in the helm chart

* Update default tag in the helm chart

Former-commit-id: 661b106

* Update default tag in the helm chart

* Update Makefile

Former-commit-id: 9118d22

* Update Makefile

* README changing default helm install


Former-commit-id: b4890f2

* README changing default helm install

* Add a boilerplate.go.txt for autogenrated files

Closes #143


Former-commit-id: 925c508

* Add a boilerplate.go.txt for autogenrated files

Closes #143

* kafka - don't scale beyond number of partitions


Former-commit-id: 0eaa52c

* kafka - don't scale beyond number of partitions

* Fix scaler tests error re: default connection setting change


Former-commit-id: cfb8f17

* Fix scaler tests error re: default connection setting change

* don't scale if deployment is nil


Former-commit-id: a6cf946

* don't scale if deployment is nil

* return before iterating scalers if deployment is nil


Former-commit-id: a44eeba

* return before iterating scalers if deployment is nil

* Use configmap/secret key names instead of env var name in function calls


Former-commit-id: da0007f

* Use configmap/secret key names instead of env var name in function calls

* Update README.md

Former-commit-id: a538f81

* Update README.md

* updating README


Former-commit-id: 2d321a9

* updating README

* replacing image


Former-commit-id: aaf0a08

* replacing image

* Emphasize Project Status (#163)



Former-commit-id: a5b86b2

* Emphasize Project Status (#163)

* Tweak logo so it works on white backgrounds. (#174)



Former-commit-id: 5efba2a

* Tweak logo so it works on white backgrounds. (#174)

* fixing example labels


Former-commit-id: fb81ff7

* fixing example labels

* prevent hpa from getting 0 for MinReplicaCount


Former-commit-id: a7f8ff0

* prevent hpa from getting 0 for MinReplicaCount

* Configure Kubevalidator


Former-commit-id: 97d6dbc

* Configure Kubevalidator

* fix update ScaledObject


Former-commit-id: 62cad00

* fix update ScaledObject

* fix typos (#183)

* Update scaler.go

fix typo

* Update kafka_scaler.go

fix typo

* Update rabbitmq_scaler.go

fix typo

* Update kafka_scaler.go

resolve requested change


Former-commit-id: 7d31b26

* fix typos (#183)

* Update scaler.go

fix typo

* Update kafka_scaler.go

fix typo

* Update rabbitmq_scaler.go

fix typo

* Update kafka_scaler.go

resolve requested change

* update scaledObject


Former-commit-id: b1ea9ca

* update scaledObject

*  Fix glob for examples for Kubevalidator (#193)

Fix glob for examples for Kubevalidator but it won't be able to validate our own objects.

Kudos to @zach-dunton-sf for pointing this out


Former-commit-id: 7eead79

*  Fix glob for examples for Kubevalidator (#193)

Fix glob for examples for Kubevalidator but it won't be able to validate our own objects.

Kudos to @zach-dunton-sf for pointing this out

* Update kubevalidator.yaml

Former-commit-id: efa7f9d

* Update kubevalidator.yaml

* Improved kafka scaler debug log


Former-commit-id: 09018fe

* Improved kafka scaler debug log

* simplify


Former-commit-id: aba95d0

* simplify

* Updating README meeting timezone


Former-commit-id: cab3cde

* Updating README meeting timezone

* Multiple logos


Former-commit-id: cebce8f

* Multiple logos

* Add Keda Logo to Helm Chart


Former-commit-id: 291169f

* Add Keda Logo to Helm Chart

* Basic Helm chart README (#207)



Former-commit-id: 0e41b2d

* Basic Helm chart README (#207)

* Add singular and short names to scaledobject spec (#211)

* Add singular and short names to scaledobject spec

* Delete second shortname


Former-commit-id: 0feeec0

* Add singular and short names to scaledobject spec (#211)

* Add singular and short names to scaledobject spec

* Delete second shortname

* Feature/add sqs scaler (#191)

* Initial sqs scaler

* Update aws_sqs_queue_scaler.go

* Add AWS SQS scaler.

* Add example AwsSqsQueue Scaled Object.

* Update Aws Sqs Scaler test.

* Add vedored libraries


Former-commit-id: 11ff32f

* Feature/add sqs scaler (#191)

* Initial sqs scaler

* Update aws_sqs_queue_scaler.go

* Add AWS SQS scaler.

* Add example AwsSqsQueue Scaled Object.

* Update Aws Sqs Scaler test.

* Add vedored libraries

* fix sqs auth, remove hardcoded region


Former-commit-id: 3cef947

* fix sqs auth, remove hardcoded region

* fixed sqs test


Former-commit-id: c4cc7bc

* fixed sqs test

* fix scale from zero when minReplicaCount is 0 (#227)



Former-commit-id: 8feec5f

* fix scale from zero when minReplicaCount is 0 (#227)

* update kafka test with latst metadata (#229)



Former-commit-id: fc43cee

* update kafka test with latst metadata (#229)

* allow multiple containers on deployment


Former-commit-id: d7853e5

* allow multiple containers on deployment

* handle nil container if containerName not found


Former-commit-id: 57bb8af

* handle nil container if containerName not found

* Updated scaledobject.yaml with containerName

Former-commit-id: a8598b9

* Updated scaledobject.yaml with containerName

* Update pkg/scalers/aws_sqs_queue_scaler.go

Co-Authored-By: Ahmed ElSayed <[email protected]>
Former-commit-id: 0352b3a

* Update pkg/scalers/aws_sqs_queue_scaler.go

Co-Authored-By: Ahmed ElSayed <[email protected]>

* tab fix, remove redundant check


Former-commit-id: f9174a4

* tab fix, remove redundant check

* Read me updates for minikube, HPA


Former-commit-id: adcf29a

* Read me updates for minikube, HPA

* Update README.md

Co-Authored-By: Jeff Hollan <[email protected]>
Former-commit-id: 89375aa

* Update README.md

Co-Authored-By: Jeff Hollan <[email protected]>

* Update README.md

Co-Authored-By: Jeff Hollan <[email protected]>
Former-commit-id: ec7dc52

* Update README.md

Co-Authored-By: Jeff Hollan <[email protected]>

* Add prometheus scaler (#252)

* Add prometheus scaler

* remove disableScaleToZero


Former-commit-id: 35c0cc4

* Add prometheus scaler (#252)

* Add prometheus scaler

* remove disableScaleToZero

* enable ARM deployment


Former-commit-id: 592932f

* enable ARM deployment

* Custom metrics API service removal and external metrics list (#258)

* Removing custom metrics adapater and adding external metrics list.

* Change refCount check to ==0 instead of <=0 for removal from ExternalMetricsList


Former-commit-id: 0cf7c44

* Custom metrics API service removal and external metrics list (#258)

* Removing custom metrics adapater and adding external metrics list.

* Change refCount check to ==0 instead of <=0 for removal from ExternalMetricsList

* fix aws sqs test (#256)



Former-commit-id: 47d62a6

* fix aws sqs test (#256)

* HPA creation retry when scaled object validation fails


Former-commit-id: 2d6e2d7

* HPA creation retry when scaled object validation fails

* Provide list of configuration values for Helm Chart

Signed-off-by: Tom Kerkhove <[email protected]>

Former-commit-id: 0aea9da

* Provide list of configuration values for Helm Chart

Signed-off-by: Tom Kerkhove <[email protected]>

* Added Redis scaler (#266)

* Added redis scaler

* Made changes for review comments

* Added doc strings to methods in redis scaler


Former-commit-id: 793440c

* Added Redis scaler (#266)

* Added redis scaler

* Made changes for review comments

* Added doc strings to methods in redis scaler

* Added test for RabbitMQ


Former-commit-id: ff8f43f

* Added test for RabbitMQ

* RabbitMQ Tests - Fixed hostname


Former-commit-id: c68247c

* RabbitMQ Tests - Fixed hostname

* Fix tests


Former-commit-id: 6528b8c

* Fix tests

* Enable CI unit tests


Former-commit-id: 4e27a75

* Enable CI unit tests

* Remove imagePullSecrets (#272)



Former-commit-id: 0cb6bfd

* Remove imagePullSecrets (#272)

* Update README.md

Former-commit-id: 66efaa3

* Update README.md

* Scaler for Azure Event Hub (#259)

* Created scaler for Azure Event Hub

* Returned more errors and created Azure Storage file

* Change image back to kedacore

* Small changes to code

* Read blob storage set up by eventhub-triggered functions

* Change trigger name back to azure-eventhub

* Change event hub metadata setting and constants, other minor changes

* Change ParseStorageConnection method to also parse endpoint protocol and endpoint suffix

* Create example yaml for event hub scaled object

* Add comments


Former-commit-id: 98b50e7

* Scaler for Azure Event Hub (#259)

* Created scaler for Azure Event Hub

* Returned more errors and created Azure Storage file

* Change image back to kedacore

* Small changes to code

* Read blob storage set up by eventhub-triggered functions

* Change trigger name back to azure-eventhub

* Change event hub metadata setting and constants, other minor changes

* Change ParseStorageConnection method to also parse endpoint protocol and endpoint suffix

* Create example yaml for event hub scaled object

* Add comments

* Event Hub Scaler (#276)

* Created scaler for Azure Event Hub

* Returned more errors and created Azure Storage file

* Change image back to kedacore

* Small changes to code

* Read blob storage set up by eventhub-triggered functions

* Change trigger name back to azure-eventhub

* Change event hub metadata setting and constants, other minor changes

* Change ParseStorageConnection method to also parse endpoint protocol and endpoint suffix

* Create example yaml for event hub scaled object

* Add comments

* Comment out test


Former-commit-id: 526d109

* Event Hub Scaler (#276)

* Created scaler for Azure Event Hub

* Returned more errors and created Azure Storage file

* Change image back to kedacore

* Small changes to code

* Read blob storage set up by eventhub-triggered functions

* Change trigger name back to azure-eventhub

* Change event hub metadata setting and constants, other minor changes

* Change ParseStorageConnection method to also parse endpoint protocol and endpoint suffix

* Create example yaml for event hub scaled object

* Add comments

* Comment out test

* Update tests that use ParseStorageConnectionString (#277)

* Created scaler for Azure Event Hub

* Returned more errors and created Azure Storage file

* Change image back to kedacore

* Small changes to code

* Read blob storage set up by eventhub-triggered functions

* Change trigger name back to azure-eventhub

* Change event hub metadata setting and constants, other minor changes

* Change ParseStorageConnection method to also parse endpoint protocol and endpoint suffix

* Create example yaml for event hub scaled object

* Add comments

* Comment out test

* Update tests that use ParseStorageConnectionString


Former-commit-id: 69b3810

* Update tests that use ParseStorageConnectionString (#277)

* Created scaler for Azure Event Hub

* Returned more errors and created Azure Storage file

* Change image back to kedacore

* Small changes to code

* Read blob storage set up by eventhub-triggered functions

* Change trigger name back to azure-eventhub

* Change event hub metadata setting and constants, other minor changes

* Change ParseStorageConnection method to also parse endpoint protocol and endpoint suffix

* Create example yaml for event hub scaled object

* Add comments

* Comment out test

* Update tests that use ParseStorageConnectionString

* Update return values for ParseAzureStorageConnectionString (#278)

* Created scaler for Azure Event Hub

* Returned more errors and created Azure Storage file

* Change image back to kedacore

* Small changes to code

* Read blob storage set up by eventhub-triggered functions

* Change trigger name back to azure-eventhub

* Change event hub metadata setting and constants, other minor changes

* Change ParseStorageConnection method to also parse endpoint protocol and endpoint suffix

* Create example yaml for event hub scaled object

* Add comments

* Comment out test

* Update tests that use ParseStorageConnectionString

* Update return values for ParseAzureStorageConnectionString


Former-commit-id: 8a2ba37

* Update return values for ParseAzureStorageConnectionString (#278)

* Created scaler for Azure Event Hub

* Returned more errors and created Azure Storage file

* Change image back to kedacore

* Small changes to code

* Read blob storage set up by eventhub-triggered functions

* Change trigger name back to azure-eventhub

* Change event hub metadata setting and constants, other minor changes

* Change ParseStorageConnection method to also parse endpoint protocol and endpoint suffix

* Create example yaml for event hub scaled object

* Add comments

* Comment out test

* Update tests that use ParseStorageConnectionString

* Update return values for ParseAzureStorageConnectionString

* Update E2E tests (#271)



Former-commit-id: d8a8ef7

* Update E2E tests (#271)

* Update README.md with Redis (#273)



Former-commit-id: a759e6b

* Update README.md with Redis (#273)

* Update AWS authentication methods (#280)



Former-commit-id: 63cd04f

* Update AWS authentication methods (#280)

* Enable builds for forks (#285)



Former-commit-id: fbc1773

* Enable builds for forks (#285)

* Scaler for aws cloudwatch (#282)

* Adding cloudwatch metric as a scaler

* Adding examples and fixing the error on null metrics received

* Updating the Gopkg, removing redundant logs and adding tests

* Updating the formatting for the test data

* Renaming region to awsRegion

* Updating the default region name and value


Former-commit-id: 9a6591a

* Scaler for aws cloudwatch (#282)

* Adding cloudwatch metric as a scaler

* Adding examples and fixing the error on null metrics received

* Updating the Gopkg, removing redundant logs and adding tests

* Updating the formatting for the test data

* Renaming region to awsRegion

* Updating the default region name and value

* Update README.md

Former-commit-id: 2ef94c1

* Update README.md

* Update config.yml

Former-commit-id: 21a562e

* Update config.yml

* Fix SQS tests (#283)

* Fix SQS tests

* Add missing vendor files


Former-commit-id: 382583e

* Fix SQS tests (#283)

* Fix SQS tests

* Add missing vendor files

* Update config.yml

Former-commit-id: c380a5b

* Update config.yml

* Update README.md

Former-commit-id: 81cb843

* Update README.md

* Update README.md

Adding that EventHub is supported now.

Former-commit-id: f81b92b

* Update README.md

Adding that EventHub is supported now.

* Scaler for GCP PubSub (#274)

* Added GCP PubSub scaler

* Added GCP PubSub scaler

* Ran dep ensure again after merging with GCP Scaler


Former-commit-id: c7dc513

* Scaler for GCP PubSub (#274)

* Added GCP PubSub scaler

* Added GCP PubSub scaler

* Ran dep ensure again after merging with GCP Scaler

* Azure AD Pod Identity support for Azure Storage Queue (#275)

* Added Azure Pod Identity support for Azure Queues

* Pod Identity for Azure Queues - Updated Keda Helm chart to include labels on pods

* Fixed GetAzureQueueLength to call new ParseAzureStorageConnectionString method

* Fixed tests for AAd Pod Identity Metadata checks

* Removed test_cases directory


Former-commit-id: ba76e15

* Azure AD Pod Identity support for Azure Storage Queue (#275)

* Added Azure Pod Identity support for Azure Queues

* Pod Identity for Azure Queues - Updated Keda Helm chart to include labels on pods

* Fixed GetAzureQueueLength to call new ParseAzureStorageConnectionString method

* Fixed tests for AAd Pod Identity Metadata checks

* Removed test_cases directory

* Eventhub scaler test (#286)



Former-commit-id: f051953

* Eventhub scaler test (#286)

* Add GCP PubSub and alphabetize scalers list

Former-commit-id: 731f576

* Add GCP PubSub and alphabetize scalers list

* set umask value explicitly for MkdirAll (#290)

- set umask value explicitly for MkdirAll
- pass a file to UploadFileToBlockBob instead of a dir

Former-commit-id: 864ddb5

* set umask value explicitly for MkdirAll (#290)

- set umask value explicitly for MkdirAll
- pass a file to UploadFileToBlockBob instead of a dir

* Bump lodash from 4.17.11 to 4.17.14 in /tests (#291)

Bumps [lodash](https://github.com/lodash/lodash) from 4.17.11 to 4.17.14.
- [Release notes](https://github.com/lodash/lodash/releases)
- [Commits](lodash/lodash@4.17.11...4.17.14)

Signed-off-by: dependabot[bot] <[email protected]>
Former-commit-id: 2cac44f

* Fix variable shadow in azure_queue.go (#292)



Former-commit-id: 87817e9

* Added external scaler

* Added example for external scaler

* Fixed conflict in Azure queue

* Fixed Merge conflict in scale handler

* Added tests for external scaler

* External Scaler - Added TLS Support

* External Scaler - Fixed comments

* Proposal - Move scaler specifications to GitHub repo instead of wiki (#304)

* Migrate wiki to repo

Signed-off-by: Tom Kerkhove <[email protected]>

* Provide spec intro + seperate sample

Signed-off-by: Tom Kerkhove <[email protected]>

* Link to triggers from ScaledObject spec

Signed-off-by: Tom Kerkhove <[email protected]>

* Finalize first version

Signed-off-by: Tom Kerkhove <[email protected]>

* Changed serviceURI to scalerAddress. Fixed gRPC connection close

* Fixed tests after changing serviceURI to scalerAddress
  • Loading branch information
patnaikshekhar authored and jeffhollan committed Sep 19, 2019
1 parent 7646924 commit 906de58
Show file tree
Hide file tree
Showing 8 changed files with 990 additions and 3 deletions.
11 changes: 11 additions & 0 deletions chart/keda/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,14 @@ spec:
name: https
- containerPort: 8080
name: http
{{- if .Values.grpcTLSCertsSecret }}
volumeMounts:
- name: grpc-certs
mountPath: /grpccerts
{{- end }}
{{- if .Values.grpcTLSCertsSecret }}
volumes:
- name: grpc-certs
secret:
secretName: {{ .Values.grpcTLSCertsSecret }}
{{- end }}
7 changes: 6 additions & 1 deletion chart/keda/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ glogLevel: 2

# Set to the value of the Azure Active Directory Pod Identity
# This will be set as a label on the Keda Pod(s)
aadPodIdentity: ""
aadPodIdentity: ""

# Set this if you are using an external scaler and want to communicate
# over TLS (recommended). This variable holds the name of the secret that
# will be mounted to the /grpccerts path on the Pod
grpcTLSCertsSecret: ""
18 changes: 18 additions & 0 deletions examples/externalscaler_scaledobject.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: redis-scaledobject
namespace: keda-redis-test
labels:
deploymentName: keda-redis-node
spec:
scaleTargetRef:
deploymentName: keda-redis-node
triggers:
- type: external
metadata:
scalerAddress: redis-external-scaler-service:8080
address: REDIS_HOST # Required host:port format
password: REDIS_PASSWORD
listName: mylist # Required
listLength: "5" # Required
6 changes: 4 additions & 2 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (h *ScaleHandler) getScalers(scaledObject *keda_v1alpha1.ScaledObject) ([]s
}

for i, trigger := range scaledObject.Spec.Triggers {
scaler, err := h.getScaler(trigger, resolvedEnv)
scaler, err := h.getScaler(scaledObject, trigger, resolvedEnv)
if err != nil {
return scalers, nil, fmt.Errorf("error getting scaler for trigger #%d: %s", i, err)
}
Expand All @@ -606,7 +606,7 @@ func (h *ScaleHandler) getScalers(scaledObject *keda_v1alpha1.ScaledObject) ([]s
return scalers, deployment, nil
}

func (h *ScaleHandler) getScaler(trigger keda_v1alpha1.ScaleTriggers, resolvedEnv map[string]string) (scalers.Scaler, error) {
func (h *ScaleHandler) getScaler(scaledObject *keda_v1alpha1.ScaledObject, trigger keda_v1alpha1.ScaleTriggers, resolvedEnv map[string]string) (scalers.Scaler, error) {
switch trigger.Type {
case "azure-queue":
return scalers.NewAzureQueueScaler(resolvedEnv, trigger.Metadata)
Expand All @@ -628,6 +628,8 @@ func (h *ScaleHandler) getScaler(trigger keda_v1alpha1.ScaleTriggers, resolvedEn
return scalers.NewRedisScaler(resolvedEnv, trigger.Metadata)
case "gcp-pubsub":
return scalers.NewPubSubScaler(resolvedEnv, trigger.Metadata)
case "external":
return scalers.NewExternalScaler(scaledObject, resolvedEnv, trigger.Metadata)
case "liiklus":
return scalers.NewLiiklusScaler(resolvedEnv, trigger.Metadata)
default:
Expand Down
218 changes: 218 additions & 0 deletions pkg/scalers/external_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package scalers

import (
"context"
"fmt"

log "github.com/Sirupsen/logrus"
keda_v1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
pb "github.com/kedacore/keda/pkg/scalers/externalscaler"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
)

type externalScaler struct {
metadata *externalScalerMetadata
scaledObjectRef pb.ScaledObjectRef
grpcClient pb.ExternalScalerClient
grpcConnection *grpc.ClientConn
}

type externalScalerMetadata struct {
scalerAddress string
tlsCertFile string
metadata map[string]string
}

// NewExternalScaler creates a new external scaler - calls the GRPC interface
// to create a new scaler
func NewExternalScaler(scaledObject *keda_v1alpha1.ScaledObject, resolvedEnv, metadata map[string]string) (Scaler, error) {

meta, err := parseExternalScalerMetadata(metadata, resolvedEnv)
if err != nil {
return nil, fmt.Errorf("error parsing external scaler metadata: %s", err)
}

scaler := &externalScaler{
metadata: meta,
scaledObjectRef: pb.ScaledObjectRef{
Name: scaledObject.Name,
Namespace: scaledObject.Namespace,
},
}

// TODO: Pass Context
ctx := context.Background()

// Call GRPC Interface to parse metadata
err = scaler.getGRPCClient()
if err != nil {
return nil, err
}

request := &pb.NewRequest{
ScaledObjectRef: &scaler.scaledObjectRef,
Metadata: scaler.metadata.metadata,
}

_, err = scaler.grpcClient.New(ctx, request)
if err != nil {
return nil, err
}

return scaler, nil
}

func parseExternalScalerMetadata(metadata, resolvedEnv map[string]string) (*externalScalerMetadata, error) {
meta := externalScalerMetadata{}

// Check if scalerAddress is present
if val, ok := metadata["scalerAddress"]; ok && val != "" {
meta.scalerAddress = val
} else {
return nil, fmt.Errorf("Scaler Address is a required field")
}

if val, ok := metadata["tlsCertFile"]; ok && val != "" {
meta.tlsCertFile = val
}

meta.metadata = make(map[string]string)

// Add elements to metadata
for key, value := range metadata {
// Check if key is in resolved environment and resolve
if val, ok := resolvedEnv[value]; ok && val != "" {
meta.metadata[key] = val
} else {
meta.metadata[key] = value
}
}

return &meta, nil
}

// IsActive checks if there are any messages in the subscription
func (s *externalScaler) IsActive(ctx context.Context) (bool, error) {

// Call GRPC Interface to check if active
response, err := s.grpcClient.IsActive(ctx, &s.scaledObjectRef)
if err != nil {
log.Errorf("error %s", err)
return false, err
}

return response.Result, nil
}

func (s *externalScaler) Close() error {
// Call GRPC Interface to close connection

// TODO: Pass Context
ctx := context.Background()

_, err := s.grpcClient.Close(ctx, &s.scaledObjectRef)
if err != nil {
log.Errorf("error %s", err)
return err
}
defer s.grpcConnection.Close()

return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *externalScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {

// TODO: Pass Context
ctx := context.Background()

// Call GRPC Interface to get metric specs
response, err := s.grpcClient.GetMetricSpec(ctx, &s.scaledObjectRef)
if err != nil {
log.Errorf("error %s", err)
return nil
}

var result []v2beta1.MetricSpec

for _, spec := range response.MetricSpecs {
// Construct the target subscription size as a quantity
qty := resource.NewQuantity(int64(spec.TargetSize), resource.DecimalSI)

externalMetric := &v2beta1.ExternalMetricSource{
MetricName: spec.MetricName,
TargetAverageValue: qty,
}

// Create the metric spec for the HPA
metricSpec := v2beta1.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

result = append(result, metricSpec)
}

return result
}

// GetMetrics connects calls the gRPC interface to get the metrics with a specific name
func (s *externalScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {

var metrics []external_metrics.ExternalMetricValue
// Call GRPC Interface to get metric specs

request := &pb.GetMetricsRequest{
MetricName: metricName,
ScaledObjectRef: &s.scaledObjectRef,
}

response, err := s.grpcClient.GetMetrics(ctx, request)
if err != nil {
log.Errorf("error %s", err)
return []external_metrics.ExternalMetricValue{}, err
}

for _, metricResult := range response.MetricValues {
metric := external_metrics.ExternalMetricValue{
MetricName: metricResult.MetricName,
Value: *resource.NewQuantity(metricResult.MetricValue, resource.DecimalSI),
Timestamp: metav1.Now(),
}

metrics = append(metrics, metric)
}

return metrics, nil
}

// getGRPCClient creates a new gRPC client
func (s *externalScaler) getGRPCClient() error {

var err error

if s.metadata.tlsCertFile != "" {
certFile := fmt.Sprintf("/grpccerts/%s", s.metadata.tlsCertFile)
creds, err := credentials.NewClientTLSFromFile(certFile, "")
if err != nil {
return err
}
s.grpcConnection, err = grpc.Dial(s.metadata.scalerAddress, grpc.WithTransportCredentials(creds))
} else {
s.grpcConnection, err = grpc.Dial(s.metadata.scalerAddress, grpc.WithInsecure())
}

if err != nil {
return fmt.Errorf("cannot connect to external scaler over grpc interface: %s", err)
}

s.grpcClient = pb.NewExternalScalerClient(s.grpcConnection)

return nil
}
32 changes: 32 additions & 0 deletions pkg/scalers/external_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package scalers

import (
"testing"
)

var testExternalScalerResolvedEnv map[string]string

type parseExternalScalerMetadataTestData struct {
metadata map[string]string
isError bool
}

var testExternalScalerMetadata = []parseExternalScalerMetadataTestData{
{map[string]string{}, true},
// all properly formed
{map[string]string{"scalerAddress": "myservice", "test1": "7", "test2": "SAMPLE_CREDS"}, false},
// missing scalerAddress
{map[string]string{"test1": "1", "test2": "SAMPLE_CREDS"}, true},
}

func TestExternalScalerParseMetadata(t *testing.T) {
for _, testData := range testExternalScalerMetadata {
_, err := parseExternalScalerMetadata(testData.metadata, testExternalScalerResolvedEnv)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}
Loading

0 comments on commit 906de58

Please sign in to comment.