From e225aa3c2036c66b1377392075a3188056375c15 Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Mon, 27 Sep 2021 23:37:00 +0300 Subject: [PATCH 1/3] signed-off-by: melisatanrverdi melisatanrverdi@gmail.com Signed-off-by: melisatanrverdi --- pkg/scalers/activemq_classic_scaler.go | 272 +++++++++++++++++++++++++ pkg/scaling/scale_handler.go | 2 + 2 files changed, 274 insertions(+) create mode 100644 pkg/scalers/activemq_classic_scaler.go diff --git a/pkg/scalers/activemq_classic_scaler.go b/pkg/scalers/activemq_classic_scaler.go new file mode 100644 index 00000000000..aaf53a4e5a8 --- /dev/null +++ b/pkg/scalers/activemq_classic_scaler.go @@ -0,0 +1,272 @@ +package scalers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + + kedautil "github.com/kedacore/keda/v2/pkg/util" + v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +type activemqClassicScaler struct { + metadata *activemqClassicMetadata + httpClient *http.Client +} + +//revive:disable:var-naming breaking change on restApiTemplate, wouldn't bring any benefit to users +type activemqClassicMetadata struct { + managementEndpoint string + destinationName string + brokerName string + username string + password string + restApiTemplate string + queueSize int + corsHeader string +} + +//revive:enable:var-naming +type activemqClassicMonitoring struct { + MsgCount int `json:"value"` + Status int `json:"status"` + Timestamp int64 `json:"timestamp"` +} + +const ( + activemqClassicMetricType = "External" + defaultActivemqClassicQueueSize = 10 + defaultRestApiTemplate = "http://<>/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=<>,destinationType=Queue,destinationName=<>/QueueSize" + defaultActivemqClassicCorsHeader = "http://%s" +) + +var activemqClassicLog = logf.Log.WithName("activemq_classic_scaler") + +// NewActivemqClassicQueueScaler creates a new activemqClassic queue Scaler +func NewActivemqClassicScaler(config *ScalerConfig) (Scaler, error) { + // do we need to guarantee this timeout for a specific + // reason? if not, we can have buildScaler pass in + // the global client + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout) + + activemqClassicMetadata, err := parseActivemqClassicMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing activemq classic metadata: %s", err) + } + + return &activemqClassicScaler{ + metadata: activemqClassicMetadata, + httpClient: httpClient, + }, nil +} + +func parseActivemqClassicMetadata(config *ScalerConfig) (*activemqClassicMetadata, error) { + meta := activemqClassicMetadata{} + + meta.queueSize = defaultActivemqClassicQueueSize + + if val, ok := config.TriggerMetadata["restApiTemplate"]; ok && val != "" { + meta.restApiTemplate = config.TriggerMetadata["restApiTemplate"] + var err error + if meta, err = getAPIParameters(meta); err != nil { + return nil, fmt.Errorf("can't parse restApiTemplate : %s ", err) + } + } else { + meta.restApiTemplate = defaultRestApiTemplate + if config.TriggerMetadata["managementEndpoint"] == "" { + return nil, errors.New("no management endpoint given") + } + meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"] + + if config.TriggerMetadata["destinationName"] == "" { + return nil, errors.New("no destination name given") + } + meta.destinationName = config.TriggerMetadata["destinationName"] + + if config.TriggerMetadata["brokerName"] == "" { + return nil, errors.New("no broker name given") + } + meta.brokerName = config.TriggerMetadata["brokerName"] + + } + if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" { + meta.corsHeader = config.TriggerMetadata["corsHeader"] + } else { + meta.corsHeader = fmt.Sprintf(defaultActivemqClassicCorsHeader, meta.managementEndpoint) + } + + if val, ok := config.TriggerMetadata["queueSize"]; ok { + queueSize, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("can't parse queueSize: %s", err) + } + + meta.queueSize = queueSize + } + + if val, ok := config.AuthParams["username"]; ok && val != "" { + meta.username = val + } else if val, ok := config.TriggerMetadata["username"]; ok && val != "" { + username := val + + if val, ok := config.ResolvedEnv[username]; ok && val != "" { + meta.username = val + } else { + meta.username = username + } + } + + if meta.username == "" { + return nil, fmt.Errorf("username cannot be empty") + } + + if val, ok := config.AuthParams["password"]; ok && val != "" { + meta.password = val + } else if val, ok := config.TriggerMetadata["password"]; ok && val != "" { + password := val + + if val, ok := config.ResolvedEnv[password]; ok && val != "" { + meta.password = val + } else { + meta.password = password + } + } + + if meta.password == "" { + return nil, fmt.Errorf("password cannot be empty") + } + return &meta, nil +} + +// IsActive determines if we need to scale from zero +func (s *activemqClassicScaler) IsActive(ctx context.Context) (bool, error) { + messages, err := s.getQueueMessageCount() + if err != nil { + activemqClassicLog.Error(err, "Unable to access the activemq classic management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + return false, err + } + + return messages > 0, nil +} + +// getAPIParameters parse restApiTemplate to provide managementEndpoint , brokerName, destinationType, destinationName +func getAPIParameters(meta activemqClassicMetadata) (activemqClassicMetadata, error) { + u, err := url.ParseRequestURI(meta.restApiTemplate) + if err != nil { + return meta, fmt.Errorf("unable to parse the activemq classic restApiTemplate: %s", err) + } + + meta.managementEndpoint = u.Host + splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<>,destinationType=Queue,destinationName=<> + replacer := strings.NewReplacer(",", "&") + v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<>] destinationName:[<>] destinationType:[Queue] type:[Broker]] + if err != nil { + return meta, fmt.Errorf("unable to parse the activemq classic restApiTemplate: %s", err) + } + + if len(v["destinationName"][0]) == 0 { + return meta, errors.New("no destinationName is given") + } + meta.destinationName = v["destinationName"][0] + + if len(v["brokerName"][0]) == 0 { + return meta, fmt.Errorf("no brokerName given: %s", meta.restApiTemplate) + } + meta.brokerName = v["brokerName"][0] + + return meta, nil +} + +func (s *activemqClassicScaler) getMonitoringEndpoint() string { + replacer := strings.NewReplacer("<>", s.metadata.managementEndpoint, + "<>", s.metadata.brokerName, + "<>", s.metadata.destinationName) + + monitoringEndpoint := replacer.Replace(s.metadata.restApiTemplate) + + return monitoringEndpoint +} + +func (s *activemqClassicScaler) getQueueMessageCount() (int, error) { + var monitoringInfo *activemqClassicMonitoring + messageCount := 0 + + client := s.httpClient + url := s.getMonitoringEndpoint() + + req, err := http.NewRequest("GET", url, nil) + + req.SetBasicAuth(s.metadata.username, s.metadata.password) + req.Header.Set("Origin", s.metadata.corsHeader) + + if err != nil { + return -1, err + } + resp, err := client.Do(req) + if err != nil { + return -1, err + } + + defer resp.Body.Close() + + if err := json.NewDecoder(resp.Body).Decode(&monitoringInfo); err != nil { + return -1, err + } + if resp.StatusCode == 200 && monitoringInfo.Status == 200 { + messageCount = monitoringInfo.MsgCount + } else { + return -1, fmt.Errorf("activemq classic management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) + } + + activemqClassicLog.V(1).Info(fmt.Sprintf("Activemq classic scaler: Providing metrics based on current queue size %d queue size limit %d", messageCount, s.metadata.queueSize)) + + return messageCount, nil +} + +func (s *activemqClassicScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(int64(s.metadata.queueSize), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "activemqClassic", s.metadata.brokerName, s.metadata.destinationName)), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: activemqClassicMetricType} + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *activemqClassicScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + messages, err := s.getQueueMessageCount() + + if err != nil { + activemqClassicLog.Error(err, "Unable to access the activemq classic management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + return []external_metrics.ExternalMetricValue{}, err + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(messages), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// Nothing to close here. +func (s *activemqClassicScaler) Close() error { + return nil +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index e2da33b7c0d..78ed0417994 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -312,6 +312,8 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod func buildScaler(client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { // TRIGGERS-START switch triggerType { + case "activemq-classic": + return scalers.NewActivemqClassicScaler(config) case "artemis-queue": return scalers.NewArtemisQueueScaler(config) case "aws-cloudwatch": From fa324f9f3957324b5fba3c78d2fe04e464b821ac Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Tue, 28 Sep 2021 01:17:53 +0300 Subject: [PATCH 2/3] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d278c433fc6..8822c316685 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ ### New - TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) +- Add ActiveMQ Classic Scaler ([#2121](https://github.com/kedacore/keda/pull/2121)) - ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016)) - Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092)) From ef6116e5e1b74611640537c055db56642e614a0a Mon Sep 17 00:00:00 2001 From: melisatanrverdi Date: Sun, 10 Oct 2021 01:37:45 +0300 Subject: [PATCH 3/3] rename ActiveMQ Classic as ActiveMQ Signed-off-by: melisatanrverdi --- .../generated/openapi/zz_generated.openapi.go | 4 +- ...keda.sh_clustertriggerauthentications.yaml | 4 +- config/crd/bases/keda.sh_scaledjobs.yaml | 2 +- config/crd/bases/keda.sh_scaledobjects.yaml | 2 +- .../bases/keda.sh_triggerauthentications.yaml | 4 +- pkg/scalers/activemq_classic_scaler.go | 272 ------------------ pkg/scalers/activemq_scaler.go | 264 +++++++++++++++++ pkg/scaling/scale_handler.go | 4 +- 8 files changed, 276 insertions(+), 280 deletions(-) delete mode 100644 pkg/scalers/activemq_classic_scaler.go create mode 100644 pkg/scalers/activemq_scaler.go diff --git a/adapter/generated/openapi/zz_generated.openapi.go b/adapter/generated/openapi/zz_generated.openapi.go index 4535e7cf4b4..c00b62da2c3 100644 --- a/adapter/generated/openapi/zz_generated.openapi.go +++ b/adapter/generated/openapi/zz_generated.openapi.go @@ -14784,7 +14784,7 @@ func schema_pkg_apis_metrics_v1beta1_NodeMetrics(ref common.ReferenceCallback) c }, "window": { SchemaProps: spec.SchemaProps{ - Default: map[string]interface{}{}, + Default: 0, Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Duration"), }, }, @@ -14899,7 +14899,7 @@ func schema_pkg_apis_metrics_v1beta1_PodMetrics(ref common.ReferenceCallback) co }, "window": { SchemaProps: spec.SchemaProps{ - Default: map[string]interface{}{}, + Default: 0, Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Duration"), }, }, diff --git a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml index a33408d758d..c871e853d2c 100644 --- a/config/crd/bases/keda.sh_clustertriggerauthentications.yaml +++ b/config/crd/bases/keda.sh_clustertriggerauthentications.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.1 + controller-gen.kubebuilder.io/version: v0.6.1 creationTimestamp: null name: clustertriggerauthentications.keda.sh spec: @@ -90,6 +90,8 @@ spec: type: object mount: type: string + namespace: + type: string role: type: string secrets: diff --git a/config/crd/bases/keda.sh_scaledjobs.yaml b/config/crd/bases/keda.sh_scaledjobs.yaml index 511680bc103..b444a4239d9 100644 --- a/config/crd/bases/keda.sh_scaledjobs.yaml +++ b/config/crd/bases/keda.sh_scaledjobs.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.1 + controller-gen.kubebuilder.io/version: v0.6.1 creationTimestamp: null name: scaledjobs.keda.sh spec: diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml index 0781a3e86d0..1ff6f64fd0c 100644 --- a/config/crd/bases/keda.sh_scaledobjects.yaml +++ b/config/crd/bases/keda.sh_scaledobjects.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.1 + controller-gen.kubebuilder.io/version: v0.6.1 creationTimestamp: null name: scaledobjects.keda.sh spec: diff --git a/config/crd/bases/keda.sh_triggerauthentications.yaml b/config/crd/bases/keda.sh_triggerauthentications.yaml index 4f4b4226470..eccd7862528 100644 --- a/config/crd/bases/keda.sh_triggerauthentications.yaml +++ b/config/crd/bases/keda.sh_triggerauthentications.yaml @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.4.1 + controller-gen.kubebuilder.io/version: v0.6.1 creationTimestamp: null name: triggerauthentications.keda.sh spec: @@ -89,6 +89,8 @@ spec: type: object mount: type: string + namespace: + type: string role: type: string secrets: diff --git a/pkg/scalers/activemq_classic_scaler.go b/pkg/scalers/activemq_classic_scaler.go deleted file mode 100644 index aaf53a4e5a8..00000000000 --- a/pkg/scalers/activemq_classic_scaler.go +++ /dev/null @@ -1,272 +0,0 @@ -package scalers - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net/http" - "net/url" - "strconv" - "strings" - - kedautil "github.com/kedacore/keda/v2/pkg/util" - v2beta2 "k8s.io/api/autoscaling/v2beta2" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/metrics/pkg/apis/external_metrics" - logf "sigs.k8s.io/controller-runtime/pkg/log" -) - -type activemqClassicScaler struct { - metadata *activemqClassicMetadata - httpClient *http.Client -} - -//revive:disable:var-naming breaking change on restApiTemplate, wouldn't bring any benefit to users -type activemqClassicMetadata struct { - managementEndpoint string - destinationName string - brokerName string - username string - password string - restApiTemplate string - queueSize int - corsHeader string -} - -//revive:enable:var-naming -type activemqClassicMonitoring struct { - MsgCount int `json:"value"` - Status int `json:"status"` - Timestamp int64 `json:"timestamp"` -} - -const ( - activemqClassicMetricType = "External" - defaultActivemqClassicQueueSize = 10 - defaultRestApiTemplate = "http://<>/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=<>,destinationType=Queue,destinationName=<>/QueueSize" - defaultActivemqClassicCorsHeader = "http://%s" -) - -var activemqClassicLog = logf.Log.WithName("activemq_classic_scaler") - -// NewActivemqClassicQueueScaler creates a new activemqClassic queue Scaler -func NewActivemqClassicScaler(config *ScalerConfig) (Scaler, error) { - // do we need to guarantee this timeout for a specific - // reason? if not, we can have buildScaler pass in - // the global client - httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout) - - activemqClassicMetadata, err := parseActivemqClassicMetadata(config) - if err != nil { - return nil, fmt.Errorf("error parsing activemq classic metadata: %s", err) - } - - return &activemqClassicScaler{ - metadata: activemqClassicMetadata, - httpClient: httpClient, - }, nil -} - -func parseActivemqClassicMetadata(config *ScalerConfig) (*activemqClassicMetadata, error) { - meta := activemqClassicMetadata{} - - meta.queueSize = defaultActivemqClassicQueueSize - - if val, ok := config.TriggerMetadata["restApiTemplate"]; ok && val != "" { - meta.restApiTemplate = config.TriggerMetadata["restApiTemplate"] - var err error - if meta, err = getAPIParameters(meta); err != nil { - return nil, fmt.Errorf("can't parse restApiTemplate : %s ", err) - } - } else { - meta.restApiTemplate = defaultRestApiTemplate - if config.TriggerMetadata["managementEndpoint"] == "" { - return nil, errors.New("no management endpoint given") - } - meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"] - - if config.TriggerMetadata["destinationName"] == "" { - return nil, errors.New("no destination name given") - } - meta.destinationName = config.TriggerMetadata["destinationName"] - - if config.TriggerMetadata["brokerName"] == "" { - return nil, errors.New("no broker name given") - } - meta.brokerName = config.TriggerMetadata["brokerName"] - - } - if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" { - meta.corsHeader = config.TriggerMetadata["corsHeader"] - } else { - meta.corsHeader = fmt.Sprintf(defaultActivemqClassicCorsHeader, meta.managementEndpoint) - } - - if val, ok := config.TriggerMetadata["queueSize"]; ok { - queueSize, err := strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("can't parse queueSize: %s", err) - } - - meta.queueSize = queueSize - } - - if val, ok := config.AuthParams["username"]; ok && val != "" { - meta.username = val - } else if val, ok := config.TriggerMetadata["username"]; ok && val != "" { - username := val - - if val, ok := config.ResolvedEnv[username]; ok && val != "" { - meta.username = val - } else { - meta.username = username - } - } - - if meta.username == "" { - return nil, fmt.Errorf("username cannot be empty") - } - - if val, ok := config.AuthParams["password"]; ok && val != "" { - meta.password = val - } else if val, ok := config.TriggerMetadata["password"]; ok && val != "" { - password := val - - if val, ok := config.ResolvedEnv[password]; ok && val != "" { - meta.password = val - } else { - meta.password = password - } - } - - if meta.password == "" { - return nil, fmt.Errorf("password cannot be empty") - } - return &meta, nil -} - -// IsActive determines if we need to scale from zero -func (s *activemqClassicScaler) IsActive(ctx context.Context) (bool, error) { - messages, err := s.getQueueMessageCount() - if err != nil { - activemqClassicLog.Error(err, "Unable to access the activemq classic management endpoint", "managementEndpoint", s.metadata.managementEndpoint) - return false, err - } - - return messages > 0, nil -} - -// getAPIParameters parse restApiTemplate to provide managementEndpoint , brokerName, destinationType, destinationName -func getAPIParameters(meta activemqClassicMetadata) (activemqClassicMetadata, error) { - u, err := url.ParseRequestURI(meta.restApiTemplate) - if err != nil { - return meta, fmt.Errorf("unable to parse the activemq classic restApiTemplate: %s", err) - } - - meta.managementEndpoint = u.Host - splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<>,destinationType=Queue,destinationName=<> - replacer := strings.NewReplacer(",", "&") - v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<>] destinationName:[<>] destinationType:[Queue] type:[Broker]] - if err != nil { - return meta, fmt.Errorf("unable to parse the activemq classic restApiTemplate: %s", err) - } - - if len(v["destinationName"][0]) == 0 { - return meta, errors.New("no destinationName is given") - } - meta.destinationName = v["destinationName"][0] - - if len(v["brokerName"][0]) == 0 { - return meta, fmt.Errorf("no brokerName given: %s", meta.restApiTemplate) - } - meta.brokerName = v["brokerName"][0] - - return meta, nil -} - -func (s *activemqClassicScaler) getMonitoringEndpoint() string { - replacer := strings.NewReplacer("<>", s.metadata.managementEndpoint, - "<>", s.metadata.brokerName, - "<>", s.metadata.destinationName) - - monitoringEndpoint := replacer.Replace(s.metadata.restApiTemplate) - - return monitoringEndpoint -} - -func (s *activemqClassicScaler) getQueueMessageCount() (int, error) { - var monitoringInfo *activemqClassicMonitoring - messageCount := 0 - - client := s.httpClient - url := s.getMonitoringEndpoint() - - req, err := http.NewRequest("GET", url, nil) - - req.SetBasicAuth(s.metadata.username, s.metadata.password) - req.Header.Set("Origin", s.metadata.corsHeader) - - if err != nil { - return -1, err - } - resp, err := client.Do(req) - if err != nil { - return -1, err - } - - defer resp.Body.Close() - - if err := json.NewDecoder(resp.Body).Decode(&monitoringInfo); err != nil { - return -1, err - } - if resp.StatusCode == 200 && monitoringInfo.Status == 200 { - messageCount = monitoringInfo.MsgCount - } else { - return -1, fmt.Errorf("activemq classic management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) - } - - activemqClassicLog.V(1).Info(fmt.Sprintf("Activemq classic scaler: Providing metrics based on current queue size %d queue size limit %d", messageCount, s.metadata.queueSize)) - - return messageCount, nil -} - -func (s *activemqClassicScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { - targetMetricValue := resource.NewQuantity(int64(s.metadata.queueSize), resource.DecimalSI) - externalMetric := &v2beta2.ExternalMetricSource{ - Metric: v2beta2.MetricIdentifier{ - Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "activemqClassic", s.metadata.brokerName, s.metadata.destinationName)), - }, - Target: v2beta2.MetricTarget{ - Type: v2beta2.AverageValueMetricType, - AverageValue: targetMetricValue, - }, - } - metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: activemqClassicMetricType} - return []v2beta2.MetricSpec{metricSpec} -} - -// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric -func (s *activemqClassicScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - messages, err := s.getQueueMessageCount() - - if err != nil { - activemqClassicLog.Error(err, "Unable to access the activemq classic management endpoint", "managementEndpoint", s.metadata.managementEndpoint) - return []external_metrics.ExternalMetricValue{}, err - } - - metric := external_metrics.ExternalMetricValue{ - MetricName: metricName, - Value: *resource.NewQuantity(int64(messages), resource.DecimalSI), - Timestamp: metav1.Now(), - } - - return append([]external_metrics.ExternalMetricValue{}, metric), nil -} - -// Nothing to close here. -func (s *activemqClassicScaler) Close() error { - return nil -} diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go new file mode 100644 index 00000000000..ef518adbf7e --- /dev/null +++ b/pkg/scalers/activemq_scaler.go @@ -0,0 +1,264 @@ +package scalers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + + kedautil "github.com/kedacore/keda/v2/pkg/util" + v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +type activeMQScaler struct { + metadata *activeMQMetadata + httpClient *http.Client +} + +type activeMQMetadata struct { + managementEndpoint string + destinationName string + brokerName string + username string + password string + restAPITemplate string + queueSize int + corsHeader string +} + +type activeMQMonitoring struct { + MsgCount int `json:"value"` + Status int `json:"status"` + Timestamp int64 `json:"timestamp"` +} + +const ( + activeMQMetricType = "External" + defaultActiveMQQueueSize = 10 + defaultActiveMQrestAPITemplate = "http://<>/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=<>,destinationType=Queue,destinationName=<>/QueueSize" + defaultActiveMQCorsHeader = "http://%s" +) + +var activeMQLog = logf.Log.WithName("activeMQ_scaler") + +// NewActiveMQScaler creates a new activeMQ Scaler +func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) { + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout) + + activeMQMetadata, err := parseActiveMQMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing activeMQ metadata: %s", err) + } + + return &activeMQScaler{ + metadata: activeMQMetadata, + httpClient: httpClient, + }, nil +} + +func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) { + meta := activeMQMetadata{} + + meta.queueSize = defaultActiveMQQueueSize + + if val, ok := config.TriggerMetadata["restAPITemplate"]; ok && val != "" { + meta.restAPITemplate = config.TriggerMetadata["restAPITemplate"] + var err error + if meta, err = getRestAPIParameters(meta); err != nil { + return nil, fmt.Errorf("can't parse restAPITemplate : %s ", err) + } + } else { + meta.restAPITemplate = defaultActiveMQrestAPITemplate + if config.TriggerMetadata["managementEndpoint"] == "" { + return nil, errors.New("no management endpoint given") + } + meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"] + + if config.TriggerMetadata["destinationName"] == "" { + return nil, errors.New("no destination name given") + } + meta.destinationName = config.TriggerMetadata["destinationName"] + + if config.TriggerMetadata["brokerName"] == "" { + return nil, errors.New("no broker name given") + } + meta.brokerName = config.TriggerMetadata["brokerName"] + } + if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" { + meta.corsHeader = config.TriggerMetadata["corsHeader"] + } else { + meta.corsHeader = fmt.Sprintf(defaultActiveMQCorsHeader, meta.managementEndpoint) + } + + if val, ok := config.TriggerMetadata["queueSize"]; ok { + queueSize, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("can't parse queueSize: %s", err) + } + + meta.queueSize = queueSize + } + + if val, ok := config.AuthParams["username"]; ok && val != "" { + meta.username = val + } else if val, ok := config.TriggerMetadata["username"]; ok && val != "" { + username := val + + if val, ok := config.ResolvedEnv[username]; ok && val != "" { + meta.username = val + } else { + meta.username = username + } + } + + if meta.username == "" { + return nil, fmt.Errorf("username cannot be empty") + } + + if val, ok := config.AuthParams["password"]; ok && val != "" { + meta.password = val + } else if val, ok := config.TriggerMetadata["password"]; ok && val != "" { + password := val + + if val, ok := config.ResolvedEnv[password]; ok && val != "" { + meta.password = val + } else { + meta.password = password + } + } + + if meta.password == "" { + return nil, fmt.Errorf("password cannot be empty") + } + return &meta, nil +} + +func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) { + messages, err := s.getQueueMessageCount() + if err != nil { + activeMQLog.Error(err, "Unable to access the activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + return false, err + } + + return messages > 0, nil +} + +// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName +func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) { + u, err := url.ParseRequestURI(meta.restAPITemplate) + if err != nil { + return meta, fmt.Errorf("unable to parse the activeMQ restAPITemplate: %s", err) + } + + meta.managementEndpoint = u.Host + splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<>,destinationType=Queue,destinationName=<> + replacer := strings.NewReplacer(",", "&") + v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<>] destinationName:[<>] destinationType:[Queue] type:[Broker]] + if err != nil { + return meta, fmt.Errorf("unable to parse the activeMQ restAPITemplate: %s", err) + } + + if len(v["destinationName"][0]) == 0 { + return meta, errors.New("no destinationName is given") + } + meta.destinationName = v["destinationName"][0] + + if len(v["brokerName"][0]) == 0 { + return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate) + } + meta.brokerName = v["brokerName"][0] + + return meta, nil +} + +func (s *activeMQScaler) getMonitoringEndpoint() string { + replacer := strings.NewReplacer("<>", s.metadata.managementEndpoint, + "<>", s.metadata.brokerName, + "<>", s.metadata.destinationName) + + monitoringEndpoint := replacer.Replace(s.metadata.restAPITemplate) + + return monitoringEndpoint +} + +func (s *activeMQScaler) getQueueMessageCount() (int, error) { + var monitoringInfo *activeMQMonitoring + var queueMessageCount int + + client := s.httpClient + url := s.getMonitoringEndpoint() + + req, err := http.NewRequest("GET", url, nil) + + req.SetBasicAuth(s.metadata.username, s.metadata.password) + req.Header.Set("Origin", s.metadata.corsHeader) + + if err != nil { + return -1, err + } + resp, err := client.Do(req) + if err != nil { + return -1, err + } + + defer resp.Body.Close() + + if err := json.NewDecoder(resp.Body).Decode(&monitoringInfo); err != nil { + return -1, err + } + if resp.StatusCode == 200 && monitoringInfo.Status == 200 { + queueMessageCount = monitoringInfo.MsgCount + } else { + return -1, fmt.Errorf("activeMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) + } + + activeMQLog.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.queueSize)) + + return queueMessageCount, nil +} + +func (s *activeMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(int64(s.metadata.queueSize), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "activeMQ", s.metadata.brokerName, s.metadata.destinationName)), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: activeMQMetricType} + return []v2beta2.MetricSpec{metricSpec} +} + +func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + messages, err := s.getQueueMessageCount() + + if err != nil { + activeMQLog.Error(err, "Unable to access the activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + return []external_metrics.ExternalMetricValue{}, err + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(messages), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// Nothing to close here. +func (s *activeMQScaler) Close() error { + return nil +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 78ed0417994..0548ca9e4fd 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -312,8 +312,8 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod func buildScaler(client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { // TRIGGERS-START switch triggerType { - case "activemq-classic": - return scalers.NewActivemqClassicScaler(config) + case "activemq": + return scalers.NewActiveMQScaler(config) case "artemis-queue": return scalers.NewArtemisQueueScaler(config) case "aws-cloudwatch":