diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 51a308248ad..272e2420db8 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -96,12 +96,27 @@ func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*even meta := eventHubMetadata{ eventHubInfo: azure.EventHubInfo{}, } + + err := parseCommonAzureEventHubMetadata(config, &meta) + if err != nil { + return nil, err + } + + err = parseAzureEventHubAuthenticationMetadata(logger, config, &meta) + if err != nil { + return nil, err + } + + return &meta, nil +} + +func parseCommonAzureEventHubMetadata(config *ScalerConfig, meta *eventHubMetadata) error { meta.threshold = defaultEventHubMessageThreshold if val, ok := config.TriggerMetadata[thresholdMetricName]; ok { threshold, err := strconv.ParseInt(val, 10, 64) if err != nil { - return nil, fmt.Errorf("error parsing azure eventhub metadata %s: %s", thresholdMetricName, err) + return fmt.Errorf("error parsing azure eventhub metadata %s: %s", thresholdMetricName, err) } meta.threshold = threshold @@ -111,7 +126,7 @@ func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*even if val, ok := config.TriggerMetadata[activationThresholdMetricName]; ok { activationThreshold, err := strconv.ParseInt(val, 10, 64) if err != nil { - return nil, fmt.Errorf("error parsing azure eventhub metadata %s: %s", activationThresholdMetricName, err) + return fmt.Errorf("error parsing azure eventhub metadata %s: %s", activationThresholdMetricName, err) } meta.activationThreshold = activationThreshold @@ -144,7 +159,7 @@ func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*even if resourceURL, ok := config.TriggerMetadata["eventHubResourceURL"]; ok { meta.eventHubInfo.EventHubResourceURL = resourceURL } else { - return nil, fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud) + return fmt.Errorf("eventHubResourceURL must be provided for %s cloud type", azure.PrivateCloud) } } } @@ -154,21 +169,28 @@ func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*even } serviceBusEndpointSuffix, err := azure.ParseEnvironmentProperty(config.TriggerMetadata, azure.DefaultEndpointSuffixKey, serviceBusEndpointSuffixProvider) if err != nil { - return nil, err + return err } meta.eventHubInfo.ServiceBusEndpointSuffix = serviceBusEndpointSuffix activeDirectoryEndpoint, err := azure.ParseActiveDirectoryEndpoint(config.TriggerMetadata) if err != nil { - return nil, err + return err } meta.eventHubInfo.ActiveDirectoryEndpoint = activeDirectoryEndpoint + meta.scalerIndex = config.ScalerIndex + + return nil +} + +func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *ScalerConfig, meta *eventHubMetadata) error { meta.eventHubInfo.PodIdentity = config.PodIdentity + switch config.PodIdentity.Provider { case "", v1alpha1.PodIdentityProviderNone: if len(meta.eventHubInfo.StorageConnection) == 0 { - return nil, fmt.Errorf("no storage connection string given") + return fmt.Errorf("no storage connection string given") } if config.AuthParams["connection"] != "" { @@ -178,7 +200,7 @@ func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*even } if len(meta.eventHubInfo.EventHubConnection) == 0 { - return nil, fmt.Errorf("no event hub connection string given") + return fmt.Errorf("no event hub connection string given") } case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload: meta.eventHubInfo.StorageAccountName = "" @@ -194,13 +216,13 @@ func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*even } storageEndpointSuffix, err := azure.ParseEnvironmentProperty(config.TriggerMetadata, azure.DefaultStorageSuffixKey, storageEndpointSuffixProvider) if err != nil { - return nil, err + return err } meta.eventHubInfo.BlobStorageEndpoint = "blob." + storageEndpointSuffix } if len(meta.eventHubInfo.StorageConnection) == 0 && len(meta.eventHubInfo.StorageAccountName) == 0 { - return nil, fmt.Errorf("no storage connection string or storage account name for pod identity based authentication given") + return fmt.Errorf("no storage connection string or storage account name for pod identity based authentication given") } if config.TriggerMetadata["eventHubNamespace"] != "" { @@ -210,7 +232,7 @@ func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*even } if len(meta.eventHubInfo.Namespace) == 0 { - return nil, fmt.Errorf("no event hub namespace string given") + return fmt.Errorf("no event hub namespace string given") } if config.TriggerMetadata["eventHubName"] != "" { @@ -220,13 +242,11 @@ func parseAzureEventHubMetadata(logger logr.Logger, config *ScalerConfig) (*even } if len(meta.eventHubInfo.EventHubName) == 0 { - return nil, fmt.Errorf("no event hub name string given") + return fmt.Errorf("no event hub name string given") } } - meta.scalerIndex = config.ScalerIndex - - return &meta, nil + return nil } // GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition