Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update filebeat httpjson input to support pagination via Header and Okta module #16354

Merged
merged 21 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,32 @@ Time duration between repeated data retrievals. Default: 0s, meaning no repeated
If the HTTP API returns data in a JSON array, then this option can be set to decode these records
from the array. Default: not used.

[float]
==== `no_http_body`

If set to true, the request is sent without an HTTP request body. Default: false.

[float]
==== `pagination.enabled`

This option specifies whether pagination is enabled. Default: false.
This option specifies whether pagination is enabled. Default: true.

[float]
==== `pagination.extra_body_content`

Any additional data that needs to be set in the HTTP pagination request can be specified in
this JSON blob. Default: not used.

[float]
==== `pagination.header.field_name`

The field name in the HTTP Header that is used for pagination control.

[float]
==== `pagination.header.regex_pattern`

The regular expression pattern to use for retrieving the pagination information from the HTTP Header field specified above.

[float]
==== `pagination.id_field`

Expand All @@ -120,6 +135,22 @@ Required when pagination is enabled.
This specifies the URL to which pagination requests are sent. Required if the pagination URL is different
from the HTTP API URL.

[float]
==== `rate_limit.limit`

This specifies the field in the HTTP Header of the response that contains the total limit.

[float]
==== `rate_limit.remaining`

This specifies the field in the HTTP Header of the response that contains the remaining quota of the rate limit.

[float]
==== `rate_limit.reset`

This specifies the field in the HTTP Header of the response that contains the epoch time
when the rate limit will be reset.

[float]
==== `ssl`

Expand Down
35 changes: 31 additions & 4 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package httpjson

import (
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -33,18 +34,24 @@ type config struct {

// Pagination contains information about httpjson pagination settings
type Pagination struct {
IsEnabled bool `config:"enabled"`
Enabled *bool `config:"enabled"`
ExtraBodyContent common.MapStr `config:"extra_body_content"`
Header *Header `config:"header"`
IDField string `config:"id_field"`
RequestField string `config:"req_field"`
URL string `config:"url"`
}

// IsEnabled reports whether pagination is enabled. A nil Pagination is
// treated as disabled. Otherwise pagination is enabled unless the `enabled`
// setting was explicitly set to false; leaving `enabled` unset counts as
// enabled.
func (p *Pagination) IsEnabled() bool {
	if p == nil {
		return false
	}
	if p.Enabled == nil {
		// No explicit setting: enabled by default.
		return true
	}
	return *p.Enabled
}

// Header contains the HTTP header settings used for pagination.
type Header struct {
FieldName string `config:"field_name"`
RegexPattern string `config:"regex_pattern"`
FieldName string `config:"field_name" validate:"required"`
RegexPattern string `config:"regex_pattern" validate:"required"`
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
re *regexp.Regexp
}

// HTTP Header Rate Limit information
Expand All @@ -61,7 +68,27 @@ func (c *config) Validate() error {
case "POST":
break
default:
return errors.Errorf("httpjson input: Invalid http_method, %s - ", c.HTTPMethod)
return errors.Errorf("httpjson input: Invalid http_method, %s ", c.HTTPMethod)
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
}
if c.NoHTTPBody {
if len(c.HTTPRequestBody) > 0 {
return errors.Errorf("invalid configuration: both no_http_body and http_request_body cannot be set simultaneously")
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
}
if c.Pagination != nil && (len(c.Pagination.ExtraBodyContent) > 0 || c.Pagination.RequestField != "") {
return errors.Errorf("invalid configuration: both no_http_body and pagination.extra_body_content or pagination.req_field cannot be set simultaneously")
}
}
if c.Pagination != nil {
if c.Pagination.Header != nil {
if c.Pagination.RequestField != "" || c.Pagination.IDField != "" {
return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field cannot be set simultaneously")
}
re, err := regexp.Compile(c.Pagination.Header.RegexPattern)
if err != nil {
return err
}
c.Pagination.Header.re = re
}
}
return nil
}
Expand Down
68 changes: 39 additions & 29 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"regexp"
Expand Down Expand Up @@ -129,10 +129,15 @@ func (in *httpjsonInput) Run() {

// createHTTPRequest creates an HTTP/HTTPs request for the input
func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo) (*http.Request, error) {
b, _ := json.Marshal(ri.ContentMap)
body := bytes.NewReader(b)
if in.config.NoHTTPBody {
body = bytes.NewReader([]byte{})
var body io.Reader
if len(ri.ContentMap) == 0 || in.config.NoHTTPBody {
body = nil
} else {
b, err := json.Marshal(ri.ContentMap)
if err != nil {
return nil, err
}
body = bytes.NewReader(b)
}
req, err := http.NewRequest(in.config.HTTPMethod, ri.URL, body)
if err != nil {
Expand All @@ -144,7 +149,7 @@ func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo)
req.Header.Set("User-Agent", userAgent)
if in.config.APIKey != "" {
if in.config.AuthenticationScheme != "" {
req.Header.Set("Authorization", fmt.Sprintf("%s %s", in.config.AuthenticationScheme, in.config.APIKey))
req.Header.Set("Authorization", in.config.AuthenticationScheme+" "+in.config.APIKey)
} else {
req.Header.Set("Authorization", in.config.APIKey)
}
Expand All @@ -159,7 +164,7 @@ func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo)
return req, nil
}

// processEvent processes an array of events
// processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any.
func (in *httpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) {
var m map[string]interface{}
for _, t := range events {
Expand All @@ -175,18 +180,14 @@ func (in *httpjsonInput) processEventArray(events []interface{}) (map[string]int
return nil, errors.New("function OnEvent returned false")
}
default:
return nil, errors.Errorf("invalid JSON object")
return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v)
}
}
return m, nil
}

// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response
func getNextLinkFromHeader(header http.Header, fieldName string, regexPattern string) (string, error) {
re, err := regexp.Compile(regexPattern)
if err != nil {
return "", err
}
func getNextLinkFromHeader(header http.Header, fieldName string, re *regexp.Regexp) (string, error) {
links, ok := header[fieldName]
if !ok {
return "", errors.Errorf("field %s does not exist in the HTTP Header", fieldName)
Expand All @@ -201,7 +202,7 @@ func getNextLinkFromHeader(header http.Header, fieldName string, regexPattern st
}

// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response
func (in *httpjsonInput) applyRateLimit(header http.Header, rateLimit *RateLimit) error {
func (in *httpjsonInput) applyRateLimit(ctx context.Context, header http.Header, rateLimit *RateLimit) error {
if rateLimit != nil {
if rateLimit.Remaining != "" {
remaining := header.Get(rateLimit.Remaining)
Expand All @@ -223,8 +224,17 @@ func (in *httpjsonInput) applyRateLimit(header http.Header, rateLimit *RateLimit
return errors.Wrapf(err, "failed to parse rate-limit reset value")
}
t := time.Unix(epoch, 0)
in.log.Debugw("Rate Limit: Wait until %v for the rate limit to reset.", t)
time.Sleep(time.Until(t))
in.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t)
ticker := time.NewTicker(time.Until(t))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
in.log.Info("Context done.")
case <-ticker.C:
in.log.Debug("Rate Limit: time is up.")
}
}
}
}
}
Expand All @@ -249,14 +259,14 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
return errors.Wrapf(err, "failed to read http.response.body")
}
if msg.StatusCode != http.StatusOK {
in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData))
in.log.Debug("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData))
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode)
}
var m, v interface{}
var mm map[string]interface{}
err = json.Unmarshal(responseData, &m)
if err != nil {
in.log.Debugw("failed to unmarshal http.response.body", string(responseData))
in.log.Debug("failed to unmarshal http.response.body", string(responseData))
return errors.Wrapf(err, "failed to unmarshal http.response.body")
}
switch obj := m.(type) {
Expand Down Expand Up @@ -288,14 +298,14 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
}
}
default:
in.log.Debugw("http.response.body is not valid JSON", string(responseData))
return errors.New("http.response.body is not valid JSON")
in.log.Debugf("http.response.body is not a valid JSON object", string(responseData))
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
return errors.Errorf("http.response.body is not a valid JSON object, but a %T", obj)
}

if mm != nil && in.config.Pagination != nil && in.config.Pagination.IsEnabled {
if mm != nil && in.config.Pagination != nil && in.config.Pagination.IsEnabled() {
if in.config.Pagination.Header != nil {
// Pagination control using HTTP Header
url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.RegexPattern)
url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.re)
if err != nil {
return errors.Wrapf(err, "failed to retrieve the next URL for pagination")
}
Expand All @@ -304,37 +314,37 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
return nil
}
ri.URL = url
err = in.applyRateLimit(header, in.config.RateLimit)
if err != nil {
if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil {
return err
}
in.log.Info("Continuing with pagination to URL: ", ri.URL)
continue
} else {
// Pagination control using HTTP Body fields
v, err = common.MapStr(mm).GetValue(in.config.Pagination.IDField)
if err != nil {
if err == common.ErrKeyNotFound {
in.log.Info("Pagination finished.")
return nil
} else {
return errors.Wrapf(err, "failed to retrieve id_field for pagination")
}
if in.config.Pagination.RequestField != "" {
ri.ContentMap.Put(in.config.Pagination.RequestField, v)
if in.config.Pagination.URL != "" {
ri.URL = in.config.Pagination.URL
}
} else {
switch v.(type) {
switch vt := v.(type) {
case string:
ri.URL = v.(string)
ri.URL = vt
default:
return errors.New("pagination ID is not of string type")
}
}
if in.config.Pagination.ExtraBodyContent != nil {
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
ri.ContentMap.Update(common.MapStr(in.config.Pagination.ExtraBodyContent))
}
err = in.applyRateLimit(header, in.config.RateLimit)
if err != nil {
if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil {
return err
}
in.log.Info("Continuing with pagination to URL: ", ri.URL)
Expand Down