Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Kafka: split metadata and config for SASL and TLS #1074

Merged
merged 4 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@

- Change `apiGroup` from `keda.k8s.io` to `keda.sh` ([#552](https://github.com/kedacore/keda/issues/552))
- Introduce a separate ScaledObject and ScaledJob([#653](https://github.com/kedacore/keda/issues/653))
- Remove `New()` and `Close()` from the interface of `service ExternalScaler` in `externalscaler.proto` ([#865](https://github.com/kedacore/keda/pull/865))
- Remove `New()` and `Close()` from the interface of `service ExternalScaler` in `externalscaler.proto`.
- Removed deprecated brokerList for Kafka scaler ([#882](https://github.com/kedacore/keda/pull/882))
- All scalers metadata that is resolved from the scaleTarget environment have suffix `FromEnv` added. e.g: `connection` -> `connectionFromEnv` ([#1072](https://github.com/kedacore/keda/pull/1072))
- All scalers metadata that is resolved from the scaleTarget environment have suffix `FromEnv` added. e.g: `connection` -> `connectionFromEnv`
- Kafka: split metadata and config for SASL and TLS ([#1074](https://github.com/kedacore/keda/pull/1074))
zroubalik marked this conversation as resolved.
Show resolved Hide resolved

### Other
- Update Operator SDK and k8s deps ([#1007](https://github.com/kedacore/keda/pull/1007),[#870](https://github.com/kedacore/keda/issues/870))
Expand Down
176 changes: 94 additions & 82 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/Shopify/sarama"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand All @@ -32,15 +31,16 @@ type kafkaMetadata struct {
lagThreshold int64
offsetResetPolicy offsetResetPolicy

// auth
authMode kafkaAuthMode
// SASL
saslType kafkaSaslType
username string
password string

// ssl
cert string
key string
ca string
// TLS
enableTLS bool
cert string
key string
ca string
}

type offsetResetPolicy string
Expand All @@ -50,15 +50,14 @@ const (
earliest offsetResetPolicy = "earliest"
)

type kafkaAuthMode string
type kafkaSaslType string

// supported SASL types
const (
kafkaAuthModeForNone kafkaAuthMode = "none"
kafkaAuthModeForSaslPlaintext kafkaAuthMode = "sasl_plaintext"
kafkaAuthModeForSaslScramSha256 kafkaAuthMode = "sasl_scram_sha256"
kafkaAuthModeForSaslScramSha512 kafkaAuthMode = "sasl_scram_sha512"
kafkaAuthModeForSaslSSL kafkaAuthMode = "sasl_ssl"
kafkaAuthModeForSaslSSLPlain kafkaAuthMode = "sasl_ssl_plain"
KafkaSASLTypeNone kafkaSaslType = "none"
KafkaSASLTypePlaintext kafkaSaslType = "plaintext"
KafkaSASLTypeSCRAMSHA256 kafkaSaslType = "scram_sha256"
KafkaSASLTypeSCRAMSHA512 kafkaSaslType = "scram_sha512"
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
)

const (
Expand Down Expand Up @@ -130,45 +129,50 @@ func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (ka
meta.lagThreshold = t
}

meta.authMode = kafkaAuthModeForNone
if val, ok := authParams["authMode"]; ok {
meta.saslType = KafkaSASLTypeNone
if val, ok := authParams["sasl"]; ok {
val = strings.TrimSpace(val)
mode := kafkaAuthMode(val)

if mode != kafkaAuthModeForNone && mode != kafkaAuthModeForSaslPlaintext && mode != kafkaAuthModeForSaslSSL && mode != kafkaAuthModeForSaslSSLPlain && mode != kafkaAuthModeForSaslScramSha256 && mode != kafkaAuthModeForSaslScramSha512 {
return meta, fmt.Errorf("err auth mode %s given", mode)
mode := kafkaSaslType(val)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 {
if authParams["username"] == "" {
return meta, errors.New("no username given")
}
meta.username = strings.TrimSpace(authParams["username"])

if authParams["password"] == "" {
return meta, errors.New("no password given")
}
meta.password = strings.TrimSpace(authParams["password"])
meta.saslType = mode
} else {
return meta, fmt.Errorf("err SASL mode %s given", mode)
}

meta.authMode = mode
}

if meta.authMode != kafkaAuthModeForNone && meta.authMode != kafkaAuthModeForSaslSSL {
if authParams["username"] == "" {
return meta, errors.New("no username given")
}
meta.username = strings.TrimSpace(authParams["username"])

if authParams["password"] == "" {
return meta, errors.New("no password given")
}
meta.password = strings.TrimSpace(authParams["password"])
}

if meta.authMode == kafkaAuthModeForSaslSSL {
if authParams["ca"] == "" {
return meta, errors.New("no ca given")
}
meta.ca = authParams["ca"]

if authParams["cert"] == "" {
return meta, errors.New("no cert given")
}
meta.cert = authParams["cert"]
meta.enableTLS = false
if val, ok := authParams["tls"]; ok {
val = strings.TrimSpace(val)

if authParams["key"] == "" {
return meta, errors.New("no key given")
if val == "enable" {
if authParams["ca"] == "" {
return meta, errors.New("no ca given")
}
meta.ca = authParams["ca"]

if authParams["cert"] == "" {
return meta, errors.New("no cert given")
}
meta.cert = authParams["cert"]

if authParams["key"] == "" {
return meta, errors.New("no key given")
}
meta.key = authParams["key"]
meta.enableTLS = true
} else {
return meta, fmt.Errorf("err incorrect value for TLS given: %s", val)
}
meta.key = authParams["key"]
}

return meta, nil
Expand Down Expand Up @@ -206,56 +210,33 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0

if ok := metadata.authMode == kafkaAuthModeForSaslPlaintext || metadata.authMode == kafkaAuthModeForSaslSSLPlain || metadata.authMode == kafkaAuthModeForSaslScramSha256 || metadata.authMode == kafkaAuthModeForSaslScramSha512; ok {
if metadata.saslType != KafkaSASLTypeNone {
config.Net.SASL.Enable = true
config.Net.SASL.User = metadata.username
config.Net.SASL.Password = metadata.password
}

if metadata.authMode == kafkaAuthModeForSaslSSLPlain {
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)

tlsConfig := &tls.Config{
InsecureSkipVerify: true,
ClientAuth: 0,
}

if metadata.enableTLS {
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
config.Net.DialTimeout = 10 * time.Second
}

if metadata.authMode == kafkaAuthModeForSaslSSL {
cert, err := tls.X509KeyPair([]byte(metadata.cert), []byte(metadata.key))
tlsConfig, err := newTLSConfig(metadata.cert, metadata.key, metadata.ca)
if err != nil {
return nil, nil, fmt.Errorf("error parse X509KeyPair: %s", err)
return nil, nil, err
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(metadata.ca))

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}

config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}

if metadata.authMode == kafkaAuthModeForSaslScramSha256 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
if metadata.saslType == KafkaSASLTypePlaintext {
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
}

if metadata.authMode == kafkaAuthModeForSaslScramSha512 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
if metadata.saslType == KafkaSASLTypeSCRAMSHA256 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
}

if metadata.authMode == kafkaAuthModeForSaslPlaintext {
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.TLS.Enable = true
if metadata.saslType == KafkaSASLTypeSCRAMSHA512 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
}

client, err := sarama.NewClient(metadata.bootstrapServers, config)
Expand All @@ -274,6 +255,37 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
return client, admin, nil
}

// newTLSConfig returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If none are appropriate, a nil *tls.Config is returned.
func newTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
valid := false

config := &tls.Config{}

if clientCert != "" && clientKey != "" {
cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
if err != nil {
return nil, fmt.Errorf("error parse X509KeyPair: %s", err)
}
config.Certificates = []tls.Certificate{cert}
valid = true
}

if caCert != "" {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(caCert))
config.RootCAs = caCertPool
config.InsecureSkipVerify = true
valid = true
}

if !valid {
config = nil
}

return config, nil
}

func (s *kafkaScaler) getPartitions() ([]int32, error) {
topicsMetadata, err := s.admin.DescribeTopics([]string{s.metadata.topic})
if err != nil {
Expand Down
66 changes: 64 additions & 2 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@ type parseKafkaMetadataTestData struct {
offsetResetPolicy offsetResetPolicy
}

type parseKafkaAuthParamsTestData struct {
authParams map[string]string
isError bool
enableTLS bool
}

type kafkaMetricIdentifier struct {
metadataTestData *parseKafkaMetadataTestData
name string
}

// A complete valid metadata example for reference
var validMetadata = map[string]string{
var validKafkaMetadata = map[string]string{
"bootstrapServers": "broker1:9092,broker2:9092",
"consumerGroup": "my-group",
"topic": "my-topic",
}

// A complete valid authParams example for sasl, with username and passwd
var validWithAuthParams = map[string]string{
"authMode": "sasl_plaintext",
"sasl": "plaintext",
"username": "admin",
"password": "admin",
}
Expand All @@ -56,6 +62,47 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest")},
}

var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
// success, SASL only
{map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin"}, false, false},
// success, SASL only
{map[string]string{"sasl": "scram_sha256", "username": "admin", "password": "admin"}, false, false},
// success, SASL only
{map[string]string{"sasl": "scram_sha512", "username": "admin", "password": "admin"}, false, false},
// success, TLS only
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// success, SASL + TLS
{map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// failure, SASL incorrect type
{map[string]string{"sasl": "foo", "username": "admin", "password": "admin"}, true, false},
// failure, SASL missing username
{map[string]string{"sasl": "plaintext", "password": "admin"}, true, false},
// failure, SASL missing password
{map[string]string{"sasl": "plaintext", "username": "admin"}, true, false},
// failure, TLS incorrect
{map[string]string{"tls": "yes", "cert": "ceert", "key": "keey"}, true, false},
// failure, TLS missing ca
{map[string]string{"tls": "yes", "ca": "caaa", "key": "keey"}, true, false},
// failure, TLS missing cert
{map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, TLS missing key
{map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert"}, true, false},
// failure, SASL + TLS, incorrect sasl
{map[string]string{"sasl": "foo", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, incorrect tls
{map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "foo", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing username
{map[string]string{"sasl": "plaintext", "password": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing password
{map[string]string{"sasl": "plaintext", "username": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing ca
{map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "cert": "ceert", "key": "keey"}, true, false},
// failure, SASL + TLS, missing cert
{map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "key": "keey"}, true, false},
// failure, SASL + TLS, missing key
{map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false},
}

var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
{&parseKafkaMetadataTestDataset[4], "kafka-my-topic-my-group"},
}
Expand Down Expand Up @@ -112,6 +159,21 @@ func TestGetBrokers(t *testing.T) {
}
}

func TestKafkaAuthParams(t *testing.T) {
for _, testData := range parseKafkaAuthParamsTestDataset {
meta, err := parseKafkaMetadata(nil, validKafkaMetadata, testData.authParams)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)
}
}
}
func TestKafkaGetMetricSpecForScaling(t *testing.T) {
for _, testData := range kafkaMetricIdentifiers {
meta, err := parseKafkaMetadata(nil, testData.metadataTestData.metadata, validWithAuthParams)
Expand Down