Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(event-hubs-scaler): Add pod/workload identity support to checkpoint in Azure Blob Storage #3573

Merged
merged 7 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Azure Pipelines Scaler:** Improved speed of profiling large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Event Hubs Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569))
tomkerkhove marked this conversation as resolved.
Show resolved Hide resolved

### Fixes

Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/azure/azure_cloud_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const (

// Default Endpoint key in trigger metadata
DefaultEndpointSuffixKey string = "endpointSuffix"

// Default Storage Endpoint key in trigger metadata
DefaultStorageSuffixKey string = "storageEndpointSuffix"
)

// EnvironmentPropertyProvider for different types of Azure scalers
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/azure/azure_cloud_environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var parseEnvironmentPropertyTestDataset = []parseEnvironmentPropertyTestData{
{map[string]string{"cloud": "Private", "endpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", DefaultEndpointSuffixKey, testPropertyProvider, false},
{map[string]string{"endpointSuffix": "ignored"}, "AzurePublicCloud.suffix", DefaultEndpointSuffixKey, testPropertyProvider, false},
{map[string]string{"cloud": "Private", "endpointSuffixDiff": "suffix.private.cloud"}, "suffix.private.cloud", "endpointSuffixDiff", testPropertyProvider, false},
{map[string]string{"cloud": "Private", "storageEndpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", DefaultStorageSuffixKey, testPropertyProvider, false},
{map[string]string{"cloud": "Private"}, "suffix.private.cloud", DefaultStorageSuffixKey, testPropertyProvider, true},
}

func TestParseEnvironmentProperty(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/azure/azure_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type EventHubInfo struct {
EventHubConnection string
EventHubConsumerGroup string
StorageConnection string
StorageAccountName string
BlobStorageEndpoint string
BlobContainer string
Namespace string
EventHubName string
Expand Down
16 changes: 15 additions & 1 deletion pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,22 @@ func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadR
}

func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) {
var podIdentity = info.PodIdentity

// For back-compat, prefer a connection string over pod identity when present
if len(info.StorageConnection) != 0 {
podIdentity.Provider = kedav1alpha1.PodIdentityProviderNone
}
andyatwork marked this conversation as resolved.
Show resolved Hide resolved

if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzure || podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload {
tomkerkhove marked this conversation as resolved.
Show resolved Hide resolved
if len(info.StorageAccountName) == 0 {
return Checkpoint{}, fmt.Errorf("storageAccountName not supplied when PodIdentity authentication is enabled")
}
}

blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient,
kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, info.StorageConnection, "", "")
podIdentity, info.StorageConnection, info.StorageAccountName, info.BlobStorageEndpoint)
tomkerkhove marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return Checkpoint{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func ParseAzureStorageEndpointSuffix(metadata map[string]string, endpointType St
func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, accountName, endpointSuffix string) (azqueue.Credential, *url.URL, error) {
switch podIdentity.Provider {
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity)
token, endpoint, err := parseAccessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPD
func ParseAzureStorageBlobConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, accountName, endpointSuffix string) (azblob.Credential, *url.URL, error) {
switch podIdentity.Provider {
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity)
token, endpoint, err := parseAccessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func parseAzureStorageConnectionString(connectionString string, endpointType Sto
return u, name, key, nil
}

func parseAcessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string,
func parseAccessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string,
podIdentity kedav1alpha1.AuthPodIdentity) (string, *url.URL, error) {
var token AADToken
var err error
Expand Down
80 changes: 62 additions & 18 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler,
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

parsedMetadata, err := parseAzureEventHubMetadata(config)
logger := InitializeLogger(config, "azure_eventhub_scaler")

parsedMetadata, err := parseAzureEventHubMetadata(logger, config)
if err != nil {
return nil, fmt.Errorf("unable to get eventhub metadata: %s", err)
}
Expand All @@ -85,21 +87,36 @@ func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler,
metadata: parsedMetadata,
client: hub,
httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false),
logger: InitializeLogger(config, "azure_eventhub_scaler"),
logger: logger,
}, nil
}

