diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 3094dd4b8b7..e825bf94ecd 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -2,12 +2,15 @@ package scalers import ( "context" + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" "io/ioutil" "net/http" "net/url" "strconv" + "strings" "time" "github.com/streadway/amqp" @@ -25,6 +28,7 @@ const ( rabbitMetricType = "External" rabbitIncludeUnacked = "includeUnacked" defaultIncludeUnacked = false + amqps = "amqps" ) type rabbitMQScaler struct { @@ -38,7 +42,10 @@ type rabbitMQMetadata struct { host string // connection string for AMQP protocol apiHost string // connection string for management API requests queueLength int - includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host + includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host + ca string //Certificate authority file for TLS client authentication. Optional. If authmode is sasl_ssl, this is required. + cert string //Certificate for client authentication. Optional. If authmode is sasl_ssl, this is required. + key string //Key for client authentication. Optional. If authmode is sasl_ssl, this is required. } type queueInfo struct { @@ -59,7 +66,7 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca if meta.includeUnacked { return &rabbitMQScaler{metadata: meta}, nil } else { - conn, ch, err := getConnectionAndChannel(meta.host) + conn, ch, err := getConnectionAndChannel(meta.host, meta.ca, meta.cert, meta.key) if err != nil { return nil, fmt.Errorf("error establishing rabbitmq connection: %s", err) } @@ -110,6 +117,24 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string) } } + if strings.HasPrefix(meta.host, amqps) { + if val, ok := authParams["ca"]; ok { + meta.ca = val + } else { + return nil, fmt.Errorf("rabbitmq host is using amqps://, but no ca given") + } + if val, ok := authParams["cert"]; ok { + meta.cert = val + } else { + return nil, fmt.Errorf("rabbitmq host is using amqps://, no cert given") + } + if val, ok := authParams["key"]; ok { + meta.key = val + } else { + return nil, fmt.Errorf("rabbitmq host is using amqps://, no key given") + } + } + if val, ok := metadata["queueName"]; ok { meta.queueName = val } else { @@ -130,8 +155,27 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string) return &meta, nil } -func getConnectionAndChannel(host string) (*amqp.Connection, *amqp.Channel, error) { - conn, err := amqp.Dial(host) +func getConnectionAndChannel(host string, caFile string, certFile string, keyFile string) (*amqp.Connection, *amqp.Channel, error) { + var conn *amqp.Connection + var err error + + if strings.HasPrefix(host, amqps) { + cfg := new(tls.Config) + + cfg.RootCAs = x509.NewCertPool() + + if ca, err := ioutil.ReadFile(caFile); err == nil { + cfg.RootCAs.AppendCertsFromPEM(ca) + } + + if cert, err := tls.LoadX509KeyPair(certFile, keyFile); err == nil { + cfg.Certificates = append(cfg.Certificates, cert) + } + conn, err = amqp.DialTLS(host, cfg) + } else { + conn, err = amqp.Dial(host) + } + if err != nil { return nil, nil, err } diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index 324d44f58d8..f97cad85873 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -45,6 +45,10 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"queueLength": "10"}, true, map[string]string{"host": host}}, // properly formed metadata with includeUnacked {map[string]string{"queueLength": "10", "queueName": "sample", "apiHostFromEnv": apiHost, "includeUnacked": "true"}, false, map[string]string{}}, + // properly formed metadata with amqps but no cert/ca/key + {map[string]string{"queueLength": "10", "queueName": "sample", "hostFromEnv": host}, true, map[string]string{"host": "amqps://user:sercet@somehost.com:5236/vhost"}}, + // properly formed metadata with amqps + {map[string]string{"queueLength": "10", "queueName": "sample", "hostFromEnv": host}, false, map[string]string{"host": "amqps://user:sercet@somehost.com:5236/vhost", "ca": "ca", "cert": "cert", "key": "key"}}, } var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{