Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka scaler sasl #486

Merged
merged 5 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
google.golang.org/api v0.10.0
google.golang.org/genproto v0.0.0-20191002211648-c459b9ce5143
google.golang.org/grpc v1.24.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,9 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 h1:3SVOIvH7Ae1KRYy
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
53 changes: 53 additions & 0 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,23 @@ type kafkaMetadata struct {
group string
topic string
lagThreshold int64

// auth
authMode kafkaAuthMode
username string
passwd string
}

type kafkaAuthMode string

const (
kafkaAuthModeForNone kafkaAuthMode = "none"
kafkaAuthModeForSaslPlaintext kafkaAuthMode = "sasl_plaintext"
kafkaAuthModeForSaslScramSha256 kafkaAuthMode = "sasl_scram_sha256"
kafkaAuthModeForSaslScramSha512 kafkaAuthMode = "sasl_scram_sha512"
kafkaAuthModeForSaslSSL kafkaAuthMode = "sasl_ssl" // sarama package not support sasl_ssl
)

const (
lagThresholdMetricName = "lagThreshold"
kafkaMetricType = "External"
Expand Down Expand Up @@ -86,6 +101,28 @@ func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) {
meta.lagThreshold = t
}

meta.authMode = kafkaAuthModeForNone
if val, ok := metadata["authMode"]; ok {
mode := kafkaAuthMode(val)
if mode != kafkaAuthModeForNone && mode != kafkaAuthModeForSaslPlaintext && mode != kafkaAuthModeForSaslScramSha256 && mode != kafkaAuthModeForSaslScramSha512 {
return meta, fmt.Errorf("err auth mode %s given", mode)
}

meta.authMode = mode
}

if meta.authMode != kafkaAuthModeForNone {
if metadata["username"] == "" {
return meta, errors.New("no username given")
}
meta.username = metadata["username"]

if metadata["passwd"] == "" {
return meta, errors.New("no passwd given")
}
meta.passwd = metadata["passwd"]
jeffhollan marked this conversation as resolved.
Show resolved Hide resolved
}

return meta, nil
}

Expand Down Expand Up @@ -118,6 +155,22 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0

if ok := metadata.authMode == kafkaAuthModeForSaslPlaintext || metadata.authMode == kafkaAuthModeForSaslScramSha256 || metadata.authMode == kafkaAuthModeForSaslScramSha512; ok {
config.Net.SASL.Enable = true
config.Net.SASL.User = metadata.username
config.Net.SASL.Password = metadata.passwd
}

if metadata.authMode == kafkaAuthModeForSaslScramSha256 {
jeffhollan marked this conversation as resolved.
Show resolved Hide resolved
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
}

if metadata.authMode == kafkaAuthModeForSaslScramSha512 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
}

client, err := sarama.NewClient(metadata.brokers, config)
if err != nil {
return nil, nil, fmt.Errorf("error creating kafka client: %s", err)
Expand Down
21 changes: 20 additions & 1 deletion pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,31 @@ var validMetadata = map[string]string{
"topic": "my-topic",
}

// A complete valid metadata example for sasl, with username and passwd
var validMetadataWithSasl = map[string]string{
"brokerList": "broker1:9092,broker2:9092",
"consumerGroup": "my-group",
"topic": "my-topic",
"authMode": "sasl_plaintext",
"username": "admin",
"passwd": "admin",
}

// A complete valid metadata example for sasl, without username and passwd
var validMetadataWithoutSasl = map[string]string{
"brokerList": "broker1:9092,broker2:9092",
"consumerGroup": "my-group",
"topic": "my-topic",
"authMode": "sasl_plaintext",
}

var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{}, true, 0, nil, "", ""},
{map[string]string{"brokerList": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", ""},
{map[string]string{"brokerList": "foo:9092,bar:9092"}, true, 2, []string{"foo:9092", "bar:9092"}, "", ""},
{map[string]string{"brokerList": "a", "consumerGroup": "my-group"}, true, 1, []string{"a"}, "my-group", ""},
jeffhollan marked this conversation as resolved.
Show resolved Hide resolved
{validMetadata, false, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"},
{validMetadataWithSasl, false, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"},
{validMetadataWithoutSasl, true, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"},
}

func TestGetBrokers(t *testing.T) {
Expand Down
36 changes: 36 additions & 0 deletions pkg/scalers/kafka_scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package scalers

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}