// parseAzureEventHubMetadata parses metadata
func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) {
func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*eventHubMetadata, error) {
meta := eventHubMetadata{
eventHubInfo: azure.EventHubInfo{},
}

err := parseCommonAzureEventHubMetadata(config, &meta)
if err != nil {
return nil, err
}

err = parseAzureEventHubAuthenticationMetadata(logger, config, &meta)
if err != nil {
return nil, err
}

return &meta, nil
}

func parseCommonAzureEventHubMetadata(config *ScalerConfig, meta *eventHubMetadata) error {
meta.threshold = defaultEventHubMessageThreshold

if val, ok := config.TriggerMetadata[thresholdMetricName]; ok {
threshold, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing azure eventhub metadata %s: %s", thresholdMetricName, err)
return fmt.Errorf("error parsing azure eventhub metadata %s: %s", thresholdMetricName, err)
}

meta.threshold = threshold
Expand All @@ -109,7 +126,7 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
if val, ok := config.TriggerMetadata[activationThresholdMetricName]; ok {
activationThreshold, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing azure eventhub metadata %s: %s", activationThresholdMetricName, err)
return fmt.Errorf("error parsing azure eventhub metadata %s: %s", activationThresholdMetricName, err)
}

meta.activationThreshold = activationThreshold
Expand All @@ -121,10 +138,6 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
meta.eventHubInfo.StorageConnection = config.ResolvedEnv[config.TriggerMetadata["storageConnectionFromEnv"]]
}

if len(meta.eventHubInfo.StorageConnection) == 0 {
return nil, fmt.Errorf("no storage connection string given")
}

meta.eventHubInfo.EventHubConsumerGroup = defaultEventHubConsumerGroup
if val, ok := config.TriggerMetadata["consumerGroup"]; ok {
meta.eventHubInfo.EventHubConsumerGroup = val
Expand All @@ -146,7 +159,7 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
if resourceURL, ok := config.TriggerMetadata["eventHubResourceURL"]; ok {
meta.eventHubInfo.EventHubResourceURL = resourceURL
} else {
return nil, fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud)
return fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud)
}
}
}
Expand All @@ -156,37 +169,70 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
}
serviceBusEndpointSuffix, err := azure.ParseEnvironmentProperty(config.TriggerMetadata, azure.DefaultEndpointSuffixKey, serviceBusEndpointSuffixProvider)
if err != nil {
return nil, err
return err
}
meta.eventHubInfo.ServiceBusEndpointSuffix = serviceBusEndpointSuffix

activeDirectoryEndpoint, err := azure.ParseActiveDirectoryEndpoint(config.TriggerMetadata)
if err != nil {
return nil, err
return err
}
meta.eventHubInfo.ActiveDirectoryEndpoint = activeDirectoryEndpoint

meta.scalerIndex = config.ScalerIndex

return nil
}

func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *ScalerConfig, meta *eventHubMetadata) error {
meta.eventHubInfo.PodIdentity = config.PodIdentity

switch config.PodIdentity.Provider {
case "", v1alpha1.PodIdentityProviderNone:
if len(meta.eventHubInfo.StorageConnection) == 0 {
return fmt.Errorf("no storage connection string given")
}

if config.AuthParams["connection"] != "" {
meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"]
} else if config.TriggerMetadata["connectionFromEnv"] != "" {
meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
}

if len(meta.eventHubInfo.EventHubConnection) == 0 {
return nil, fmt.Errorf("no event hub connection string given")
return fmt.Errorf("no event hub connection string given")
}
case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload:
meta.eventHubInfo.StorageAccountName = ""
if val, ok := config.TriggerMetadata["storageAccountName"]; ok {
meta.eventHubInfo.StorageAccountName = val
} else {
logger.Info("no 'storageAccountName' provided to enable identity based authentication to Blob Storage. Attempting to use connection string instead")
}

if len(meta.eventHubInfo.StorageAccountName) != 0 {
storageEndpointSuffixProvider := func(env az.Environment) (string, error) {
return env.StorageEndpointSuffix, nil
}
storageEndpointSuffix, err := azure.ParseEnvironmentProperty(config.TriggerMetadata, azure.DefaultStorageSuffixKey, storageEndpointSuffixProvider)
if err != nil {
return err
}
meta.eventHubInfo.BlobStorageEndpoint = "blob." + storageEndpointSuffix
}

if len(meta.eventHubInfo.StorageConnection) == 0 && len(meta.eventHubInfo.StorageAccountName) == 0 {
return fmt.Errorf("no storage connection string or storage account name for pod identity based authentication given")
}

andyatwork marked this conversation as resolved.
Show resolved Hide resolved
if config.TriggerMetadata["eventHubNamespace"] != "" {
meta.eventHubInfo.Namespace = config.TriggerMetadata["eventHubNamespace"]
} else if config.TriggerMetadata["eventHubNamespaceFromEnv"] != "" {
meta.eventHubInfo.Namespace = config.ResolvedEnv[config.TriggerMetadata["eventHubNamespaceFromEnv"]]
}

if len(meta.eventHubInfo.Namespace) == 0 {
return nil, fmt.Errorf("no event hub namespace string given")
return fmt.Errorf("no event hub namespace string given")
}

if config.TriggerMetadata["eventHubName"] != "" {
Expand All @@ -196,13 +242,11 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error)
}

if len(meta.eventHubInfo.EventHubName) == 0 {
return nil, fmt.Errorf("no event hub name string given")
return fmt.Errorf("no event hub name string given")
}
}

