-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[v2] Add SSL support to RabbitMQ Scaler #1073
Changes from all commits
b9f3246
18797f3
b3a236f
1203a69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,12 +2,15 @@ package scalers | |
|
||
import ( | ||
"context" | ||
"crypto/tls" | ||
"crypto/x509" | ||
"encoding/json" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"net/url" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/streadway/amqp" | ||
|
@@ -25,6 +28,7 @@ const ( | |
rabbitMetricType = "External" | ||
rabbitIncludeUnacked = "includeUnacked" | ||
defaultIncludeUnacked = false | ||
amqps = "amqps" | ||
) | ||
|
||
type rabbitMQScaler struct { | ||
|
@@ -38,7 +42,10 @@ type rabbitMQMetadata struct { | |
host string // connection string for AMQP protocol | ||
apiHost string // connection string for management API requests | ||
queueLength int | ||
includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host | ||
includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host | ||
ca string //Certificate authority file for TLS client authentication. Optional. If authmode is sasl_ssl, this is required. | ||
cert string //Certificate for client authentication. Optional. If authmode is sasl_ssl, this is required. | ||
key string //Key for client authentication. Optional. If authmode is sasl_ssl, this is required. | ||
} | ||
|
||
type queueInfo struct { | ||
|
@@ -59,7 +66,7 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca | |
if meta.includeUnacked { | ||
return &rabbitMQScaler{metadata: meta}, nil | ||
} else { | ||
conn, ch, err := getConnectionAndChannel(meta.host) | ||
conn, ch, err := getConnectionAndChannel(meta.host, meta.ca, meta.cert, meta.key) | ||
if err != nil { | ||
return nil, fmt.Errorf("error establishing rabbitmq connection: %s", err) | ||
} | ||
|
@@ -110,6 +117,24 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string) | |
} | ||
} | ||
|
||
if strings.HasPrefix(meta.host, amqps) { | ||
if val, ok := authParams["ca"]; ok { | ||
meta.ca = val | ||
} else { | ||
return nil, fmt.Errorf("rabbitmq host is using amqps://, but no ca given") | ||
} | ||
if val, ok := authParams["cert"]; ok { | ||
meta.cert = val | ||
} else { | ||
return nil, fmt.Errorf("rabbitmq host is using amqps://, no cert given") | ||
} | ||
if val, ok := authParams["key"]; ok { | ||
meta.key = val | ||
} else { | ||
return nil, fmt.Errorf("rabbitmq host is using amqps://, no key given") | ||
} | ||
} | ||
|
||
if val, ok := metadata["queueName"]; ok { | ||
meta.queueName = val | ||
} else { | ||
|
@@ -130,8 +155,27 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string) | |
return &meta, nil | ||
} | ||
|
||
func getConnectionAndChannel(host string) (*amqp.Connection, *amqp.Channel, error) { | ||
conn, err := amqp.Dial(host) | ||
func getConnectionAndChannel(host string, caFile string, certFile string, keyFile string) (*amqp.Connection, *amqp.Channel, error) { | ||
var conn *amqp.Connection | ||
var err error | ||
|
||
if strings.HasPrefix(host, amqps) { | ||
cfg := new(tls.Config) | ||
|
||
cfg.RootCAs = x509.NewCertPool() | ||
|
||
if ca, err := ioutil.ReadFile(caFile); err == nil { | ||
cfg.RootCAs.AppendCertsFromPEM(ca) | ||
} | ||
|
||
if cert, err := tls.LoadX509KeyPair(certFile, keyFile); err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my opinion, this should not be the required parameter. client cert and the key are only required when client authentication at the server is set to REQUIREANDVERIFY. In other cases, it could be empty. Until and unless I am missing something completely in regards to rabbitmq tls. |
||
cfg.Certificates = append(cfg.Certificates, cert) | ||
} | ||
conn, err = amqp.DialTLS(host, cfg) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could fail in some cases. So DialTLS has this defined: Either specify InsecureSkipVerify or take servername as parameter. @ahmelsayed this is same what I have mentioned in this #1263 |
||
} else { | ||
conn, err = amqp.Dial(host) | ||
} | ||
|
||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,10 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ | |
{map[string]string{"queueLength": "10"}, true, map[string]string{"host": host}}, | ||
// properly formed metadata with includeUnacked | ||
{map[string]string{"queueLength": "10", "queueName": "sample", "apiHostFromEnv": apiHost, "includeUnacked": "true"}, false, map[string]string{}}, | ||
// properly formed metadata with amqps but no cert/ca/key | ||
{map[string]string{"queueLength": "10", "queueName": "sample", "hostFromEnv": host}, true, map[string]string{"host": "amqps://user:[email protected]:5236/vhost"}}, | ||
// properly formed metadata with amqps | ||
{map[string]string{"queueLength": "10", "queueName": "sample", "hostFromEnv": host}, false, map[string]string{"host": "amqps://user:[email protected]:5236/vhost", "ca": "ca", "cert": "cert", "key": "key"}}, | ||
} | ||
|
||
var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the recent change, we have added tls_config.go in pkg/util. That is supposed to be one-stop destination to enable tls encryption. Can you please move this to tls_config.go