Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement httpjson rate-limit early-limit #28513

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Release zoom module as GA. {pull}28106[28106]
- Add support for secondary object attribute handling in ThreatIntel MISP module {pull}28124[28124]
- Add `base64Decode` and `base64DecodeNoPad` functions to `httpsjon` templates. {pull}28385[28385]
- Add 'early_limit' config option for Rate-Limiting `httpjson`. Default rate-limiting for Okta will start when remaining is `1`. {pull}28513[28513]
- Add latency config option for aws-cloudwatch input. {pull}28509[28509]

*Heartbeat*
Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/modules/okta.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ An initial interval can be defined. The first time the module starts, will fetch
var.initial_interval: 24h # will fetch events starting 24h ago.
----

*`input.request.rate_limit.early_limit`*::

You can override the default rate-limiting behavior in <<filebeat-input-httpjson>>.
The default for the Okta module is to use up to 89% of the Okta rate-limit,
which should avoid Okta Warnings on rate-limit usage.
+
[source.yaml]
----
input.request.rate_limit.early_limit: 0.89
----

[float]
=== Example dashboard

Expand Down
29 changes: 26 additions & 3 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -434,19 +434,42 @@ The maximum number of redirects to follow for a request. Default: `10`.
[float]
==== `request.rate_limit.limit`

The value of the response that specifies the total limit. It is defined with a Go template value. Can read state from: [`.last_response.header`]
The value of the response that specifies the total limit. It is defined with a Go template value.
Can read state from: [`.last_response.header`]

[float]
==== `request.rate_limit.remaining`

The value of the response that specifies the remaining quota of the rate limit. It is defined with a Go template value. Can read state from: [`.last_response.header`]
The value of the response that specifies the remaining quota of the rate limit.
It is defined with a Go template value. Can read state from: [`.last_response.header`]
If the `remaining` header is missing from the Response, no rate-limiting will occur.

[float]
==== `request.rate_limit.reset`

The value of the response that specifies the epoch time when the rate limit will reset. It is defined with a Go template value. Can read state from: [`.last_response.header`]
The value of the response that specifies the epoch time when the rate limit will reset.
It is defined with a Go template value. Can read state from: [`.last_response.header`]

[[request-transforms]]
[float]
==== `request.rate_limit.early_limit`

Optionally start rate-limiting prior to the value specified in the Response.

Under the default behavior, Requests will continue while the `remaining` value is non-zero.
Specifying an `early_limit` will mean that rate-limiting will occur prior to reaching `0`.

* If the value specified for `early_limit` is less than `1`,
the value is treated as a percentage of the Response provided `limit`.
e.g. specifying `0.9` will mean that Requests will continue until reaching 90% of the rate-limit --
for a `limit` value of `120`, the rate-limit starts when the `remaining` reaches `12`.
If the `limit` header is missing from the Response, default rate-limiting will occur (when `remaining` reaches `0`).
* If the value specified for `early_limit` is greater than or equal to `1`,
the value is treated as the target value for `remaining`.
e.g. instead of rate-limiting when `remaining` hits `0`, rate-limiting will occur when `remaining` hits the value specified.

It is not set by default (by default the rate-limiting as specified in the Response is followed).

[float]
==== `request.transforms`

Expand Down
15 changes: 12 additions & 3 deletions x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,18 @@ func (c retryConfig) getWaitMax() time.Duration {
}

type rateLimitConfig struct {
Limit *valueTpl `config:"limit"`
Reset *valueTpl `config:"reset"`
Remaining *valueTpl `config:"remaining"`
Limit *valueTpl `config:"limit"`
Reset *valueTpl `config:"reset"`
Remaining *valueTpl `config:"remaining"`
EarlyLimit *float64 `config:"early_limit"`
}

func (c rateLimitConfig) Validate() error {
switch {
case c.EarlyLimit != nil && *c.EarlyLimit < 0:
return errors.New("early_limit must be greater than or equal to 0")
}
return nil
}

