Skip to content

Commit

Permalink
Use rate limiter with all interactions with API
Browse files Browse the repository at this point in the history
The metricset uses multiple API endpoint, so we need to double-check
if the max request rate applies to all endpoints or single ones.
  • Loading branch information
zmoog committed Feb 23, 2024
1 parent 4163848 commit 5727f58
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
3 changes: 3 additions & 0 deletions x-pack/metricbeat/module/azure/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azure

import (
"fmt"
"golang.org/x/time/rate"

Check failure on line 9 in x-pack/metricbeat/module/azure/config.go

View workflow job for this annotation

GitHub Actions / lint (windows)

File is not `goimports`-ed with -local github.com/elastic (goimports)

Check failure on line 9 in x-pack/metricbeat/module/azure/config.go

View workflow job for this annotation

GitHub Actions / lint (linux)

File is not `goimports`-ed with -local github.com/elastic (goimports)
"time"

"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -41,6 +42,8 @@ type Config struct {
RefreshListInterval time.Duration `config:"refresh_list_interval"`
DefaultResourceType string `config:"default_resource_type"`
AddCloudMetadata bool `config:"add_cloud_metadata"`
RateLimit rate.Limit `config:"rate_limit"`
RateBurst int `config:"rate_burst"`
// specific to billing
BillingScopeDepartment string `config:"billing_scope_department"` // retrieve usage details from department scope
BillingScopeAccountId string `config:"billing_scope_account_id"` // retrieve usage details from billing account ID scope
Expand Down
56 changes: 54 additions & 2 deletions x-pack/metricbeat/module/azure/monitor_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package azure
import (
"context"
"fmt"
"golang.org/x/time/rate"

Check failure on line 10 in x-pack/metricbeat/module/azure/monitor_service.go

View workflow job for this annotation

GitHub Actions / lint (windows)

File is not `goimports`-ed with -local github.com/elastic (goimports)

Check failure on line 10 in x-pack/metricbeat/module/azure/monitor_service.go

View workflow job for this annotation

GitHub Actions / lint (linux)

File is not `goimports`-ed with -local github.com/elastic (goimports)
"strings"

"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -27,6 +28,7 @@ type MonitorService struct {
resourceClient *armresources.Client
context context.Context
log *logp.Logger
limiter *rate.Limiter
}

const (
Expand Down Expand Up @@ -91,13 +93,28 @@ func NewService(config Config) (*MonitorService, error) {
return nil, fmt.Errorf("couldn't create metric namespaces client: %w", err)
}

// Rate limiter to avoid hitting the Azure Monitoring API rate limits.
// According to https://learn.microsoft.com/en-us/azure/azure-monitor/service-limits
// the maximum number of queries is 200 per 30 seconds.
rateLimit := rate.Limit(6)
rateBurst := 1

if config.RateLimit >= 0 {
rateLimit = config.RateLimit
}
if config.RateBurst > 0 {
rateBurst = config.RateBurst
}
limiter := rate.NewLimiter(rateLimit, rateBurst) // 6 requests per second (200/30 = 6.67, rounded down to 6 requests per second

service := &MonitorService{
metricDefinitionClient: metricsDefinitionClient,
metricsClient: metricsClient,
metricNamespaceClient: metricNamespaceClient,
resourceClient: resourceClient,
context: context.Background(),
log: logp.NewLogger("azure monitor service"),
limiter: limiter,
}

return service, nil
Expand All @@ -118,6 +135,13 @@ func (service MonitorService) GetResourceDefinitions(id []string, group []string
})

for pager.More() {
// Wait for the rate limiter to allow the request to be made to
// the Azure Monitoring API before making the request.
err := service.limiter.Wait(service.context)
if err != nil {
return nil, err
}

nextResult, err := pager.NextPage(service.context)
if err != nil {
return nil, err
Expand Down Expand Up @@ -169,6 +193,13 @@ func (service MonitorService) GetResourceDefinitions(id []string, group []string

// GetResourceDefinitionById will retrieve the azure resource based on the resource Id
func (service MonitorService) GetResourceDefinitionById(id string) (armresources.GenericResource, error) {
// Wait for the rate limiter to allow the request to be made to
// the Azure Monitoring API before making the request.
err := service.limiter.Wait(service.context)
if err != nil {
return armresources.GenericResource{}, err
}

resp, err := service.resourceClient.GetByID(service.context, id, ApiVersion, nil)
if err != nil {
return armresources.GenericResource{}, err
Expand All @@ -184,6 +215,13 @@ func (service *MonitorService) GetMetricNamespaces(resourceId string) (armmonito
metricNamespaceCollection := armmonitor.MetricNamespaceCollection{}

for pager.More() {
// Wait for the rate limiter to allow the request to be made to
// the Azure Monitoring API before making the request.
err := service.limiter.Wait(service.context)
if err != nil {
return armmonitor.MetricNamespaceCollection{}, err
}

nextPage, err := pager.NextPage(service.context)
if err != nil {
return armmonitor.MetricNamespaceCollection{}, err
Expand All @@ -208,6 +246,13 @@ func (service *MonitorService) GetMetricDefinitions(resourceId string, namespace
metricDefinitionCollection := armmonitor.MetricDefinitionCollection{}

for pager.More() {
// Wait for the rate limiter to allow the request to be made to
// the Azure Monitoring API before making the request.
err := service.limiter.Wait(service.context)
if err != nil {
return armmonitor.MetricDefinitionCollection{}, err
}

nextPage, err := pager.NextPage(service.context)
if err != nil {
return armmonitor.MetricDefinitionCollection{}, err
Expand Down Expand Up @@ -257,14 +302,21 @@ func (service *MonitorService) GetMetricValues(resourceId string, namespace stri
Metricnames: &metricNames,
Timespan: &timespan,
Top: nil,
// Orderby: &orderBy,
ResultType: &resultTypeData,
ResultType: &resultTypeData,
}

if namespace != "" {
opts.Metricnamespace = &namespace
}

// Wait for the rate limiter to allow the request to be made to
// the Azure Monitoring API before making the request.
err := service.limiter.Wait(service.context)
if err != nil {
// TODO: should we check if the context is cancelled?
return metrics, "", err
}

resp, err := service.metricsClient.List(service.context, resourceId, opts)

// check for applied charges before returning any errors
Expand Down

0 comments on commit 5727f58

Please sign in to comment.