Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Scaler: added support for PKCS #8 private key decryption #3413

Merged
merged 3 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issue/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
- **GCP Stackdriver Scaler:** Added aggregation parameters ([#3008](https://github.com/kedacore/keda/issues/3008))
- **Kafka Scaler:** Support of passphrase encrypted PKCS #\8 private key ([3449](https://github.com/kedacore/keda/issues/3449))
- **Prometheus Scaler:** Add ignoreNullValues to return error when prometheus return null in values ([#3065](https://github.com/kedacore/keda/issues/3065))
- **Selenium Grid Scaler:** Edge active sessions not being properly counted ([#2709](https://github.com/kedacore/keda/issues/2709))
- **Selenium Grid Scaler:** Max Sessions implementation issue ([#3061](https://github.com/kedacore/keda/issues/3061))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ require (
github.com/xdg-go/scram v1.1.0 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d
go.etcd.io/etcd/api/v3 v3.5.0 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect
go.etcd.io/etcd/client/v3 v3.5.0 // indirect
Expand Down
16 changes: 11 additions & 5 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ type kafkaMetadata struct {
password string

// TLS
enableTLS bool
cert string
key string
ca string
enableTLS bool
cert string
key string
keyPassword string
ca string

scalerIndex int
}
Expand Down Expand Up @@ -141,6 +142,11 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
} else if val != "disable" {
return fmt.Errorf("err incorrect value for TLS given: %s", val)
Expand Down Expand Up @@ -280,7 +286,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin

if metadata.enableTLS {
config.Net.TLS.Enable = true
tlsConfig, err := kedautil.NewTLSConfig(metadata.cert, metadata.key, metadata.ca)
tlsConfig, err := kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// success, TLS cert/key and assumed public CA
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true},
// success, TLS cert/key + key password and assumed public CA
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true},
// success, TLS CA only
{map[string]string{"tls": "enable", "ca": "caaa"}, false, true},
// success, SASL + TLS
Expand Down Expand Up @@ -205,6 +207,9 @@ func TestKafkaAuthParams(t *testing.T) {
if meta.key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key)
}
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key)
}
}
}
}
Expand Down
50 changes: 46 additions & 4 deletions pkg/util/tls_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,39 @@ package util
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"

"github.com/youmark/pkcs8"
)

// NewTLSConfig returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If none are appropriate, a nil *tls.Config is returned.
func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
func decryptClientKey(clientKey, clientKeyPassword string) ([]byte, error) {
block, _ := pem.Decode([]byte(clientKey))

key, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(clientKeyPassword))
if err != nil {
return nil, err
}

pemData, err := x509.MarshalPKCS8PrivateKey(key)
if err != nil {
return nil, err
}

var pemPrivateBlock = &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: pemData,
}

encodedData := pem.EncodeToMemory(pemPrivateBlock)

return encodedData, nil
}

// NewTLSConfigWithPassword returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If clientKeyPassword is not empty the provided password will be used to
// decrypt the given key. If none are appropriate, a nil *tls.Config is returned.
func NewTLSConfigWithPassword(clientCert, clientKey, clientKeyPassword, caCert string) (*tls.Config, error) {
// skipVerify := true is a hack to avoid the CodeQL error related with allowing insecure certificates in production environments.
// Skipping this validation is necessary and intended in our use case in order to be able to trust in the CA.
skipVerify := true
Expand All @@ -33,7 +60,16 @@ func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
config := &tls.Config{}

if clientCert != "" && clientKey != "" {
cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
key := []byte(clientKey)
if clientKeyPassword != "" {
var err error
key, err = decryptClientKey(clientKey, clientKeyPassword)
if err != nil {
return nil, fmt.Errorf("error decrypt X509Key: %s", err)
}
}

cert, err := tls.X509KeyPair([]byte(clientCert), key)
if err != nil {
return nil, fmt.Errorf("error parse X509KeyPair: %s", err)
}
Expand All @@ -55,3 +91,9 @@ func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {

return config, nil
}

// NewTLSConfig returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If none are appropriate, a nil *tls.Config is returned.
func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
return NewTLSConfigWithPassword(clientCert, clientKey, "", caCert)
}