Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ActiveMQ scaler config #5799

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037))
- **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037)|[#5797](https://github.com/kedacore/keda/issues/5797))

#### Experimental

Expand Down
216 changes: 73 additions & 143 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"html/template"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/go-logr/logr"
Expand All @@ -20,6 +18,8 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"

type activeMQScaler struct {
metricType v2.MetricTargetType
metadata *activeMQMetadata
Expand All @@ -28,17 +28,65 @@ type activeMQScaler struct {
}

type activeMQMetadata struct {
managementEndpoint string
destinationName string
brokerName string
username string
password string
restAPITemplate string
targetQueueSize int64
activationTargetQueueSize int64
corsHeader string
metricName string
triggerIndex int
metricName string
triggerIndex int

ManagementEndpoint string `keda:"name=managementEndpoint, order=triggerMetadata, optional"`
DestinationName string `keda:"name=destinationName, order=triggerMetadata, optional"`
BrokerName string `keda:"name=brokerName, order=triggerMetadata, optional"`

// auth
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`

CorsHeader string `keda:"name=corsHeader, order=triggerMetadata, optional"`

RestAPITemplate string `keda:"name=restAPITemplate, order=triggerMetadata, optional"`
TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, optional, default=10"`
ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, optional, default=0"`
}

func (a *activeMQMetadata) Validate() error {
if a.RestAPITemplate != "" {
// parse restAPITemplate to provide managementEndpoint, brokerName, destinationName
u, err := url.ParseRequestURI(a.RestAPITemplate)
if err != nil {
return fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err)
}
a.ManagementEndpoint = u.Host
// This returns : type=Broker,brokerName=<<brokerName>>,destinationType=Queue,destinationName=<<destinationName>>
splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0]
replacer := strings.NewReplacer(",", "&")
// This returns a map with key: string types and element type [] string. : map[brokerName:[<<brokerName>>] destinationName:[<<destinationName>>] destinationType:[Queue] type:[Broker]]
v, err := url.ParseQuery(replacer.Replace(splitURL))
if err != nil {
return fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err)
}
if len(v["destinationName"][0]) == 0 {
return fmt.Errorf("no destinationName is given")
}
a.DestinationName = v["destinationName"][0]
if len(v["brokerName"][0]) == 0 {
return fmt.Errorf("no brokerName given: %s", a.RestAPITemplate)
}
a.BrokerName = v["brokerName"][0]
} else {
a.RestAPITemplate = defaultActiveMQRestAPITemplate
if a.ManagementEndpoint == "" {
return fmt.Errorf("no management endpoint given")
}
if a.DestinationName == "" {
return fmt.Errorf("no destination name given")
}
if a.BrokerName == "" {
return fmt.Errorf("no broker name given")
}
}
if a.CorsHeader == "" {
a.CorsHeader = fmt.Sprintf(defaultCorsHeader, a.ManagementEndpoint)
}
a.metricName = GenerateMetricNameWithIndex(a.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", a.DestinationName)))
return nil
}

type activeMQMonitoring struct {
Expand All @@ -47,12 +95,6 @@ type activeMQMonitoring struct {
Timestamp int64 `json:"timestamp"`
}

const (
defaultTargetQueueSize = 10
defaultActivationTargetQueueSize = 0
defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"
)

// NewActiveMQScaler creates a new activeMQ Scaler
func NewActiveMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
Expand All @@ -75,134 +117,22 @@ func NewActiveMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}

func parseActiveMQMetadata(config *scalersconfig.ScalerConfig) (*activeMQMetadata, error) {
meta := activeMQMetadata{}

if val, ok := config.TriggerMetadata["restAPITemplate"]; ok && val != "" {
meta.restAPITemplate = config.TriggerMetadata["restAPITemplate"]
var err error
if meta, err = getRestAPIParameters(meta); err != nil {
return nil, fmt.Errorf("can't parse restAPITemplate : %s ", err)
}
} else {
meta.restAPITemplate = defaultActiveMQRestAPITemplate
if config.TriggerMetadata["managementEndpoint"] == "" {
return nil, errors.New("no management endpoint given")
}
meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]

if config.TriggerMetadata["destinationName"] == "" {
return nil, errors.New("no destination name given")
}
meta.destinationName = config.TriggerMetadata["destinationName"]

if config.TriggerMetadata["brokerName"] == "" {
return nil, errors.New("no broker name given")
}
meta.brokerName = config.TriggerMetadata["brokerName"]
}

if val, ok := config.TriggerMetadata["targetQueueSize"]; ok {
queueSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid targetQueueSize - must be an integer")
}

meta.targetQueueSize = queueSize
} else {
meta.targetQueueSize = defaultTargetQueueSize
}

if val, ok := config.TriggerMetadata["activationTargetQueueSize"]; ok {
activationTargetQueueSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid activationTargetQueueSize - must be an integer")
}
meta.activationTargetQueueSize = activationTargetQueueSize
} else {
meta.activationTargetQueueSize = defaultActivationTargetQueueSize
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok && val != "" {
username := val

if val, ok := config.ResolvedEnv[username]; ok && val != "" {
meta.username = val
} else {
meta.username = username
}
}

if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" {
meta.corsHeader = config.TriggerMetadata["corsHeader"]
} else {
meta.corsHeader = fmt.Sprintf(defaultCorsHeader, meta.managementEndpoint)
}

