Skip to content

Commit

Permalink
enhancement httpjson rate-limit early-limit (#28513)
Browse files Browse the repository at this point in the history
* #28475 enhancement filebeat httpjson rate-limit early

* #28475 okta should default to early limit to avoid violations

* adding changelog entry

* lint fix

* additional comments and tests on behavior with missing headers

* changing default okta-rate limit to 89%. adding docs in Okta module

* Re-generate okta docs

* removing unused lines from test

Co-authored-by: Marc Guasch <[email protected]>
  • Loading branch information
hinchliff and marc-gr authored Oct 28, 2021
1 parent 8562134 commit c77bf19
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Add support for secondary object attribute handling in ThreatIntel MISP module {pull}28124[28124]
- Azure signinlogs - Add support for ManagedIdentitySignInLogs, NonInteractiveUserSignInLogs, and ServicePrincipalSignInLogs. {issue}23653[23653]
- Add `base64Decode` and `base64DecodeNoPad` functions to `httpsjon` templates. {pull}28385[28385]
- Add 'early_limit' config option for Rate-Limiting `httpjson`. Default rate-limiting for Okta will start when remaining is `1`. {pull}28513[28513]
- Add latency config option for aws-cloudwatch input. {pull}28509[28509]
- Added proxy support to threatintel/malwarebazaar. {pull}28533[28533]
- Add `text/csv` decoder to `httpjson` input {pull}28564[28564]
Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/modules/okta.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ An initial interval can be defined. The first time the module starts, will fetch
var.initial_interval: 24h # will fetch events starting 24h ago.
----

*`input.request.rate_limit.early_limit`*::

You can override the default rate-limiting behavior in <<filebeat-input-httpjson>>.
The default for the Okta module is to use up to 89% of the Okta rate-limit,
which should avoid Okta Warnings on rate-limit usage.
+
[source.yaml]
----
input.request.rate_limit.early_limit: 0.89
----

[float]
=== Example dashboard

Expand Down
29 changes: 26 additions & 3 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -434,19 +434,42 @@ The maximum number of redirects to follow for a request. Default: `10`.
[float]
==== `request.rate_limit.limit`

The value of the response that specifies the total limit. It is defined with a Go template value. Can read state from: [`.last_response.header`]
The value of the response that specifies the total limit. It is defined with a Go template value.
Can read state from: [`.last_response.header`]

[float]
==== `request.rate_limit.remaining`

The value of the response that specifies the remaining quota of the rate limit. It is defined with a Go template value. Can read state from: [`.last_response.header`]
The value of the response that specifies the remaining quota of the rate limit.
It is defined with a Go template value. Can read state from: [`.last_response.header`]
If the `remaining` header is missing from the Response, no rate-limiting will occur.

[float]
==== `request.rate_limit.reset`

The value of the response that specifies the epoch time when the rate limit will reset. It is defined with a Go template value. Can read state from: [`.last_response.header`]
The value of the response that specifies the epoch time when the rate limit will reset.
It is defined with a Go template value. Can read state from: [`.last_response.header`]

[[request-transforms]]
[float]
==== `request.rate_limit.early_limit`

Optionally start rate-limiting prior to the value specified in the Response.

Under the default behavior, Requests will continue while the `remaining` value is non-zero.
Specifying an `early_limit` will mean that rate-limiting will occur prior to reaching `0`.

* If the value specified for `early_limit` is less than `1`,
the value is treated as a percentage of the Response provided `limit`.
e.g. specifying `0.9` will mean that Requests will continue until reaching 90% of the rate-limit --
for a `limit` value of `120`, the rate-limit starts when the `remaining` reaches `12`.
If the `limit` header is missing from the Response, default rate-limiting will occur (when `remaining` reaches `0`).
* If the value specified for `early_limit` is greater than or equal to `1`,
the value is treated as the target value for `remaining`.
e.g. instead of rate-limiting when `remaining` hits `0`, rate-limiting will occur when `remaining` hits the value specified.

It is not set by default (by default the rate-limiting as specified in the Response is followed).

[float]
==== `request.transforms`

Expand Down
15 changes: 12 additions & 3 deletions x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,18 @@ func (c retryConfig) getWaitMax() time.Duration {
}

type rateLimitConfig struct {
Limit *valueTpl `config:"limit"`
Reset *valueTpl `config:"reset"`
Remaining *valueTpl `config:"remaining"`
Limit *valueTpl `config:"limit"`
Reset *valueTpl `config:"reset"`
Remaining *valueTpl `config:"remaining"`
EarlyLimit *float64 `config:"early_limit"`
}

func (c rateLimitConfig) Validate() error {
switch {
case c.EarlyLimit != nil && *c.EarlyLimit < 0:
return errors.New("early_limit must be greater than or equal to 0")
}
return nil
}

type urlConfig struct {
Expand Down
37 changes: 29 additions & 8 deletions x-pack/filebeat/input/httpjson/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
type rateLimiter struct {
log *logp.Logger

limit *valueTpl
reset *valueTpl
remaining *valueTpl
limit *valueTpl
reset *valueTpl
remaining *valueTpl
earlyLimit *float64
}

func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLimiter {
Expand All @@ -29,10 +30,11 @@ func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLi
}

return &rateLimiter{
log: log,
limit: config.Limit,
reset: config.Reset,
remaining: config.Remaining,
log: log,
limit: config.Limit,
reset: config.Reset,
remaining: config.Remaining,
earlyLimit: config.EarlyLimit,
}
}

Expand Down Expand Up @@ -114,7 +116,26 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {
return 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err)
}

