From 8cc1cf1ebe5d02a24308287e280430205e80a321 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 8 Nov 2021 12:15:47 +0100 Subject: [PATCH] enhancement httpjson rate-limit early-limit (#28513) (#28865) * #28475 enhancement filebeat httpjson rate-limit early * #28475 okta should default to early limit to avoid violations * adding changelog entry * lint fix * additional comments and tests on behavior with missing headers * changing default okta-rate limit to 89%. adding docs in Okta module * Re-generate okta docs * removing unused lines from test Co-authored-by: Marc Guasch (cherry picked from commit c77bf19b66a95c0bb669c06b4ffb8e7ba773a22d) Co-authored-by: Alan Hinchliff --- CHANGELOG.next.asciidoc | 1 + filebeat/docs/modules/okta.asciidoc | 11 ++ .../docs/inputs/input-httpjson.asciidoc | 29 +++- .../filebeat/input/httpjson/config_request.go | 15 +- .../filebeat/input/httpjson/rate_limiter.go | 37 ++++- .../input/httpjson/rate_limiter_test.go | 152 ++++++++++++++++++ .../filebeat/module/okta/_meta/docs.asciidoc | 11 ++ .../module/okta/system/config/input.yml | 1 + 8 files changed, 243 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 25141a2e1749..9af16314a55a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -320,6 +320,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for secondary object attribute handling in ThreatIntel MISP module {pull}28124[28124] - Azure signinlogs - Add support for ManagedIdentitySignInLogs, NonInteractiveUserSignInLogs, and ServicePrincipalSignInLogs. {issue}23653[23653] - Add `base64Decode` and `base64DecodeNoPad` functions to `httpsjon` templates. {pull}28385[28385] +- Add 'early_limit' config option for Rate-Limiting `httpjson`. Default rate-limiting for Okta will start once 89% of the rate-limit has been used. {pull}28513[28513] - Add latency config option for aws-cloudwatch input. 
{pull}28509[28509] - Added proxy support to threatintel/malwarebazaar. {pull}28533[28533] - Add `text/csv` decoder to `httpjson` input {pull}28564[28564] diff --git a/filebeat/docs/modules/okta.asciidoc b/filebeat/docs/modules/okta.asciidoc index 5a5acbd19dc4..a9048f644eea 100644 --- a/filebeat/docs/modules/okta.asciidoc +++ b/filebeat/docs/modules/okta.asciidoc @@ -102,6 +102,17 @@ An initial interval can be defined. The first time the module starts, will fetch var.initial_interval: 24h # will fetch events starting 24h ago. ---- +*`input.request.rate_limit.early_limit`*:: + +You can override the default rate-limiting behavior in the `httpjson` input. +The default for the Okta module is to use up to 89% of the Okta rate-limit, +which should avoid Okta Warnings on rate-limit usage. ++ +[source,yaml] +---- + input.request.rate_limit.early_limit: 0.89 +---- + [float] === Example dashboard diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 4548b40413e4..027ef66e3e70 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -434,19 +434,42 @@ The maximum number of redirects to follow for a request. Default: `10`. [float] ==== `request.rate_limit.limit` -The value of the response that specifies the total limit. It is defined with a Go template value. Can read state from: [`.last_response.header`] +The value of the response that specifies the total limit. It is defined with a Go template value. +Can read state from: [`.last_response.header`] [float] ==== `request.rate_limit.remaining` -The value of the response that specifies the remaining quota of the rate limit. It is defined with a Go template value. Can read state from: [`.last_response.header`] +The value of the response that specifies the remaining quota of the rate limit. +It is defined with a Go template value. 
Can read state from: [`.last_response.header`] +If the `remaining` header is missing from the Response, no rate-limiting will occur. [float] ==== `request.rate_limit.reset` -The value of the response that specifies the epoch time when the rate limit will reset. It is defined with a Go template value. Can read state from: [`.last_response.header`] +The value of the response that specifies the epoch time when the rate limit will reset. +It is defined with a Go template value. Can read state from: [`.last_response.header`] [[request-transforms]] +[float] +==== `request.rate_limit.early_limit` + +Optionally start rate-limiting prior to the value specified in the Response. + +Under the default behavior, Requests will continue while the `remaining` value is non-zero. +Specifying an `early_limit` will mean that rate-limiting will occur prior to reaching `0`. + +* If the value specified for `early_limit` is less than `1`, +the value is treated as a percentage of the Response provided `limit`. +e.g. specifying `0.9` will mean that Requests will continue until reaching 90% of the rate-limit -- +for a `limit` value of `120`, the rate-limit starts when the `remaining` reaches `12`. +If the `limit` header is missing from the Response, default rate-limiting will occur (when `remaining` reaches `0`). +* If the value specified for `early_limit` is greater than or equal to `1`, +the value is treated as the target value for `remaining`. +e.g. instead of rate-limiting when `remaining` hits `0`, rate-limiting will occur when `remaining` hits the value specified. + +It is not set by default (by default the rate-limiting as specified in the Response is followed). 
+ [float] ==== `request.transforms` diff --git a/x-pack/filebeat/input/httpjson/config_request.go b/x-pack/filebeat/input/httpjson/config_request.go index f5982e37d1a5..b9d6a494cce4 100644 --- a/x-pack/filebeat/input/httpjson/config_request.go +++ b/x-pack/filebeat/input/httpjson/config_request.go @@ -55,9 +55,18 @@ func (c retryConfig) getWaitMax() time.Duration { } type rateLimitConfig struct { - Limit *valueTpl `config:"limit"` - Reset *valueTpl `config:"reset"` - Remaining *valueTpl `config:"remaining"` + Limit *valueTpl `config:"limit"` + Reset *valueTpl `config:"reset"` + Remaining *valueTpl `config:"remaining"` + EarlyLimit *float64 `config:"early_limit"` +} + +func (c rateLimitConfig) Validate() error { + switch { + case c.EarlyLimit != nil && *c.EarlyLimit < 0: + return errors.New("early_limit must be greater than or equal to 0") + } + return nil } type urlConfig struct { diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index ab2ef2715e9d..6eb55f0159be 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -18,9 +18,10 @@ import ( type rateLimiter struct { log *logp.Logger - limit *valueTpl - reset *valueTpl - remaining *valueTpl + limit *valueTpl + reset *valueTpl + remaining *valueTpl + earlyLimit *float64 } func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLimiter { @@ -29,10 +30,11 @@ func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLi } return &rateLimiter{ - log: log, - limit: config.Limit, - reset: config.Reset, - remaining: config.Remaining, + log: log, + limit: config.Limit, + reset: config.Reset, + remaining: config.Remaining, + earlyLimit: config.EarlyLimit, } } @@ -114,7 +116,26 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { return 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err) } - if m != 0 { + // by default, httpjson 
will continue requests until Limit is 0 + // can optionally stop requests "early" + var activeLimit int64 = 0 + if r.earlyLimit != nil { + var earlyLimit float64 = *r.earlyLimit + if earlyLimit > 0 && earlyLimit < 1 { + limit, _ := r.limit.Execute(ctx, tr, nil, r.log) + if limit != "" { + l, err := strconv.ParseInt(limit, 10, 64) + if err == nil { + activeLimit = l - int64(earlyLimit*float64(l)) + } + } + } else if earlyLimit >= 1 { + activeLimit = int64(earlyLimit) + } + } + + r.log.Debugf("Rate Limit: Using active Early Limit: %f", activeLimit) + if m > activeLimit { return 0, nil } diff --git a/x-pack/filebeat/input/httpjson/rate_limiter_test.go b/x-pack/filebeat/input/httpjson/rate_limiter_test.go index 7ede16514d44..5f2ca5a03afc 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter_test.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter_test.go @@ -88,3 +88,155 @@ func TestGetRateLimitReturnsResetValue(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 1604582832, epoch2) } + +// Test getRateLimit function with a remaining quota, using default early limit +// expect to receive 0, nil. 
+func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) { + resetEpoch := int64(1634579974 + 100) + timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } + t.Cleanup(func() { timeNow = time.Now }) + + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "120") + header.Add("X-Rate-Limit-Remaining", "1") + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + tplLimit := &valueTpl{} + tplReset := &valueTpl{} + tplRemaining := &valueTpl{} + earlyLimit := func(i float64) *float64 { return &i }(0) + assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`)) + assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`)) + assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`)) + rateLimit := &rateLimiter{ + limit: tplLimit, + reset: tplReset, + remaining: tplRemaining, + log: logp.NewLogger("TestGetRateLimitReturns0IfEarlyLimit0"), + earlyLimit: earlyLimit, + } + resp := &http.Response{Header: header} + epoch, err := rateLimit.getRateLimit(resp) + assert.NoError(t, err) + assert.EqualValues(t, 0, epoch) +} + +// Test getRateLimit function with a remaining limit, but early limit +// expect to receive Reset Time +func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) { + resetEpoch := int64(1634579974 + 100) + timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } + t.Cleanup(func() { timeNow = time.Now }) + + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "120") + header.Add("X-Rate-Limit-Remaining", "1") + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + tplLimit := &valueTpl{} + tplReset := &valueTpl{} + tplRemaining := &valueTpl{} + earlyLimit := func(i float64) *float64 { return &i }(1) + assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`)) + assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`)) + 
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`)) + rateLimit := &rateLimiter{ + limit: tplLimit, + reset: tplReset, + remaining: tplRemaining, + log: logp.NewLogger("TestGetRateLimitReturnsResetValueIfEarlyLimit1"), + earlyLimit: earlyLimit, + } + resp := &http.Response{Header: header} + epoch, err := rateLimit.getRateLimit(resp) + assert.NoError(t, err) + assert.EqualValues(t, resetEpoch, epoch) +} + +// Test getRateLimit function with a remaining quota, using 90% early limit +// expect to receive 0, nil. +func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) { + resetEpoch := int64(1634579974 + 100) + timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } + t.Cleanup(func() { timeNow = time.Now }) + + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "120") + header.Add("X-Rate-Limit-Remaining", "13") + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + tplLimit := &valueTpl{} + tplReset := &valueTpl{} + tplRemaining := &valueTpl{} + earlyLimit := func(i float64) *float64 { return &i }(0.9) + assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`)) + assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`)) + assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`)) + rateLimit := &rateLimiter{ + limit: tplLimit, + reset: tplReset, + remaining: tplRemaining, + log: logp.NewLogger("TestGetRateLimitReturns0IfEarlyLimitPercent"), + earlyLimit: earlyLimit, + } + resp := &http.Response{Header: header} + epoch, err := rateLimit.getRateLimit(resp) + assert.NoError(t, err) + assert.EqualValues(t, 0, epoch) +} + +// Test getRateLimit function with a remaining limit, but early limit of 90% +// expect to receive Reset Time +func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) { + resetEpoch := int64(1634579974 + 100) + timeNow = func() time.Time { return 
time.Unix(1634579974, 0).UTC() } + t.Cleanup(func() { timeNow = time.Now }) + + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "120") + header.Add("X-Rate-Limit-Remaining", "12") + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + tplLimit := &valueTpl{} + tplReset := &valueTpl{} + tplRemaining := &valueTpl{} + earlyLimit := func(i float64) *float64 { return &i }(0.9) + assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`)) + assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`)) + assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`)) + rateLimit := &rateLimiter{ + limit: tplLimit, + reset: tplReset, + remaining: tplRemaining, + log: logp.NewLogger("TestGetRateLimitReturnsResetValueIfEarlyLimitPercent"), + earlyLimit: earlyLimit, + } + resp := &http.Response{Header: header} + epoch, err := rateLimit.getRateLimit(resp) + assert.NoError(t, err) + assert.EqualValues(t, resetEpoch, epoch) +} + +// Test getRateLimit function when "Limit" header is missing, when using a Percentage early-limit +// expect to receive 0, nil. 
(default rate-limiting) +func TestGetRateLimitWhenMissingLimit(t *testing.T) { + resetEpoch := int64(1634579974 + 100) + timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } + t.Cleanup(func() { timeNow = time.Now }) + + header := make(http.Header) + header.Add("X-Rate-Limit-Remaining", "1") + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + tplReset := &valueTpl{} + tplRemaining := &valueTpl{} + earlyLimit := func(i float64) *float64 { return &i }(0.9) + assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`)) + assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`)) + rateLimit := &rateLimiter{ + limit: nil, + reset: tplReset, + remaining: tplRemaining, + log: logp.NewLogger("TestGetRateLimitWhenMissingLimit"), + earlyLimit: earlyLimit, + } + resp := &http.Response{Header: header} + epoch, err := rateLimit.getRateLimit(resp) + assert.NoError(t, err) + assert.EqualValues(t, 0, epoch) +} diff --git a/x-pack/filebeat/module/okta/_meta/docs.asciidoc b/x-pack/filebeat/module/okta/_meta/docs.asciidoc index e3cf3f47d1ad..03c7838aeec6 100644 --- a/x-pack/filebeat/module/okta/_meta/docs.asciidoc +++ b/x-pack/filebeat/module/okta/_meta/docs.asciidoc @@ -97,6 +97,17 @@ An initial interval can be defined. The first time the module starts, will fetch var.initial_interval: 24h # will fetch events starting 24h ago. ---- +*`input.request.rate_limit.early_limit`*:: + +You can override the default rate-limiting behavior in the `httpjson` input. +The default for the Okta module is to use up to 89% of the Okta rate-limit, +which should avoid Okta Warnings on rate-limit usage. 
++ +[source,yaml] +---- + input.request.rate_limit.early_limit: 0.89 +---- + [float] === Example dashboard diff --git a/x-pack/filebeat/module/okta/system/config/input.yml b/x-pack/filebeat/module/okta/system/config/input.yml index 8beb7e70e346..a33e4654b803 100644 --- a/x-pack/filebeat/module/okta/system/config/input.yml +++ b/x-pack/filebeat/module/okta/system/config/input.yml @@ -22,6 +22,7 @@ request.rate_limit: limit: '[[.last_response.header.Get "X-Rate-Limit-Limit"]]' remaining: '[[.last_response.header.Get "X-Rate-Limit-Remaining"]]' reset: '[[.last_response.header.Get "X-Rate-Limit-Reset"]]' + early_limit: 0.89 request.transforms: - set: target: header.Authorization