Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/entityanalytics/provider/okta: Rate limiting fixes #41583

Merged
merged 15 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix missing key in streaming input logging. {pull}41600[41600]
- Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755]
- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"

"golang.org/x/time/rate"

"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -195,16 +191,23 @@
// https://${yourOktaDomain}/reports/rate-limit.
//
// See https://developer.okta.com/docs/reference/api/users/#list-users for details.
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
const endpoint = "/api/v1/users"
func GetUserDetails(ctx context.Context, cli *http.Client, host, key, user string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
var endpoint, path string
if user == "" {
endpoint = "/api/v1/users"
path = endpoint
} else {
endpoint = "/api/v1/users/{user}"
path = strings.Replace(endpoint, "{user}", user, 1)
Comment on lines +200 to +201
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not just use path.Join here rather than a search/replace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do also need endpoint now, without the ID inserted, for the rate limiter.

I think it's good that the final path is derived from the endpoint pattern. If there's a change they're less likely to diverge.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't particularly like it, but OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Leaving it like that.

}

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, user),
Path: path,
RawQuery: query.Encode(),
}
return getDetails[User](ctx, cli, u, key, user == "", omit, lim, window, log)
return getDetails[User](ctx, cli, u, endpoint, key, user == "", omit, lim, window, log)
}

// GetUserFactors returns Okta group roles using the groups API endpoint. host is the
Expand All @@ -213,19 +216,20 @@
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/UserFactor/#tag/UserFactor/operation/listFactors.
func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Factor, http.Header, error) {
const endpoint = "/api/v1/users"

func GetUserFactors(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Factor, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}

const endpoint = "/api/v1/users/{user}/factors"
path := strings.Replace(endpoint, "{user}", user, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, user, "factors"),
Path: path,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think keeping the use if path.Join would be better (also below).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above. Leaving it.

}
return getDetails[Factor](ctx, cli, u, key, true, OmitNone, lim, window, log)
return getDetails[Factor](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

// GetUserRoles returns Okta group roles using the groups API endpoint. host is the
Expand All @@ -234,19 +238,20 @@
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles.
func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) {
const endpoint = "/api/v1/users"

func GetUserRoles(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}

const endpoint = "/api/v1/users/{user}/roles"
path := strings.Replace(endpoint, "{user}", user, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, user, "roles"),
Path: path,
}
return getDetails[Role](ctx, cli, u, key, true, OmitNone, lim, window, log)
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

// GetUserGroupDetails returns Okta group details using the users API endpoint. host is the
Expand All @@ -255,19 +260,20 @@
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/reference/api/users/#request-parameters-8 (no anchor exists on the page for this endpoint) for details.
func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) {
const endpoint = "/api/v1/users"

func GetUserGroupDetails(ctx context.Context, cli *http.Client, host, key, user string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Group, http.Header, error) {
if user == "" {
return nil, nil, errors.New("no user specified")
}

const endpoint = "/api/v1/users/{user}/groups"
path := strings.Replace(endpoint, "{user}", user, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, user, "groups"),
Path: path,
}
return getDetails[Group](ctx, cli, u, key, true, OmitNone, lim, window, log)
return getDetails[Group](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

// GetGroupRoles returns Okta group roles using the groups API endpoint. host is the
Expand All @@ -276,19 +282,20 @@
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/RoleAssignmentBGroup/#tag/RoleAssignmentBGroup/operation/listGroupAssignedRoles.
func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) {
const endpoint = "/api/v1/groups"

func GetGroupRoles(ctx context.Context, cli *http.Client, host, key, group string, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Role, http.Header, error) {
if group == "" {
return nil, nil, errors.New("no group specified")
}

const endpoint = "/api/v1/groups/{group}/rules"
path := strings.Replace(endpoint, "{group}", group, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, group, "roles"),
Path: path,
}
return getDetails[Role](ctx, cli, u, key, true, OmitNone, lim, window, log)
return getDetails[Role](ctx, cli, u, endpoint, key, true, OmitNone, lim, window, log)
}

// GetDeviceDetails returns Okta device details using the list devices API endpoint. host is the
Expand All @@ -298,16 +305,24 @@
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices for details.
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) {
const endpoint = "/api/v1/devices"
func GetDeviceDetails(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, lim RateLimiter, window time.Duration, log *logp.Logger) ([]Device, http.Header, error) {
var endpoint string
var path string
if device == "" {
endpoint = "/api/v1/devices"
path = endpoint
} else {
endpoint = "/api/v1/devices/{device}"
path = strings.Replace(endpoint, "{device}", device, 1)
}

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, device),
Path: path,
RawQuery: query.Encode(),
}
return getDetails[Device](ctx, cli, u, key, device == "", OmitNone, lim, window, log)
return getDetails[Device](ctx, cli, u, endpoint, key, device == "", OmitNone, lim, window, log)
}

