Skip to content

Commit

Permalink
Kafka Scaler: added support for PKCS #8 private key decryption (#3413)
Browse files Browse the repository at this point in the history
Signed-off-by: Tobias Krause <[email protected]>
  • Loading branch information
tobiaskrause authored Aug 2, 2022
1 parent 343ae88 commit 81ae8e9
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issue/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
- **GCP Stackdriver Scaler:** Added aggregation parameters ([#3008](https://github.com/kedacore/keda/issues/3008))
- **Kafka Scaler:** Support of passphrase encrypted PKCS #\8 private key ([3449](https://github.com/kedacore/keda/issues/3449))
- **Prometheus Scaler:** Add ignoreNullValues to return error when prometheus return null in values ([#3065](https://github.com/kedacore/keda/issues/3065))
- **Selenium Grid Scaler:** Edge active sessions not being properly counted ([#2709](https://github.com/kedacore/keda/issues/2709))
- **Selenium Grid Scaler:** Max Sessions implementation issue ([#3061](https://github.com/kedacore/keda/issues/3061))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ require (
github.com/xdg-go/scram v1.1.0 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d
go.etcd.io/etcd/api/v3 v3.5.0 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect
go.etcd.io/etcd/client/v3 v3.5.0 // indirect
Expand Down
16 changes: 11 additions & 5 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ type kafkaMetadata struct {
password string

// TLS
enableTLS bool
cert string
key string
ca string
enableTLS bool
cert string
key string
keyPassword string
ca string

scalerIndex int
}
Expand Down Expand Up @@ -141,6 +142,11 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
} else if val != "disable" {
return fmt.Errorf("err incorrect value for TLS given: %s", val)
Expand Down Expand Up @@ -280,7 +286,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin

if metadata.enableTLS {
config.Net.TLS.Enable = true
tlsConfig, err := kedautil.NewTLSConfig(metadata.cert, metadata.key, metadata.ca)
tlsConfig, err := kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// success, TLS cert/key and assumed public CA
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true},
// success, TLS cert/key + key password and assumed public CA
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true},
// success, TLS CA only
{map[string]string{"tls": "enable", "ca": "caaa"}, false, true},
// success, SASL + TLS
Expand Down Expand Up @@ -205,6 +207,9 @@ func TestKafkaAuthParams(t *testing.T) {
if meta.key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key)
}
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key)
}
}
}
}
Expand Down
50 changes: 46 additions & 4 deletions pkg/util/tls_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,39 @@ package util
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"

"github.com/youmark/pkcs8"
)

// NewTLSConfig returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If none are appropriate, a nil *tls.Config is returned.
func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
func decryptClientKey(clientKey, clientKeyPassword string) ([]byte, error) {
block, _ := pem.Decode([]byte(clientKey))

key, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(clientKeyPassword))
if err != nil {
return nil, err
}

pemData, err := x509.MarshalPKCS8PrivateKey(key)
if err != nil {
return nil, err
}

var pemPrivateBlock = &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: pemData,
}

encodedData := pem.EncodeToMemory(pemPrivateBlock)

return encodedData, nil
}

// NewTLSConfigWithPassword returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If clientKeyPassword is not empty the provided password will be used to
// decrypt the given key. If none are appropriate, a nil *tls.Config is returned.
func NewTLSConfigWithPassword(clientCert, clientKey, clientKeyPassword, caCert string) (*tls.Config, error) {
// skipVerify := true is a hack to avoid the CodeQL error related with allowing insecure certificates in production environments.
// Skipping this validation is necessary and intended in our use case in order to be able to trust in the CA.
skipVerify := true
Expand All @@ -33,7 +60,16 @@ func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
config := &tls.Config{}

if clientCert != "" && clientKey != "" {
cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
key := []byte(clientKey)
if clientKeyPassword != "" {
var err error
key, err = decryptClientKey(clientKey, clientKeyPassword)
if err != nil {
return nil, fmt.Errorf("error decrypt X509Key: %s", err)
}
}

cert, err := tls.X509KeyPair([]byte(clientCert), key)
if err != nil {
return nil, fmt.Errorf("error parse X509KeyPair: %s", err)
}
Expand All @@ -55,3 +91,9 @@ func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {

return config, nil
}

// NewTLSConfig returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If none are appropriate, a nil *tls.Config is returned.
func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
return NewTLSConfigWithPassword(clientCert, clientKey, "", caCert)
}

0 comments on commit 81ae8e9

Please sign in to comment.