type urlConfig struct {
Expand Down
37 changes: 29 additions & 8 deletions x-pack/filebeat/input/httpjson/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
type rateLimiter struct {
log *logp.Logger

limit *valueTpl
reset *valueTpl
remaining *valueTpl
limit *valueTpl
reset *valueTpl
remaining *valueTpl
earlyLimit *float64
}

func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLimiter {
Expand All @@ -29,10 +30,11 @@ func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLi
}

return &rateLimiter{
log: log,
limit: config.Limit,
reset: config.Reset,
remaining: config.Remaining,
log: log,
limit: config.Limit,
reset: config.Reset,
remaining: config.Remaining,
earlyLimit: config.EarlyLimit,
}
}

Expand Down Expand Up @@ -114,7 +116,26 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {
return 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err)
}

if m != 0 {
// by default, httpjson will continue requests until Limit is 0
// can optionally stop requests "early"
var activeLimit int64 = 0
if r.earlyLimit != nil {
var earlyLimit float64 = *r.earlyLimit
if earlyLimit > 0 && earlyLimit < 1 {
limit, _ := r.limit.Execute(ctx, tr, nil, r.log)
if limit != "" {
l, err := strconv.ParseInt(limit, 10, 64)
if err == nil {
activeLimit = l - int64(earlyLimit*float64(l))
}
}
} else if earlyLimit >= 1 {
activeLimit = int64(earlyLimit)
}
}

r.log.Debugf("Rate Limit: Using active Early Limit: %f", activeLimit)
if m > activeLimit {
return 0, nil
}

Expand Down
152 changes: 152 additions & 0 deletions x-pack/filebeat/input/httpjson/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,155 @@ func TestGetRateLimitReturnsResetValue(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, 1604582832, epoch2)
}

// Test getRateLimit function with a remaining quota, using default early limit
// expect to receive 0, nil.
func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0)
assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`))
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: tplLimit,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitReturns0IfEarlyLimit0"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
}

// Test getRateLimit function with a remaining limit, but early limit
// expect to receive Reset Time
func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(1)
assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`))
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: tplLimit,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitReturnsResetValueIfEarlyLimit1"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, resetEpoch, epoch)
}

// Test getRateLimit function with a remaining quota, using 90% early limit
// expect to receive 0, nil.
func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "13")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0.9)
assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`))
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: tplLimit,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitReturns0IfEarlyLimitPercent"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
}

// Test getRateLimit function with a remaining limit, but early limit of 90%
// expect to receive Reset Time
func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "12")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0.9)
assert.NoError(t, tplLimit.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Limit"]]`))
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: tplLimit,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitReturnsResetValueIfEarlyLimitPercent"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, resetEpoch, epoch)
}

// Test getRateLimit function when "Limit" header is missing, when using a Percentage early-limit
// expect to receive 0, nil. (default rate-limiting)
func TestGetRateLimitWhenMissingLimit(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0.9)
assert.NoError(t, tplReset.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Reset"]]`))
assert.NoError(t, tplRemaining.Unpack(`[[.last_response.header.Get "X-Rate-Limit-Remaining"]]`))
rateLimit := &rateLimiter{
limit: nil,
reset: tplReset,
remaining: tplRemaining,
log: logp.NewLogger("TestGetRateLimitWhenMissingLimit"),
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
}
11 changes: 11 additions & 0 deletions x-pack/filebeat/module/okta/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ An initial interval can be defined. The first time the module starts, will fetch
var.initial_interval: 24h # will fetch events starting 24h ago.
----

*`input.request.rate_limit.early_limit`*::

You can override the default rate-limiting behavior in <<filebeat-input-httpjson>>.
The default for the Okta module is to use up to 89% of the Okta rate-limit,
which should avoid Okta Warnings on rate-limit usage.
+
[source.yaml]
----
input.request.rate_limit.early_limit: 0.89
----

[float]
=== Example dashboard

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/module/okta/system/config/input.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ request.rate_limit:
limit: '[[.last_response.header.Get "X-Rate-Limit-Limit"]]'
remaining: '[[.last_response.header.Get "X-Rate-Limit-Remaining"]]'
reset: '[[.last_response.header.Get "X-Rate-Limit-Reset"]]'
early_limit: 0.89
request.transforms:
- set:
target: header.Authorization
Expand Down