diff --git a/CHANGELOG.md b/CHANGELOG.md index ae601e15830..95d9588bc34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610)) - **Azure Pipelines Scaler:** Improved speed of profiling large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702)) - **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310)) +- **Event Hubs Scaler:** Support Azure Active Directory Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569)) ### Fixes diff --git a/pkg/scalers/azure/azure_cloud_environment.go b/pkg/scalers/azure/azure_cloud_environment.go index 9aebd1b39a0..8f34c9ea20c 100644 --- a/pkg/scalers/azure/azure_cloud_environment.go +++ b/pkg/scalers/azure/azure_cloud_environment.go @@ -15,6 +15,9 @@ const ( // Default Endpoint key in trigger metadata DefaultEndpointSuffixKey string = "endpointSuffix" + + // Default Storage Endpoint key in trigger metadata + DefaultStorageSuffixKey string = "storageEndpointSuffix" ) // EnvironmentPropertyProvider for different types of Azure scalers diff --git a/pkg/scalers/azure/azure_cloud_environment_test.go b/pkg/scalers/azure/azure_cloud_environment_test.go index e72e2ed343a..1b9f95bdb94 100644 --- a/pkg/scalers/azure/azure_cloud_environment_test.go +++ b/pkg/scalers/azure/azure_cloud_environment_test.go @@ -31,6 +31,8 @@ var parseEnvironmentPropertyTestDataset = []parseEnvironmentPropertyTestData{ {map[string]string{"cloud": "Private", "endpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", DefaultEndpointSuffixKey, testPropertyProvider, false}, {map[string]string{"endpointSuffix": 
"ignored"}, "AzurePublicCloud.suffix", DefaultEndpointSuffixKey, testPropertyProvider, false}, {map[string]string{"cloud": "Private", "endpointSuffixDiff": "suffix.private.cloud"}, "suffix.private.cloud", "endpointSuffixDiff", testPropertyProvider, false}, + {map[string]string{"cloud": "Private", "storageEndpointSuffix": "suffix.private.cloud"}, "suffix.private.cloud", DefaultStorageSuffixKey, testPropertyProvider, false}, + {map[string]string{"cloud": "Private"}, "suffix.private.cloud", DefaultStorageSuffixKey, testPropertyProvider, true}, } func TestParseEnvironmentProperty(t *testing.T) { diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 1517b0a2ca7..1b77159398c 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -18,6 +18,8 @@ type EventHubInfo struct { EventHubConnection string EventHubConsumerGroup string StorageConnection string + StorageAccountName string + BlobStorageEndpoint string BlobContainer string Namespace string EventHubName string diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index f9aa53f742f..14d7d8cfdd7 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -224,8 +224,22 @@ func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadR } func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) { + var podIdentity = info.PodIdentity + + // For back-compat, prefer a connection string over pod identity when present + if len(info.StorageConnection) != 0 { + podIdentity.Provider = kedav1alpha1.PodIdentityProviderNone + } + + if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzure || podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload { + if len(info.StorageAccountName) == 0 { + return Checkpoint{}, 
fmt.Errorf("storageAccountName not supplied when PodIdentity authentication is enabled") + } + } + blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(ctx, httpClient, - kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, info.StorageConnection, "", "") + podIdentity, info.StorageConnection, info.StorageAccountName, info.BlobStorageEndpoint) + if err != nil { return Checkpoint{}, err } diff --git a/pkg/scalers/azure/azure_storage.go b/pkg/scalers/azure/azure_storage.go index 0ed84f71d6a..336e0495964 100644 --- a/pkg/scalers/azure/azure_storage.go +++ b/pkg/scalers/azure/azure_storage.go @@ -83,7 +83,7 @@ func ParseAzureStorageEndpointSuffix(metadata map[string]string, endpointType St func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, accountName, endpointSuffix string) (azqueue.Credential, *url.URL, error) { switch podIdentity.Provider { case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: - token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity) + token, endpoint, err := parseAccessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity) if err != nil { return nil, nil, err } @@ -111,7 +111,7 @@ func ParseAzureStorageQueueConnection(ctx context.Context, httpClient util.HTTPD func ParseAzureStorageBlobConnection(ctx context.Context, httpClient util.HTTPDoer, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, accountName, endpointSuffix string) (azblob.Credential, *url.URL, error) { switch podIdentity.Provider { case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: - token, endpoint, err := parseAcessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, podIdentity) + token, endpoint, err := parseAccessTokenAndEndpoint(ctx, httpClient, accountName, endpointSuffix, 
podIdentity) if err != nil { return nil, nil, err } @@ -192,7 +192,7 @@ func parseAzureStorageConnectionString(connectionString string, endpointType Sto return u, name, key, nil } -func parseAcessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string, +func parseAccessTokenAndEndpoint(ctx context.Context, httpClient util.HTTPDoer, accountName string, endpointSuffix string, podIdentity kedav1alpha1.AuthPodIdentity) (string, *url.URL, error) { var token AADToken var err error diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 173f31602de..281451c75b9 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -70,7 +70,9 @@ func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler, return nil, fmt.Errorf("error getting scaler metric type: %s", err) } - parsedMetadata, err := parseAzureEventHubMetadata(config) + logger := InitializeLogger(config, "azure_eventhub_scaler") + + parsedMetadata, err := parseAzureEventHubMetadata(logger, config) if err != nil { return nil, fmt.Errorf("unable to get eventhub metadata: %s", err) } @@ -85,21 +87,36 @@ func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler, metadata: parsedMetadata, client: hub, httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false), - logger: InitializeLogger(config, "azure_eventhub_scaler"), + logger: logger, }, nil } // parseAzureEventHubMetadata parses metadata -func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) { +func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*eventHubMetadata, error) { meta := eventHubMetadata{ eventHubInfo: azure.EventHubInfo{}, } + + err := parseCommonAzureEventHubMetadata(config, &meta) + if err != nil { + return nil, err + } + + err = parseAzureEventHubAuthenticationMetadata(logger, config, &meta) + if err != nil { + return nil, err + } + 
+ return &meta, nil +} + +func parseCommonAzureEventHubMetadata(config *ScalerConfig, meta *eventHubMetadata) error { meta.threshold = defaultEventHubMessageThreshold if val, ok := config.TriggerMetadata[thresholdMetricName]; ok { threshold, err := strconv.ParseInt(val, 10, 64) if err != nil { - return nil, fmt.Errorf("error parsing azure eventhub metadata %s: %s", thresholdMetricName, err) + return fmt.Errorf("error parsing azure eventhub metadata %s: %s", thresholdMetricName, err) } meta.threshold = threshold @@ -109,7 +126,7 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) if val, ok := config.TriggerMetadata[activationThresholdMetricName]; ok { activationThreshold, err := strconv.ParseInt(val, 10, 64) if err != nil { - return nil, fmt.Errorf("error parsing azure eventhub metadata %s: %s", activationThresholdMetricName, err) + return fmt.Errorf("error parsing azure eventhub metadata %s: %s", activationThresholdMetricName, err) } meta.activationThreshold = activationThreshold @@ -121,10 +138,6 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) meta.eventHubInfo.StorageConnection = config.ResolvedEnv[config.TriggerMetadata["storageConnectionFromEnv"]] } - if len(meta.eventHubInfo.StorageConnection) == 0 { - return nil, fmt.Errorf("no storage connection string given") - } - meta.eventHubInfo.EventHubConsumerGroup = defaultEventHubConsumerGroup if val, ok := config.TriggerMetadata["consumerGroup"]; ok { meta.eventHubInfo.EventHubConsumerGroup = val @@ -146,7 +159,7 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) if resourceURL, ok := config.TriggerMetadata["eventHubResourceURL"]; ok { meta.eventHubInfo.EventHubResourceURL = resourceURL } else { - return nil, fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud) + return fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud) } } } @@ -156,19 
+169,30 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) } serviceBusEndpointSuffix, err := azure.ParseEnvironmentProperty(config.TriggerMetadata, azure.DefaultEndpointSuffixKey, serviceBusEndpointSuffixProvider) if err != nil { - return nil, err + return err } meta.eventHubInfo.ServiceBusEndpointSuffix = serviceBusEndpointSuffix activeDirectoryEndpoint, err := azure.ParseActiveDirectoryEndpoint(config.TriggerMetadata) if err != nil { - return nil, err + return err } meta.eventHubInfo.ActiveDirectoryEndpoint = activeDirectoryEndpoint + meta.scalerIndex = config.ScalerIndex + + return nil +} + +func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *ScalerConfig, meta *eventHubMetadata) error { meta.eventHubInfo.PodIdentity = config.PodIdentity + switch config.PodIdentity.Provider { case "", v1alpha1.PodIdentityProviderNone: + if len(meta.eventHubInfo.StorageConnection) == 0 { + return fmt.Errorf("no storage connection string given") + } + if config.AuthParams["connection"] != "" { meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] } else if config.TriggerMetadata["connectionFromEnv"] != "" { @@ -176,9 +200,31 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) } if len(meta.eventHubInfo.EventHubConnection) == 0 { - return nil, fmt.Errorf("no event hub connection string given") + return fmt.Errorf("no event hub connection string given") } case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload: + meta.eventHubInfo.StorageAccountName = "" + if val, ok := config.TriggerMetadata["storageAccountName"]; ok { + meta.eventHubInfo.StorageAccountName = val + } else { + logger.Info("no 'storageAccountName' provided to enable identity based authentication to Blob Storage. 
Attempting to use connection string instead") + } + + if len(meta.eventHubInfo.StorageAccountName) != 0 { + storageEndpointSuffixProvider := func(env az.Environment) (string, error) { + return env.StorageEndpointSuffix, nil + } + storageEndpointSuffix, err := azure.ParseEnvironmentProperty(config.TriggerMetadata, azure.DefaultStorageSuffixKey, storageEndpointSuffixProvider) + if err != nil { + return err + } + meta.eventHubInfo.BlobStorageEndpoint = "blob." + storageEndpointSuffix + } + + if len(meta.eventHubInfo.StorageConnection) == 0 && len(meta.eventHubInfo.StorageAccountName) == 0 { + return fmt.Errorf("no storage connection string or storage account name for pod identity based authentication given") + } + if config.TriggerMetadata["eventHubNamespace"] != "" { meta.eventHubInfo.Namespace = config.TriggerMetadata["eventHubNamespace"] } else if config.TriggerMetadata["eventHubNamespaceFromEnv"] != "" { @@ -186,7 +232,7 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) } if len(meta.eventHubInfo.Namespace) == 0 { - return nil, fmt.Errorf("no event hub namespace string given") + return fmt.Errorf("no event hub namespace string given") } if config.TriggerMetadata["eventHubName"] != "" { @@ -196,13 +242,11 @@ func parseAzureEventHubMetadata(config *ScalerConfig) (*eventHubMetadata, error) } if len(meta.eventHubInfo.EventHubName) == 0 { - return nil, fmt.Errorf("no event hub name string given") + return fmt.Errorf("no event hub name string given") } } - meta.scalerIndex = config.ScalerIndex - - return &meta, nil + return nil } // GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 0e8fae40b69..38ba4bdb38b 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -10,6 +10,7 @@ import ( eventhub "github.com/Azure/azure-event-hubs-go/v3" 
"github.com/Azure/azure-storage-blob-go/azblob" + "github.com/go-logr/logr" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/azure" @@ -20,6 +21,7 @@ const ( eventHubConnectionSetting = "testEventHubConnectionSetting" storageConnectionSetting = "testStorageConnectionSetting" serviceBusEndpointSuffix = "serviceBusEndpointSuffix" + storageEndpointSuffix = "storageEndpointSuffix" activeDirectoryEndpoint = "activeDirectoryEndpoint" eventHubResourceURL = "eventHubResourceURL" testEventHubNamespace = "kedatesteventhub" @@ -87,6 +89,18 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat // properly formed metadata with private cloud {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL}, false}, + // properly formed event hub metadata with Pod Identity and no storage connection string + {map[string]string{"storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, + // event hub metadata with Pod Identity, no storage connection string, no storageAccountName - should fail + {map[string]string{"consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true}, + // event hub metadata with Pod Identity, no storage connection string, empty storageAccountName - should fail + {map[string]string{"storageAccount": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, 
"eventHubNamespace": testEventHubNamespace}, true}, + // event hub metadata with Pod Identity, storage connection string, empty storageAccountName - should ignore pod identity for blob storage and succeed + {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "storageAccountName": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, + // event hub metadata with Pod Identity and no storage connection string, private cloud and no storageEndpointSuffix - should fail + {map[string]string{"cloud": "private", "storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true}, + // properly formed event hub metadata with Pod Identity and no storage connection string, private cloud and storageEndpointSuffix + {map[string]string{"cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL, "storageAccountName": "aStorageAccount", "storageEndpointSuffix": storageEndpointSuffix, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, } var eventHubMetricIdentifiers = []eventHubMetricIdentifier{ @@ -106,7 +120,7 @@ var testEventHubScaler = azureEventHubScaler{ func TestParseEventHubMetadata(t *testing.T) { // Test first with valid resolved environment for _, testData := range parseEventHubMetadataDataset { - _, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}}) + _, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, 
AuthParams: map[string]string{}}) if err != nil && !testData.isError { t.Errorf("Expected success but got error: %s", err) @@ -117,7 +131,7 @@ func TestParseEventHubMetadata(t *testing.T) { } for _, testData := range parseEventHubMetadataDatasetWithPodIdentity { - _, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, + _, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}}) if err != nil && !testData.isError { @@ -129,7 +143,7 @@ func TestParseEventHubMetadata(t *testing.T) { } for _, testData := range parseEventHubMetadataDatasetWithPodIdentity { - _, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, + _, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzureWorkload}}) if err != nil && !testData.isError { @@ -476,7 +490,7 @@ func DeleteContainerInStorage(ctx context.Context, endpoint *url.URL, credential func TestEventHubGetMetricSpecForScaling(t *testing.T) { for _, testData := range eventHubMetricIdentifiers { - meta, err := parseAzureEventHubMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, ScalerIndex: testData.scalerIndex}) + meta, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}, ScalerIndex: testData.scalerIndex}) if err != nil { t.Fatal("Could not parse 
metadata:", err) }