if m != 0 {
// by default, httpjson will continue requests until Limit is 0
// can optionally stop requests "early"
var activeLimit int64 = 0
if r.earlyLimit != nil {
var earlyLimit float64 = *r.earlyLimit
if earlyLimit > 0 && earlyLimit < 1 {
limit, _ := r.limit.Execute(ctx, tr, nil, r.log)
if limit != "" {
l, err := strconv.ParseInt(limit, 10, 64)
if err == nil {
activeLimit = l - int64(earlyLimit*float64(l))
}
}
} else if earlyLimit >= 1 {
activeLimit = int64(earlyLimit)
}
}

r.log.Debugf("Rate Limit: Using active Early Limit: %f", activeLimit)
if m > activeLimit {
return 0, nil
}

Expand Down
152 changes: 152 additions & 0 deletions x-pack/filebeat/input/httpjson/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,155 @@ func TestGetRateLimitReturnsResetValue(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, 1604582832, epoch2)
}

// Test getRateLimit function with a remaining quota, using default early limit
// expect to receive 0, nil.
func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0)
assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`))
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: tplLimit,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitReturns0IfEarlyLimit0"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
}

// Test getRateLimit function with a remaining limit, but early limit
// expect to receive Reset Time
func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(1)
assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`))
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: tplLimit,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitReturnsResetValueIfEarlyLimit1"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, resetEpoch, epoch)
}

// Test getRateLimit function with a remaining quota, using 90% early limit
// expect to receive 0, nil.
func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "13")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0.9)
assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`))
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: tplLimit,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitReturns0IfEarlyLimitPercent"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
}

// Test getRateLimit function with a remaining limit, but early limit of 90%
// expect to receive Reset Time
func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "12")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0.9)
assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`))
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: tplLimit,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitReturnsResetValueIfEarlyLimitPercent"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, resetEpoch, epoch)
}

// Test getRateLimit function when "Limit" header is missing, when using a Percentage early-limit
// expect to receive 0, nil. (default rate-limiting)
func TestGetRateLimitWhenMissingLimit(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0.9)
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: nil,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitWhenMissingLimit"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
}
11 changes: 11 additions & 0 deletions x-pack/filebeat/module/okta/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ An initial interval can be defined. The first time the module starts, will fetch
var.initial_interval: 24h # will fetch events starting 24h ago.
----

*`input.request.rate_limit.early_limit`*::

You can override the default rate-limiting behavior in <<filebeat-input-httpjson>>.
The default for the Okta module is to use up to 89% of the Okta rate-limit,
which should avoid Okta Warnings on rate-limit usage.
+
[source.yaml]
----
input.request.rate_limit.early_limit: 0.89
----

[float]
=== Example dashboard

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/module/okta/system/config/input.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ request.rate_limit:
limit: '[[.last_response.header.Get "X-Rate-Limit-Limit"]]'
remaining: '[[.last_response.header.Get "X-Rate-Limit-Remaining"]]'
reset: '[[.last_response.header.Get "X-Rate-Limit-Reset"]]'
early_limit: 0.89
request.transforms:
- set:
target: header.Authorization
Expand Down

0 comments on commit c77bf19

Please sign in to comment.