// GetDeviceUsers returns Okta user details for users associated with the provided device identifier
Expand All @@ -317,21 +332,22 @@
// See GetUserDetails for details of the query and rate limit parameters.
//
// See https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDeviceUsers for details.
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
func GetDeviceUsers(ctx context.Context, cli *http.Client, host, key, device string, query url.Values, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]User, http.Header, error) {
if device == "" {
// No user associated with a null device. Not an error.
return nil, nil, nil
}

const endpoint = "/api/v1/devices"
const endpoint = "/api/v1/devices/{device}/users"
path := strings.Replace(endpoint, "{device}", device, 1)

u := &url.URL{
Scheme: "https",
Host: host,
Path: path.Join(endpoint, device, "users"),
Path: path,
RawQuery: query.Encode(),
}
du, h, err := getDetails[devUser](ctx, cli, u, key, true, omit, lim, window, log)
du, h, err := getDetails[devUser](ctx, cli, u, endpoint, key, true, omit, lim, window, log)
if err != nil {
return nil, h, err
}
Expand All @@ -356,7 +372,7 @@
// for the specific user are returned, otherwise a list of all users is returned.
//
// See GetUserDetails for details of the query and rate limit parameters.
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, key string, all bool, omit Response, lim *rate.Limiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) {
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim RateLimiter, window time.Duration, log *logp.Logger) ([]E, http.Header, error) {
url := u.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand All @@ -370,8 +386,7 @@
req.Header.Set("Content-Type", contentType)
req.Header.Set("Authorization", fmt.Sprintf("SSWS %s", key))

log.Debugw("rate limit", "limit", lim.Limit(), "burst", lim.Burst(), "url", url)
err = lim.Wait(ctx)
err = lim.Wait(ctx, endpoint, u, log)
if err != nil {
return nil, nil, err
}
Expand All @@ -380,9 +395,9 @@
return nil, nil, err
}
defer resp.Body.Close()
err = oktaRateLimit(resp.Header, window, lim, log)
err = lim.Update(endpoint, resp.Header, window, log)
if err != nil {
io.Copy(io.Discard, resp.Body)

Check failure on line 400 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `io.Copy` is not checked (errcheck)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the places where the linter should just be ignored? This is awful (most errcheck complaints are).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall I add annotations to have the linter ignore things or do we just merge with those checks failing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just ignore it. It's a bad linter and I do not think we should clutter good code with bad annotations for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted.

return nil, nil, err
}

Expand Down Expand Up @@ -443,59 +458,6 @@
return fmt.Sprintf("%s: %s", summary, strings.Join(causes, ","))
}

// oktaRateLimit implements the Okta rate limit policy translation.
//
// See https://developer.okta.com/docs/reference/rl-best-practices/ for details.
func oktaRateLimit(h http.Header, window time.Duration, limiter *rate.Limiter, log *logp.Logger) error {
limit := h.Get("X-Rate-Limit-Limit")
remaining := h.Get("X-Rate-Limit-Remaining")
reset := h.Get("X-Rate-Limit-Reset")
log.Debugw("rate limit header", "X-Rate-Limit-Limit", limit, "X-Rate-Limit-Remaining", remaining, "X-Rate-Limit-Reset", reset)
if limit == "" || remaining == "" || reset == "" {
return nil
}

lim, err := strconv.ParseFloat(limit, 64)
if err != nil {
return err
}
rem, err := strconv.ParseFloat(remaining, 64)
if err != nil {
return err
}
rst, err := strconv.ParseInt(reset, 10, 64)
if err != nil {
return err
}
resetTime := time.Unix(rst, 0)
per := time.Until(resetTime).Seconds()

// Be conservative here; the docs don't exactly specify burst rates.
// Make sure we can make at least one new request, even if we fail
// to get a non-zero rate.Limit. We could set to zero for the case
// that limit=rate.Inf, but that detail is not important.
burst := 1

rateLimit := rate.Limit(rem / per)

// Process reset if we need to wait until reset to avoid a request against a zero quota.
if rateLimit <= 0 {
waitUntil := resetTime.UTC()
// next gives us a sane next window estimate, but the
// estimate will be overwritten when we make the next
// permissible API request.
next := rate.Limit(lim / window.Seconds())
limiter.SetLimitAt(waitUntil, next)
limiter.SetBurstAt(waitUntil, burst)
log.Debugw("rate limit adjust", "reset_time", waitUntil, "next_rate", next, "next_burst", burst)
return nil
}
limiter.SetLimit(rateLimit)
limiter.SetBurst(burst)
log.Debugw("rate limit adjust", "set_rate", rateLimit, "set_burst", burst)
return nil
}

// Next returns the next URL query for a pagination sequence. If no further
// page is available, Next returns io.EOF.
func Next(h http.Header) (query url.Values, err error) {
Expand Down
Loading
Loading