Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ActiveMQ Classic Scaler #2121

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
### New

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- Add ActiveMQ Classic Scaler ([#2121](https://github.com/kedacore/keda/pull/2121))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))

Expand Down
4 changes: 2 additions & 2 deletions adapter/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.6.1
melisatanrverdi marked this conversation as resolved.
Show resolved Hide resolved
creationTimestamp: null
name: clustertriggerauthentications.keda.sh
spec:
Expand Down Expand Up @@ -90,6 +90,8 @@ spec:
type: object
mount:
type: string
namespace:
type: string
role:
type: string
secrets:
Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
name: scaledjobs.keda.sh
spec:
Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
name: scaledobjects.keda.sh
spec:
Expand Down
4 changes: 3 additions & 1 deletion config/crd/bases/keda.sh_triggerauthentications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
name: triggerauthentications.keda.sh
spec:
Expand Down Expand Up @@ -89,6 +89,8 @@ spec:
type: object
mount:
type: string
namespace:
type: string
role:
type: string
secrets:
Expand Down
264 changes: 264 additions & 0 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
package scalers

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

kedautil "github.com/kedacore/keda/v2/pkg/util"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type activeMQScaler struct {
metadata *activeMQMetadata
httpClient *http.Client
}

type activeMQMetadata struct {
managementEndpoint string
destinationName string
brokerName string
username string
password string
restAPITemplate string
queueSize int
corsHeader string
}

type activeMQMonitoring struct {
MsgCount int `json:"value"`
Status int `json:"status"`
Timestamp int64 `json:"timestamp"`
}

const (
activeMQMetricType = "External"
defaultActiveMQQueueSize = 10
defaultActiveMQrestAPITemplate = "http://<<managementEndpoint>>/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=<<brokerName>>,destinationType=Queue,destinationName=<<destinationName>>/QueueSize"
melisatanrverdi marked this conversation as resolved.
Show resolved Hide resolved
defaultActiveMQCorsHeader = "http://%s"
)

var activeMQLog = logf.Log.WithName("activeMQ_scaler")

// NewActiveMQScaler creates a new activeMQ Scaler
func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) {
httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout)

activeMQMetadata, err := parseActiveMQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing activeMQ metadata: %s", err)
}

return &activeMQScaler{
metadata: activeMQMetadata,
httpClient: httpClient,
}, nil
}

func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) {
meta := activeMQMetadata{}

meta.queueSize = defaultActiveMQQueueSize

if val, ok := config.TriggerMetadata["restAPITemplate"]; ok && val != "" {
meta.restAPITemplate = config.TriggerMetadata["restAPITemplate"]
var err error
if meta, err = getRestAPIParameters(meta); err != nil {
return nil, fmt.Errorf("can't parse restAPITemplate : %s ", err)
}
} else {
meta.restAPITemplate = defaultActiveMQrestAPITemplate
if config.TriggerMetadata["managementEndpoint"] == "" {
return nil, errors.New("no management endpoint given")
}
meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]

if config.TriggerMetadata["destinationName"] == "" {
return nil, errors.New("no destination name given")
}
meta.destinationName = config.TriggerMetadata["destinationName"]

if config.TriggerMetadata["brokerName"] == "" {
return nil, errors.New("no broker name given")
}
meta.brokerName = config.TriggerMetadata["brokerName"]
}
if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" {
meta.corsHeader = config.TriggerMetadata["corsHeader"]
} else {
meta.corsHeader = fmt.Sprintf(defaultActiveMQCorsHeader, meta.managementEndpoint)
}

if val, ok := config.TriggerMetadata["queueSize"]; ok {
queueSize, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("can't parse queueSize: %s", err)
}

meta.queueSize = queueSize
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok && val != "" {
username := val

if val, ok := config.ResolvedEnv[username]; ok && val != "" {
meta.username = val
} else {
meta.username = username
}
}

if meta.username == "" {
return nil, fmt.Errorf("username cannot be empty")
}

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if val, ok := config.TriggerMetadata["password"]; ok && val != "" {
password := val

if val, ok := config.ResolvedEnv[password]; ok && val != "" {
meta.password = val
} else {
meta.password = password
}
}

if meta.password == "" {
return nil, fmt.Errorf("password cannot be empty")
}
return &meta, nil
}

func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueueMessageCount()
if err != nil {
activeMQLog.Error(err, "Unable to access the activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return false, err
}

return messages > 0, nil
}

// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName
func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) {
u, err := url.ParseRequestURI(meta.restAPITemplate)
if err != nil {
return meta, fmt.Errorf("unable to parse the activeMQ restAPITemplate: %s", err)
}

meta.managementEndpoint = u.Host
splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<<brokerName>>,destinationType=Queue,destinationName=<<destinationName>>
replacer := strings.NewReplacer(",", "&")
v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<<brokerName>>] destinationName:[<<destinationName>>] destinationType:[Queue] type:[Broker]]
if err != nil {
return meta, fmt.Errorf("unable to parse the activeMQ restAPITemplate: %s", err)
}

if len(v["destinationName"][0]) == 0 {
return meta, errors.New("no destinationName is given")
}
meta.destinationName = v["destinationName"][0]

if len(v["brokerName"][0]) == 0 {
return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate)
}
meta.brokerName = v["brokerName"][0]

return meta, nil
}

func (s *activeMQScaler) getMonitoringEndpoint() string {
replacer := strings.NewReplacer("<<managementEndpoint>>", s.metadata.managementEndpoint,
"<<brokerName>>", s.metadata.brokerName,
"<<destinationName>>", s.metadata.destinationName)

monitoringEndpoint := replacer.Replace(s.metadata.restAPITemplate)

return monitoringEndpoint
}

func (s *activeMQScaler) getQueueMessageCount() (int, error) {
var monitoringInfo *activeMQMonitoring
var queueMessageCount int

client := s.httpClient
url := s.getMonitoringEndpoint()

req, err := http.NewRequest("GET", url, nil)

req.SetBasicAuth(s.metadata.username, s.metadata.password)
req.Header.Set("Origin", s.metadata.corsHeader)

if err != nil {
return -1, err
}
resp, err := client.Do(req)
if err != nil {
return -1, err
}

defer resp.Body.Close()

if err := json.NewDecoder(resp.Body).Decode(&monitoringInfo); err != nil {
return -1, err
}
if resp.StatusCode == 200 && monitoringInfo.Status == 200 {
queueMessageCount = monitoringInfo.MsgCount
} else {
return -1, fmt.Errorf("activeMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}

activeMQLog.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.queueSize))

return queueMessageCount, nil
}

func (s *activeMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.queueSize), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "activeMQ", s.metadata.brokerName, s.metadata.destinationName)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: activeMQMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
messages, err := s.getQueueMessageCount()

if err != nil {
activeMQLog.Error(err, "Unable to access the activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(messages), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Nothing to close here.
func (s *activeMQScaler) Close() error {
return nil
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
func buildScaler(client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
// TRIGGERS-START
switch triggerType {
case "activemq":
return scalers.NewActiveMQScaler(config)
case "artemis-queue":
return scalers.NewArtemisQueueScaler(config)
case "aws-cloudwatch":
Expand Down