meta.scalerIndex = config.ScalerIndex

return &meta, nil
return nil
}

// GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition
Expand Down
22 changes: 18 additions & 4 deletions pkg/scalers/azure_eventhub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
Expand All @@ -20,6 +21,7 @@ const (
eventHubConnectionSetting = "testEventHubConnectionSetting"
storageConnectionSetting = "testStorageConnectionSetting"
serviceBusEndpointSuffix = "serviceBusEndpointSuffix"
storageEndpointSuffix = "storageEndpointSuffix"
activeDirectoryEndpoint = "activeDirectoryEndpoint"
eventHubResourceURL = "eventHubResourceURL"
testEventHubNamespace = "kedatesteventhub"
Expand Down Expand Up @@ -87,6 +89,18 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat
// properly formed metadata with private cloud
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName,
"eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL}, false},
// properly formed event hub metadata with Pod Identity and no storage connection string
{map[string]string{"storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false},
// event hub metadata with Pod Identity, no storage connection string, no storageAccountName - should fail
{map[string]string{"consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true},
// event hub metadata with Pod Identity, no storage connection string, empty storageAccountName - should fail
{map[string]string{"storageAccount": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true},
// event hub metadata with Pod Identity, storage connection string, empty storageAccountName - should ignore pod identity for blob storage and succeed
{map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "storageAccountName": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false},
// event hub metadata with Pod Identity and no storage connection string, private cloud and no storageEndpointSuffix - should fail
{map[string]string{"cloud": "private", "storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true},
// properly formed event hub metadata with Pod Identity and no storage connection string, private cloud and storageEndpointSuffix
{map[string]string{"cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL, "storageAccountName": "aStorageAccount", "storageEndpointSuffix": storageEndpointSuffix, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false},
}

var eventHubMetricIdentifiers = []eventHubMetricIdentifier{
Expand All @@ -106,7 +120,7 @@ var testEventHubScaler = azureEventHubScaler{
func TestParseEventHubMetadata(t *testing.T) {
// Test first with valid resolved environment
for _, testData := range parseEventHubMetadataDataset {
_, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}})
_, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}})

if err != nil && !testData.isError {
t.Errorf("Expected success but got error: %s", err)
Expand All @@ -117,7 +131,7 @@ func TestParseEventHubMetadata(t *testing.T) {
}

for _, testData := range parseEventHubMetadataDatasetWithPodIdentity {
_, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv,
_, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv,
AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}})

if err != nil && !testData.isError {
Expand All @@ -129,7 +143,7 @@ func TestParseEventHubMetadata(t *testing.T) {
}

for _, testData := range parseEventHubMetadataDatasetWithPodIdentity {
_, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv,
_, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv,
AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzureWorkload}})

if err != nil && !testData.isError {
Expand Down Expand Up @@ -476,7 +490,7 @@ func DeleteContainerInStorage(ctx context.Context, endpoint *url.URL, credential

func TestEventHubGetMetricSpecForScaling(t *testing.T) {
for _, testData := range eventHubMetricIdentifiers {
meta, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, ScalerIndex: testData.scalerIndex})
meta, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down