Skip to content

Commit

Permalink
feat: Support regex usage in Azure Service Bus scaler. (#3607)
Browse files Browse the repository at this point in the history
  • Loading branch information
v-shenoy authored Sep 30, 2022
1 parent 1c4d314 commit 344e468
Show file tree
Hide file tree
Showing 14 changed files with 645 additions and 25 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- **General**: Provide Prometheus metric with indication of total number of triggers per trigger type in `ScaledJob`/`ScaledObject`. ([#3663](https://github.com/kedacore/keda/issues/3663))
- **Azure Service Bus**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))
- **Azure Service Bus Scaler**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))
- **Azure Service Bus Scaler:** Support regex usage in queueName / subscriptionName parameters. ([#1624](https://github.com/kedacore/keda/issues/1624))

### Improvements

Expand Down
147 changes: 131 additions & 16 deletions pkg/scalers/azure_servicebus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
import (
"context"
"fmt"
"regexp"
"strconv"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
Expand Down Expand Up @@ -66,6 +67,9 @@ type azureServiceBusMetadata struct {
connection string
entityType entityType
fullyQualifiedNamespace string
useRegex bool
entityNameRegex *regexp.Regexp
operation string
scalerIndex int
}

Expand Down Expand Up @@ -118,6 +122,28 @@ func parseAzureServiceBusMetadata(config *ScalerConfig, logger logr.Logger) (*az
meta.activationTargetLength = activationMessageCount
}

meta.useRegex = false
if val, ok := config.TriggerMetadata["useRegex"]; ok {
useRegex, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("useRegex has invalid value")
}
meta.useRegex = useRegex
}

meta.operation = sumOperation
if meta.useRegex {
if val, ok := config.TriggerMetadata["operation"]; ok {
meta.operation = val
}

switch meta.operation {
case avgOperation, maxOperation, sumOperation:
default:
return nil, fmt.Errorf("operation must be one of avg, max, or sum")
}
}

// get queue name OR topic and subscription name & set entity type accordingly
if val, ok := config.TriggerMetadata["queueName"]; ok {
meta.queueName = val
Expand All @@ -126,6 +152,16 @@ func parseAzureServiceBusMetadata(config *ScalerConfig, logger logr.Logger) (*az
if _, ok := config.TriggerMetadata["subscriptionName"]; ok {
return nil, fmt.Errorf("subscription name provided with queue name")
}

if meta.useRegex {
entityNameRegex, err := regexp.Compile(meta.queueName)
if err != nil {
return nil, fmt.Errorf("queueName is not a valid regular expression")
}
entityNameRegex.Longest()

meta.entityNameRegex = entityNameRegex
}
}

if val, ok := config.TriggerMetadata["topicName"]; ok {
Expand All @@ -140,6 +176,16 @@ func parseAzureServiceBusMetadata(config *ScalerConfig, logger logr.Logger) (*az
} else {
return nil, fmt.Errorf("no subscription name provided with topic name")
}

if meta.useRegex {
entityNameRegex, err := regexp.Compile(meta.subscriptionName)
if err != nil {
return nil, fmt.Errorf("subscriptionName is not a valid regular expression")
}
entityNameRegex.Longest()

meta.entityNameRegex = entityNameRegex
}
}
if meta.entityType == none {
return nil, fmt.Errorf("no service bus entity type set")
Expand Down Expand Up @@ -200,10 +246,18 @@ func (s *azureServiceBusScaler) Close(context.Context) error {
// Returns the metric spec to be used by the HPA
func (s *azureServiceBusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := ""

var entityType string
if s.metadata.entityType == queue {
metricName = s.metadata.queueName
entityType = "queue"
} else {
metricName = s.metadata.topicName
entityType = "topic"
}

if s.metadata.useRegex {
metricName = fmt.Sprintf("%s-regex", entityType)
}

externalMetric := &v2.ExternalMetricSource{
Expand Down Expand Up @@ -240,9 +294,9 @@ func (s *azureServiceBusScaler) getAzureServiceBusLength(ctx context.Context) (i
// switch case for queue vs topic here
switch s.metadata.entityType {
case queue:
return getQueueLength(ctx, adminClient, s.metadata.queueName)
return getQueueLength(ctx, adminClient, s.metadata)
case subscription:
return getSubscriptionLength(ctx, adminClient, s.metadata.topicName, s.metadata.subscriptionName)
return getSubscriptionLength(ctx, adminClient, s.metadata)
default:
return -1, fmt.Errorf("no entity type")
}
Expand Down Expand Up @@ -303,26 +357,87 @@ func (s *azureServiceBusScaler) getServiceBusAdminClient(ctx context.Context) (*
return adminClient, nil
}

func getQueueLength(ctx context.Context, adminClient *admin.Client, queueName string) (int64, error) {
queueEntity, err := adminClient.GetQueueRuntimeProperties(ctx, queueName, &admin.GetQueueRuntimePropertiesOptions{})
if err != nil {
return -1, err
func getQueueLength(ctx context.Context, adminClient *admin.Client, meta *azureServiceBusMetadata) (int64, error) {
if !meta.useRegex {
queueEntity, err := adminClient.GetQueueRuntimeProperties(ctx, meta.queueName, &admin.GetQueueRuntimePropertiesOptions{})
if err != nil {
return -1, err
}
if queueEntity == nil {
return -1, fmt.Errorf("queue %s doesn't exist", meta.queueName)
}

return int64(queueEntity.ActiveMessageCount), nil
}
if queueEntity == nil {
return -1, fmt.Errorf("queue %s doesn't exist", queueName)

messageCounts := make([]int64, 0)

queuePager := adminClient.NewListQueuesRuntimePropertiesPager(nil)
for queuePager.More() {
page, err := queuePager.NextPage(ctx)
if err != nil {
return -1, err
}

for _, queue := range page.QueueRuntimeProperties {
if meta.entityNameRegex.FindString(queue.QueueName) == queue.QueueName {
messageCounts = append(messageCounts, int64(queue.ActiveMessageCount))
}
}
}

return int64(queueEntity.ActiveMessageCount), nil
return performOperation(messageCounts, meta.operation), nil
}

func getSubscriptionLength(ctx context.Context, adminClient *admin.Client, topicName, subscriptionName string) (int64, error) {
subscriptionEntity, err := adminClient.GetSubscriptionRuntimeProperties(ctx, topicName, subscriptionName, &admin.GetSubscriptionRuntimePropertiesOptions{})
if err != nil {
return -1, err
func getSubscriptionLength(ctx context.Context, adminClient *admin.Client, meta *azureServiceBusMetadata) (int64, error) {
if !meta.useRegex {
subscriptionEntity, err := adminClient.GetSubscriptionRuntimeProperties(ctx, meta.topicName, meta.subscriptionName,
&admin.GetSubscriptionRuntimePropertiesOptions{})
if err != nil {
return -1, err
}
if subscriptionEntity == nil {
return -1, fmt.Errorf("subscription %s doesn't exist in topic %s", meta.subscriptionName, meta.topicName)
}

return int64(subscriptionEntity.ActiveMessageCount), nil
}
if subscriptionEntity == nil {
return -1, fmt.Errorf("subscription %s doesn't exist in topic %s", subscriptionName, topicName)

messageCounts := make([]int64, 0)

subscriptionPager := adminClient.NewListSubscriptionsRuntimePropertiesPager(meta.topicName, nil)
for subscriptionPager.More() {
page, err := subscriptionPager.NextPage(ctx)
if err != nil {
return -1, err
}

for _, subscription := range page.SubscriptionRuntimeProperties {
if meta.entityNameRegex.FindString(subscription.SubscriptionName) == subscription.SubscriptionName {
messageCounts = append(messageCounts, int64(subscription.ActiveMessageCount))
}
}
}

return int64(subscriptionEntity.ActiveMessageCount), nil
return performOperation(messageCounts, meta.operation), nil
}

func performOperation(messageCounts []int64, operation string) int64 {
var result int64
for _, val := range messageCounts {
switch operation {
case avgOperation, sumOperation:
result += val
case maxOperation:
if val > result {
result = val
}
}
}

total := int64(len(messageCounts))
if operation == "avg" && total != 0 {
return result / total
}
return result
}
21 changes: 21 additions & 0 deletions pkg/scalers/azure_servicebus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,27 @@ var parseServiceBusMetadataDataset = []parseServiceBusMetadataTestData{
{map[string]string{"queueName": queueName, "namespace": namespaceName}, false, queue, defaultSuffix, map[string]string{}, kedav1alpha1.PodIdentityProviderAzureWorkload},
// invalid activation message count
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "messageCount": messageCount, "activationMessageCount": "AA"}, true, queue, defaultSuffix, map[string]string{}, ""},
// queue with incorrect useRegex value
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "ababa"}, true, queue, defaultSuffix, map[string]string{}, ""},
// properly formed queues with regex
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "false"}, false, queue, defaultSuffix, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": avgOperation}, false, queue, defaultSuffix, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": sumOperation}, false, queue, defaultSuffix, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": maxOperation}, false, queue, defaultSuffix, map[string]string{}, ""},
{map[string]string{"queueName": queueName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": "random"}, true, queue, defaultSuffix, map[string]string{}, ""},
// queue with invalid regex string
{map[string]string{"queueName": "*", "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": "avg"}, true, queue, defaultSuffix, map[string]string{}, ""},

// subscription with incorrect useRegex value
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "ababa"}, true, subscription, defaultSuffix, map[string]string{}, ""},
// properly formed subscriptions with regex
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "false"}, false, subscription, defaultSuffix, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": avgOperation}, false, subscription, defaultSuffix, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": sumOperation}, false, subscription, defaultSuffix, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": maxOperation}, false, subscription, defaultSuffix, map[string]string{}, ""},
{map[string]string{"topicName": topicName, "subscriptionName": subscriptionName, "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": "random"}, true, subscription, defaultSuffix, map[string]string{}, ""},
// subscription with invalid regex string
{map[string]string{"topicName": topicName, "subscriptionName": "*", "connectionFromEnv": connectionSetting, "useRegex": "true", "operation": "avg"}, true, subscription, defaultSuffix, map[string]string{}, ""},
}

var azServiceBusMetricIdentifiers = []azServiceBusMetricIdentifier{
Expand Down
1 change: 1 addition & 0 deletions tests/.env
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ AZURE_DEVOPS_PROJECT=
AZURE_KEYVAULT_URI=
AZURE_LOG_ANALYTICS_WORKSPACE_ID=
AZURE_RESOURCE_GROUP=
AZURE_SERVICE_BUS_CONNECTION_STRING=
AZURE_SP_APP_ID=
AZURE_SP_KEY=
AZURE_SP_OBJECT_ID=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-app-insights-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-blob-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-data-explorer-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-log-analytics-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-pipelines-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-queue-test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// Load environment variables from .env file
var _ = godotenv.Load("../../.env")
var _ = godotenv.Load("../../../.env")

const (
testName = "azure-service-bus-queue-test"
Expand Down
Loading

0 comments on commit 344e468

Please sign in to comment.