Skip to content

Commit

Permalink
GCP Pub/Sub Scaler: add oldest unacked message age metric (#2266)
Browse files Browse the repository at this point in the history
Signed-off-by: Friedrich Albert Kyuri <[email protected]>
  • Loading branch information
fira42073 authored Nov 23, 2021
1 parent 13dfde3 commit 2b43d40
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 45 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@
- Improve error message if `IdleReplicaCount` are equal to `MinReplicaCount` to be the same as the check ([#2212](https://github.com/kedacore/keda/pull/2212))
- Improve Cloudwatch Scaler metric exporting logic ([#2243](https://github.com/kedacore/keda/pull/2243))
- Refactor aws related scalers to reuse the aws clients instead of creating a new one for every GetMetrics call([#2255](https://github.com/kedacore/keda/pull/2255))
- GCP PubSub scaler may be used in SubscriptionSize and OldestUnackedMessageAge modes
- Cleanup metric names inside scalers ([#2260](https://github.com/kedacore/keda/pull/2260))
- Validating values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264))
- Add `unsafeSsl` parameter in SeleniumGrid scaler ([#2157](https://github.com/kedacore/keda/pull/2157))

### Deprecations

- `subscriptionSize` is deprecated in favor of `mode` and `value` for GCP Pub/Sub scaler

### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
Expand Down
143 changes: 104 additions & 39 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package scalers

import (
"context"
"errors"
"fmt"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -17,8 +18,13 @@ import (
)

const (
defaultTargetSubscriptionSize = 5
pubSubStackDriverMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
defaultTargetSubscriptionSize = 5
defaultTargetOldestUnackedMessageAge = 10
pubSubStackDriverSubscriptionSizeMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
pubSubStackDriverOldestUnackedMessageAgeMetricName = "pubsub.googleapis.com/subscription/oldest_unacked_message_age"

pubsubModeSubscriptionSize = "SubscriptionSize"
pubsubModeOldestUnackedMessageAge = "OldestUnackedMessageAge"
)

type gcpAuthorizationMetadata struct {
Expand All @@ -33,10 +39,12 @@ type pubsubScaler struct {
}

type pubsubMetadata struct {
targetSubscriptionSize int
subscriptionName string
gcpAuthorization gcpAuthorizationMetadata
scalerIndex int
mode string
value int

subscriptionName string
gcpAuthorization gcpAuthorizationMetadata
scalerIndex int
}

var gcpPubSubLog = logf.Log.WithName("gcp_pub_sub_scaler")
Expand All @@ -55,15 +63,43 @@ func NewPubSubScaler(config *ScalerConfig) (Scaler, error) {

func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {
meta := pubsubMetadata{}
meta.targetSubscriptionSize = defaultTargetSubscriptionSize
meta.mode = pubsubModeSubscriptionSize

if val, ok := config.TriggerMetadata["subscriptionSize"]; ok {
subscriptionSize, err := strconv.Atoi(val)
mode, modePresent := config.TriggerMetadata["mode"]
value, valuePresent := config.TriggerMetadata["value"]

if subSize, subSizePresent := config.TriggerMetadata["subscriptionSize"]; subSizePresent {
if modePresent || valuePresent {
return nil, errors.New("you can use either mode and value fields or subscriptionSize field")
}
gcpPubSubLog.Info("subscriptionSize field is deprecated. Use mode and value fields instead")
meta.mode = pubsubModeSubscriptionSize
subSizeValue, err := strconv.Atoi(subSize)
if err != nil {
return nil, fmt.Errorf("subscription Size parsing error %s", err.Error())
return nil, fmt.Errorf("value parsing error %s", err.Error())
}
meta.value = subSizeValue
} else {
if modePresent {
meta.mode = mode
}

switch meta.mode {
case pubsubModeSubscriptionSize:
meta.value = defaultTargetSubscriptionSize
case pubsubModeOldestUnackedMessageAge:
meta.value = defaultTargetOldestUnackedMessageAge
default:
return nil, fmt.Errorf("trigger mode %s must be one of %s, %s", meta.mode, pubsubModeSubscriptionSize, pubsubModeOldestUnackedMessageAge)
}

meta.targetSubscriptionSize = subscriptionSize
if valuePresent {
triggerValue, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("value parsing error %s", err.Error())
}
meta.value = triggerValue
}
}

if val, ok := config.TriggerMetadata["subscriptionName"]; ok {
Expand All @@ -87,14 +123,24 @@ func parsePubSubMetadata(config *ScalerConfig) (*pubsubMetadata, error) {

// IsActive checks if there are any messages in the subscription
func (s *pubsubScaler) IsActive(ctx context.Context) (bool, error) {
size, err := s.GetSubscriptionSize(ctx)

if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
switch s.metadata.mode {
case pubsubModeSubscriptionSize:
size, err := s.getMetrics(ctx, pubSubStackDriverSubscriptionSizeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
}
return size > 0, nil
case pubsubModeOldestUnackedMessageAge:
_, err := s.getMetrics(ctx, pubSubStackDriverOldestUnackedMessageAgeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting Active Status")
return false, err
}
return true, nil
default:
return false, errors.New("unknown mode")
}

return size > 0, nil
}

func (s *pubsubScaler) Close(context.Context) error {
Expand All @@ -111,16 +157,16 @@ func (s *pubsubScaler) Close(context.Context) error {

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
// Construct the target subscription size as a quantity
targetSubscriptionSizeQty := resource.NewQuantity(int64(s.metadata.targetSubscriptionSize), resource.DecimalSI)
// Construct the target value as a quantity
targetValueQty := resource.NewQuantity(int64(s.metadata.value), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ps-%s", s.metadata.subscriptionName))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetSubscriptionSizeQty,
AverageValue: targetValueQty,
},
}

Expand All @@ -135,40 +181,59 @@ func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2beta2.Metric

// GetMetrics connects to Stack Driver and finds the size of the pub sub subscription
func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
size, err := s.GetSubscriptionSize(ctx)
var value int64
var err error

if err != nil {
gcpPubSubLog.Error(err, "error getting subscription size")
return []external_metrics.ExternalMetricValue{}, err
switch s.metadata.mode {
case pubsubModeSubscriptionSize:
value, err = s.getMetrics(ctx, pubSubStackDriverSubscriptionSizeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting subscription size")
return []external_metrics.ExternalMetricValue{}, err
}
case pubsubModeOldestUnackedMessageAge:
value, err = s.getMetrics(ctx, pubSubStackDriverOldestUnackedMessageAgeMetricName)
if err != nil {
gcpPubSubLog.Error(err, "error getting oldest unacked message age")
return []external_metrics.ExternalMetricValue{}, err
}
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(size, resource.DecimalSI),
Value: *resource.NewQuantity(value, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetSubscriptionSize gets the number of messages in a subscription by calling the
// Stackdriver api
func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) {
func (s *pubsubScaler) setStackdriverClient(ctx context.Context) error {
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}

if err != nil {
return err
}
s.client = client
return nil
}

// getMetrics gets metric type value from stackdriver api
func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64, error) {
if s.client == nil {
var client *StackDriverClient
var err error
if s.metadata.gcpAuthorization.podIdentityProviderEnabled {
client, err = NewStackDriverClientPodIdentity(ctx)
} else {
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials)
}
err := s.setStackdriverClient(ctx)
if err != nil {
return -1, err
}
s.client = client
}

filter := `metric.type="` + pubSubStackDriverMetricName + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`
filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`

return s.client.GetMetrics(ctx, filter)
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@ type gcpPubSubMetricIdentifier struct {

var testPubSubMetadata = []parsePubSubMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
// all properly formed with deprecated field
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all properly formed
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// all properly formed with oldest unacked message age mode
{nil, map[string]string{"subscriptionName": "mysubscription", "mode": pubsubModeOldestUnackedMessageAge, "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// missing subscriptionName
{nil, map[string]string{"subscriptionName": "", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7", "credentialsFromEnv": ""}, true},
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7", "credentialsFromEnv": ""}, true},
// malformed subscriptionSize
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "AA", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed mode
{nil, map[string]string{"subscriptionName": "", "mode": "AA", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, false},
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, true},
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, true},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
Expand Down

0 comments on commit 2b43d40

Please sign in to comment.