-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
x-pack/filebeat/input/entityanalytics/provider/okta: Rate limiting fixes #41583
Changes from all commits
81faf96
033fd48
b13d02e
1ee6a58
4648f7e
41d4ec3
b55b47e
c55ade6
1d65a09
c667ab9
ab7232a
5066f31
350cf46
25ac97a
13a79f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,13 +14,9 @@ | |
"io" | ||
"net/http" | ||
"net/url" | ||
"path" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"golang.org/x/time/rate" | ||
|
||
"github.com/elastic/elastic-agent-libs/logp" | ||
) | ||
|
||
|
@@ -195,16 +191,23 @@ | |
// https://${yourOktaDomain}/reports/rate-limit. | ||
// | ||
// See https://developer.okta.com/docs/reference/api/users/#list-users for details. | ||
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) { | ||
const endpoint = "/api/v1/users" | ||
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) { | ||
var endpoint, path string | ||
if user == "" { | ||
endpoint = "/api/v1/users" | ||
path = endpoint | ||
} else { | ||
endpoint = "/api/v1/users/{user}" | ||
path = strings.Replace(endpoint, "{user}", user, 1) | ||
} | ||
|
||
u := &url.URL{ | ||
Scheme: "https", | ||
Host: host, | ||
Path: path.Join(endpoint, user), | ||
Path: path, | ||
RawQuery: query.Encode(), | ||
} | ||
return getDetails[User](ctx, cli, u, key, user == "", omit, lim, window, log) | ||
return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, window, log) | ||
} | ||
|
||
// GetUserFactors returns Okta group roles using the groups API endpoint. host is the | ||
|
@@ -213,19 +216,20 @@ | |
// See GetUserDetails for details of the query and rate limit parameters. | ||
// | ||
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/UserFactor/#tag/UserFactor/operation/listFactors. | ||
func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Factor, http.Header, error) { | ||
const endpoint = "/api/v1/users" | ||
|
||
func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Factor, http.Header, error) { | ||
if user == "" { | ||
return nil, nil, errors.New("no user specified") | ||
} | ||
|
||
const endpoint = "/api/v1/users/{user}/factors" | ||
path := strings.Replace(endpoint, "{user}", user, 1) | ||
|
||
u := &url.URL{ | ||
Scheme: "https", | ||
Host: host, | ||
Path: path.Join(endpoint, user, "factors"), | ||
Path: path, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think keeping the use if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above. Leaving it. |
||
} | ||
return getDetails[Factor](ctx, cli, u, key, true, OmitNone, lim, window, log) | ||
return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) | ||
} | ||
|
||
// GetUserRoles returns Okta group roles using the groups API endpoint. host is the | ||
|
@@ -234,19 +238,20 @@ | |
// See GetUserDetails for details of the query and rate limit parameters. | ||
// | ||
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles. | ||
func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) { | ||
const endpoint = "/api/v1/users" | ||
|
||
func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) { | ||
if user == "" { | ||
return nil, nil, errors.New("no user specified") | ||
} | ||
|
||
const endpoint = "/api/v1/users/{user}/roles" | ||
path := strings.Replace(endpoint, "{user}", user, 1) | ||
|
||
u := &url.URL{ | ||
Scheme: "https", | ||
Host: host, | ||
Path: path.Join(endpoint, user, "roles"), | ||
Path: path, | ||
} | ||
return getDetails[Role](ctx, cli, u, key, true, OmitNone, lim, window, log) | ||
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) | ||
} | ||
|
||
// GetUserGroupDetails returns Okta group details using the users API endpoint. host is the | ||
|
@@ -255,19 +260,20 @@ | |
// See GetUserDetails for details of the query and rate limit parameters. | ||
// | ||
// See https://developer.okta.com/docs/reference/api/users/#request-parameters-8 (no anchor exists on the page for this endpoint) for details. | ||
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) { | ||
const endpoint = "/api/v1/users" | ||
|
||
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) { | ||
if user == "" { | ||
return nil, nil, errors.New("no user specified") | ||
} | ||
|
||
const endpoint = "/api/v1/users/{user}/groups" | ||
path := strings.Replace(endpoint, "{user}", user, 1) | ||
|
||
u := &url.URL{ | ||
Scheme: "https", | ||
Host: host, | ||
Path: path.Join(endpoint, user, "groups"), | ||
Path: path, | ||
} | ||
return getDetails[Group](ctx, cli, u, key, true, OmitNone, lim, window, log) | ||
return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) | ||
} | ||
|
||
// GetGroupRoles returns Okta group roles using the groups API endpoint. host is the | ||
|
@@ -276,19 +282,20 @@ | |
// See GetUserDetails for details of the query and rate limit parameters. | ||
// | ||
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles. | ||
func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) { | ||
const endpoint = "/api/v1/groups" | ||
|
||
func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) { | ||
if group == "" { | ||
return nil, nil, errors.New("no group specified") | ||
} | ||
|
||
const endpoint = "/api/v1/groups/{group}/rules" | ||
path := strings.Replace(endpoint, "{group}", group, 1) | ||
|
||
u := &url.URL{ | ||
Scheme: "https", | ||
Host: host, | ||
Path: path.Join(endpoint, group, "roles"), | ||
Path: path, | ||
} | ||
return getDetails[Role](ctx, cli, u, key, true, OmitNone, lim, window, log) | ||
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log) | ||
} | ||
|
||
// GetDeviceDetails returns Okta device details using the list devices API endpoint. host is the | ||
|
@@ -298,16 +305,24 @@ | |
// See GetUserDetails for details of the query and rate limit parameters. | ||
// | ||
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices for details. | ||
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) { | ||
const endpoint = "/api/v1/devices" | ||
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) { | ||
var endpoint string | ||
var path string | ||
if device == "" { | ||
endpoint = "/api/v1/devices" | ||
path = endpoint | ||
} else { | ||
endpoint = "/api/v1/devices/{device}" | ||
path = strings.Replace(endpoint, "{device}", device, 1) | ||
} | ||
|
||
u := &url.URL{ | ||
Scheme: "https", | ||
Host: host, | ||
Path: path.Join(endpoint, device), | ||
Path: path, | ||
RawQuery: query.Encode(), | ||
} | ||
return getDetails[Device](ctx, cli, u, key, device == "", OmitNone, lim, window, log) | ||
return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, window, log) | ||
} | ||
|
||
// GetDeviceUsers returns Okta user details for users associated with the provided device identifier | ||
|
@@ -317,21 +332,22 @@ | |
// See GetUserDetails for details of the query and rate limit parameters. | ||
// | ||
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDeviceUsers for details. | ||
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) { | ||
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) { | ||
if device == "" { | ||
// No user associated with a null device. Not an error. | ||
return nil, nil, nil | ||
} | ||
|
||
const endpoint = "/api/v1/devices" | ||
const endpoint = "/api/v1/devices/{device}/users" | ||
path := strings.Replace(endpoint, "{device}", device, 1) | ||
|
||
u := &url.URL{ | ||
Scheme: "https", | ||
Host: host, | ||
Path: path.Join(endpoint, device, "users"), | ||
Path: path, | ||
RawQuery: query.Encode(), | ||
} | ||
du, h, err := getDetails[devUser](ctx, cli, u, key, true, omit, lim, window, log) | ||
du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, window, log) | ||
if err != nil { | ||
return nil, h, err | ||
} | ||
|
@@ -356,7 +372,7 @@ | |
// for the specific user are returned, otherwise a list of all users is returned. | ||
// | ||
// See GetUserDetails for details of the query and rate limit parameters. | ||
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key string, all bool, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) { | ||
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) { | ||
url := u.String() | ||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) | ||
if err != nil { | ||
|
@@ -370,8 +386,7 @@ | |
req.Header.Set("Content-Type", contentType) | ||
req.Header.Set("Authorization", fmt.Sprintf("SSWS %s", key)) | ||
|
||
log.Debugw("rate limit", "limit", lim.Limit(), "burst", lim.Burst(), "url", url) | ||
err = lim.Wait(ctx) | ||
err = lim.Wait(ctx, endpoint, u, log) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
@@ -380,9 +395,9 @@ | |
return nil, nil, err | ||
} | ||
defer resp.Body.Close() | ||
err = oktaRateLimit(resp.Header, window, lim, log) | ||
err = lim.Update(endpoint, resp.Header, window, log) | ||
if err != nil { | ||
io.Copy(io.Discard, resp.Body) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is one of the places where the linter should just be ignored? This is awful (most errcheck complaints are). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall I add annotations to have the linter ignore things or do we just merge with those checks failing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just ignore it. It's a bad linter and I do not think we should clutter good code with bad annotations for that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reverted. |
||
return nil, nil, err | ||
} | ||
|
||
|
@@ -443,59 +458,6 @@ | |
return fmt.Sprintf("%s: %s", summary, strings.Join(causes, ",")) | ||
} | ||
|
||
// oktaRateLimit implements the Okta rate limit policy translation. | ||
// | ||
// See https://developer.okta.com/docs/reference/rl-best-practices/ for details. | ||
func oktaRateLimit(h http.Header, window time.Duration, limiter *rate.Limiter, log *logp.Logger) error { | ||
limit := h.Get("X-Rate-Limit-Limit") | ||
remaining := h.Get("X-Rate-Limit-Remaining") | ||
reset := h.Get("X-Rate-Limit-Reset") | ||
log.Debugw("rate limit header", "X-Rate-Limit-Limit", limit, "X-Rate-Limit-Remaining", remaining, "X-Rate-Limit-Reset", reset) | ||
if limit == "" || remaining == "" || reset == "" { | ||
return nil | ||
} | ||
|
||
lim, err := strconv.ParseFloat(limit, 64) | ||
if err != nil { | ||
return err | ||
} | ||
rem, err := strconv.ParseFloat(remaining, 64) | ||
if err != nil { | ||
return err | ||
} | ||
rst, err := strconv.ParseInt(reset, 10, 64) | ||
if err != nil { | ||
return err | ||
} | ||
resetTime := time.Unix(rst, 0) | ||
per := time.Until(resetTime).Seconds() | ||
|
||
// Be conservative here; the docs don't exactly specify burst rates. | ||
// Make sure we can make at least one new request, even if we fail | ||
// to get a non-zero rate.Limit. We could set to zero for the case | ||
// that limit=rate.Inf, but that detail is not important. | ||
burst := 1 | ||
|
||
rateLimit := rate.Limit(rem / per) | ||
|
||
// Process reset if we need to wait until reset to avoid a request against a zero quota. | ||
if rateLimit <= 0 { | ||
waitUntil := resetTime.UTC() | ||
// next gives us a sane next window estimate, but the | ||
// estimate will be overwritten when we make the next | ||
// permissible API request. | ||
next := rate.Limit(lim / window.Seconds()) | ||
limiter.SetLimitAt(waitUntil, next) | ||
limiter.SetBurstAt(waitUntil, burst) | ||
log.Debugw("rate limit adjust", "reset_time", waitUntil, "next_rate", next, "next_burst", burst) | ||
return nil | ||
} | ||
limiter.SetLimit(rateLimit) | ||
limiter.SetBurst(burst) | ||
log.Debugw("rate limit adjust", "set_rate", rateLimit, "set_burst", burst) | ||
return nil | ||
} | ||
|
||
// Next returns the next URL query for a pagination sequence. If no further | ||
// page is available, Next returns io.EOF. | ||
func Next(h http.Header) (query url.Values, err error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not just use
path.Join
here rather than a search/replace?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do also need
endpoint
now, without the ID inserted, for the rate limiter.I think it's good that the final
path
is derived from theendpoint
pattern. If there's a change they're less likely to diverge.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't particularly like it, but OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Leaving it like that.