From ba93cab322a96463c5ab2a1209f5fc4f5a4537b8 Mon Sep 17 00:00:00 2001 From: Abhishek Mohite Date: Tue, 17 Jan 2023 14:22:46 +0530 Subject: [PATCH] Add support for tls in rabbitmq scaler (#967) (#4086) * Add support for tls in rabbitmq scaler (#967) Signed-off-by: Abhishek Mohite * Fix: linting errors Signed-off-by: Abhishek Mohite * Fix: linting errors Signed-off-by: Abhishek Mohite * Fix: linting errors Signed-off-by: Abhishek Mohite * Fix: linting errors Signed-off-by: Abhishek Mohite * Fix: linting errors Signed-off-by: Abhishek Mohite * Fix: linting errors Signed-off-by: Abhishek Mohite * Fix: linting errors Signed-off-by: Abhishek Mohite * Add tests Signed-off-by: Abhishek Mohite * Add suggested changes Signed-off-by: Abhishek Mohite * Fix: linting errors Signed-off-by: Abhishek Mohite * Fix: failing test Signed-off-by: Abhishek Mohite Signed-off-by: Abhishek Mohite --- CHANGELOG.md | 1 + pkg/scalers/rabbitmq_scaler.go | 51 +++++++++++++++++++++++++++-- pkg/scalers/rabbitmq_scaler_test.go | 51 +++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02a24bec518..54e06161f7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ Here is an overview of all new **experimental** features: ### Improvements - **General**: Use (self-signed) certificates for all the communications (internals and externals) ([#3931](https://github.com/kedacore/keda/issues/3931)) +- **RabbitMQ Scaler**: Add TLS support ([#967](https://github.com/kedacore/keda/issues/967)) - **Redis Scalers**: Add support to Redis 7 ([#4052](https://github.com/kedacore/keda/issues/4052)) - **Selenium Grid Scaler**: Add 'platformName' to selenium-grid scaler metadata structure ([#4038](https://github.com/kedacore/keda/issues/4038)) diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index e0ea41d4160..2f4b059f26e 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -9,6 +9,7 @@ import ( "net/url" "regexp" "strconv" + "strings" "time" "github.com/go-logr/logr" @@ -35,6 +36,7 @@ const ( defaultRabbitMQQueueLength = 20 rabbitMetricType = "External" rabbitRootVhostPath = "/%2F" + rmqTLSEnable = "enable" ) const ( @@ -75,6 +77,13 @@ type rabbitMQMetadata struct { metricName string // custom metric name for trigger timeout time.Duration // custom http timeout for a specific trigger scalerIndex int // scaler index + + // TLS + ca string + cert string + key string + keyPassword string + enableTLS bool } type queueInfo struct { @@ -129,7 +138,7 @@ func NewRabbitMQScaler(config *ScalerConfig) (Scaler, error) { host = hostURI.String() } - conn, ch, err := getConnectionAndChannel(host) + conn, ch, err := getConnectionAndChannel(host, meta) if err != nil { return nil, fmt.Errorf("error establishing rabbitmq connection: %w", err) } @@ -167,6 +176,28 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { return nil, fmt.Errorf("no host setting given") } + // Resolve TLS authentication parameters + meta.enableTLS = false + if val, ok := config.AuthParams["tls"]; ok { + val = strings.TrimSpace(val) + if val == rmqTLSEnable { + meta.ca = config.AuthParams["ca"] + meta.cert = config.AuthParams["cert"] + meta.key = config.AuthParams["key"] + meta.enableTLS = true + } else if val != "disable" { + return nil, fmt.Errorf("err incorrect value for TLS given: %s", val) + } + } + + meta.keyPassword = config.AuthParams["keyPassword"] + + certGiven := meta.cert != "" + keyGiven := meta.key != "" + if certGiven != keyGiven { + return nil, fmt.Errorf("both key and cert must be provided") + } + // If the protocol is auto, check the host scheme. if meta.protocol == autoProtocol { parsedURL, err := url.Parse(meta.host) @@ -354,8 +385,22 @@ func parseTrigger(meta *rabbitMQMetadata, config *ScalerConfig) (*rabbitMQMetada return meta, nil } -func getConnectionAndChannel(host string) (*amqp.Connection, *amqp.Channel, error) { - conn, err := amqp.Dial(host) +// getConnectionAndChannel returns an amqp connection. If enableTLS is true tls connection is made using +// +// the given ceClient cert, ceClient key,and CA certificate. If clientKeyPassword is not empty the provided password will be used to +// +// decrypt the given key. If enableTLS is disabled then amqp connection will be created without tls. +func getConnectionAndChannel(host string, meta *rabbitMQMetadata) (*amqp.Connection, *amqp.Channel, error) { + var conn *amqp.Connection + var err error + if meta.enableTLS { + tlsConfig, configErr := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca) + if configErr == nil { + conn, err = amqp.DialTLS(host, tlsConfig) + } + } else { + conn, err = amqp.Dial(host) + } if err != nil { return nil, nil, err } diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index 3cb8a8b491e..0ca90e17632 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -22,6 +22,13 @@ type parseRabbitMQMetadataTestData struct { authParams map[string]string } +type parseRabbitMQAuthParamTestData struct { + metadata map[string]string + authParams map[string]string + isError bool + enableTLS bool +} + type rabbitMQMetricIdentifier struct { metadataTestData *parseRabbitMQMetadataTestData index int @@ -121,6 +128,21 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "amqp://", "useRegex": "true", "excludeUnacknowledged": "true"}, true, map[string]string{}}, } +var testRabbitMQAuthParamData = []parseRabbitMQAuthParamTestData{ + {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, + // success, TLS cert/key and assumed public CA + {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true}, + // success, TLS cert/key + key password and assumed public CA + {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true}, + // success, TLS CA only + {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "ca": "caaa"}, false, true}, + // failure, TLS missing cert + {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "ca": "caaa", "key": "kee"}, true, true}, + // failure, TLS missing key + {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, true}, + // failure, TLS invalid + {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "kee"}, true, true}, +} var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{ {&testRabbitMQMetadata[1], 0, "s0-rabbitmq-sample"}, {&testRabbitMQMetadata[7], 1, "s1-rabbitmq-namespace-2Fname"}, @@ -139,6 +161,35 @@ func TestRabbitMQParseMetadata(t *testing.T) { } } +func TestRabbitMQParseAuthParamdata(t *testing.T) { + for _, testData := range testRabbitMQAuthParamData { + metadata, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if metadata != nil && metadata.enableTLS != testData.enableTLS { + t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, metadata.enableTLS) + } + if metadata != nil && metadata.enableTLS { + if metadata.ca != testData.authParams["ca"] { + t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], metadata.enableTLS) + } + if metadata.cert != testData.authParams["cert"] { + t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], metadata.cert) + } + if metadata.key != testData.authParams["key"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], metadata.key) + } + if metadata.keyPassword != testData.authParams["keyPassword"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], metadata.key) + } + } + } +} + var testDefaultQueueLength = []parseRabbitMQMetadataTestData{ // use default queueLength {map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{}},