if meta.username == "" {
return nil, fmt.Errorf("username cannot be empty")
}

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if val, ok := config.TriggerMetadata["password"]; ok && val != "" {
password := val

if val, ok := config.ResolvedEnv[password]; ok && val != "" {
meta.password = val
} else {
meta.password = password
}
}

if meta.password == "" {
return nil, fmt.Errorf("password cannot be empty")
}

meta.metricName = GenerateMetricNameWithIndex(config.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", meta.destinationName)))

meta := &activeMQMetadata{}
meta.triggerIndex = config.TriggerIndex

return &meta, nil
}

// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName
func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) {
u, err := url.ParseRequestURI(meta.restAPITemplate)
if err != nil {
return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err)
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
}

meta.managementEndpoint = u.Host
splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<<brokerName>>,destinationType=Queue,destinationName=<<destinationName>>
replacer := strings.NewReplacer(",", "&")
v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<<brokerName>>] destinationName:[<<destinationName>>] destinationType:[Queue] type:[Broker]]
if err != nil {
return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err)
}

if len(v["destinationName"][0]) == 0 {
return meta, errors.New("no destinationName is given")
}
meta.destinationName = v["destinationName"][0]

if len(v["brokerName"][0]) == 0 {
return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate)
}
meta.brokerName = v["brokerName"][0]

return meta, nil
}

func (s *activeMQScaler) getMonitoringEndpoint() (string, error) {
var buf bytes.Buffer
endpoint := map[string]string{
"ManagementEndpoint": s.metadata.managementEndpoint,
"BrokerName": s.metadata.brokerName,
"DestinationName": s.metadata.destinationName,
"ManagementEndpoint": s.metadata.ManagementEndpoint,
"BrokerName": s.metadata.BrokerName,
"DestinationName": s.metadata.DestinationName,
}
template, err := template.New("monitoring_endpoint").Parse(s.metadata.restAPITemplate)
template, err := template.New("monitoring_endpoint").Parse(s.metadata.RestAPITemplate)
if err != nil {
return "", fmt.Errorf("error parsing template: %w", err)
}
Expand Down Expand Up @@ -230,9 +160,9 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error
}

// Add HTTP Auth and Headers
req.SetBasicAuth(s.metadata.username, s.metadata.password)
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Origin", s.metadata.corsHeader)
req.Header.Set("Origin", s.metadata.CorsHeader)

resp, err := client.Do(req)
if err != nil {
Expand All @@ -250,7 +180,7 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error
return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}

s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize))
s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.TargetQueueSize))

return queueMessageCount, nil
}
Expand All @@ -261,7 +191,7 @@ func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpe
Metric: v2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: GetMetricTarget(s.metricType, s.metadata.targetQueueSize),
Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueSize),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Expand All @@ -277,7 +207,7 @@ func (s *activeMQScaler) GetMetricsAndActivity(ctx context.Context, metricName s

metric := GenerateMetricInMili(metricName, float64(queueSize))

return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.activationTargetQueueSize, nil
return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil
}

func (s *activeMQScaler) Close(context.Context) error {
Expand Down
17 changes: 9 additions & 8 deletions pkg/scalers/activemq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

const (
testInvalidRestAPITemplate = "testInvalidRestAPITemplate"
defaultTargetQueueSize = 10
)

type parseActiveMQMetadataTestData struct {
Expand Down Expand Up @@ -228,8 +229,8 @@ func TestActiveMQDefaultCorsHeader(t *testing.T) {
if err != nil {
t.Error("Expected success but got error", err)
}
if !(meta.corsHeader == "http://localhost:8161") {
t.Errorf("Expected http://localhost:8161 but got %s", meta.corsHeader)
if !(meta.CorsHeader == "http://localhost:8161") {
t.Errorf("Expected http://localhost:8161 but got %s", meta.CorsHeader)
}
}

Expand All @@ -240,8 +241,8 @@ func TestActiveMQCorsHeader(t *testing.T) {
if err != nil {
t.Error("Expected success but got error", err)
}
if !(meta.corsHeader == "test") {
t.Errorf("Expected test but got %s", meta.corsHeader)
if !(meta.CorsHeader == "test") {
t.Errorf("Expected test but got %s", meta.CorsHeader)
}
}

Expand All @@ -255,8 +256,8 @@ func TestParseActiveMQMetadata(t *testing.T) {
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] {
t.Error("Expected password from configuration but found something else: ", metadata.password)
if metadata != nil && metadata.Password != "" && metadata.Password != testData.authParams["password"] {
t.Error("Expected password from configuration but found something else: ", metadata.Password)
fmt.Println(testData)
}
})
Expand Down Expand Up @@ -288,8 +289,8 @@ func TestParseDefaultTargetQueueSize(t *testing.T) {
t.Error("Expected success but got error", err)
case testData.isError && err == nil:
t.Error("Expected error but got success")
case metadata.targetQueueSize != defaultTargetQueueSize:
t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.targetQueueSize)
case metadata.TargetQueueSize != defaultTargetQueueSize:
t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.TargetQueueSize)
}
})
}
Expand Down
Loading