Skip to content

Commit

Permalink
Add automatic retries and exponential backoff to Filebeat httpjson in…
Browse files Browse the repository at this point in the history
…put (elastic#18956)

- Add automatic retries and exponential backoff to Filebeat httpjson input (elastic#18956)
- Add test cases
- Support the new OAuth2 functionality

(cherry picked from commit 0dcb3df)
  • Loading branch information
Lei Qiu authored and marc-gr committed Jul 13, 2020
1 parent 42542f5 commit 94a0bdf
Show file tree
Hide file tree
Showing 8 changed files with 7,377 additions and 6,476 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Add support for timezone offsets and `Z` to decode_cef timestamp parser. {pull}19346[19346]
- Improve ECS categorization field mappings in traefik module. {issue}16183[16183] {pull}19379[19379]
- Improve ECS categorization field mappings in azure module. {issue}16155[16155] {pull}19376[19376]
- Add automatic retries and exponential backoff to httpjson input. {pull}18956[18956]

*Heartbeat*

Expand Down
13,641 changes: 7,209 additions & 6,432 deletions NOTICE.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.13.0 // indirect
github.com/h2non/filetype v1.0.12
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/go-retryablehttp v0.6.6
github.com/hashicorp/golang-lru v0.5.2-0.20190520140433-59383c442f7d // indirect
github.com/hectane/go-acl v0.0.0-20190604041725-da78bae5fc95
github.com/insomniacslk/dhcp v0.0.0-20180716145214-633285ba52b2
Expand Down
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,15 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce h1:prjrVgOk2Yg6w
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874 h1:cAv7ZbSmyb1wjn6T4TIiyFCkpcfgpbcNNC3bM2srLaI=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-retryablehttp v0.6.6 h1:HJunrbHTDDbBb/ay4kxa1n+dLmttUlnP3V9oNE4hmsM=
github.com/hashicorp/go-retryablehttp v0.6.6/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.0.0 h1:21MVWPKDphxa7ineQQTrCU5brh7OuVVAzGOCnnCPtE8=
Expand Down
15 changes: 15 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,21 @@ remaining quota of the rate limit.
This specifies the field in the HTTP Header of the response that specifies the
epoch time when the rate limit will reset.

[float]
==== `retry.max_attempts`

This specifies the maximum number of retries for the retryable HTTP client. Default: 5.

[float]
==== `retry.wait_min`

This specifies the minimum time to wait before a retry is attempted. Default: 1s.

[float]
==== `retry.wait_max`

This specifies the maximum time to wait before a retry is attempted. Default: 60s.

[float]
==== `ssl`

Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type config struct {
NoHTTPBody bool `config:"no_http_body"`
Pagination *Pagination `config:"pagination"`
RateLimit *RateLimit `config:"rate_limit"`
RetryMax int `config:"retry.max_attempts"`
RetryWaitMin time.Duration `config:"retry.wait_min"`
RetryWaitMax time.Duration `config:"retry.wait_max"`
TLS *tlscommon.Config `config:"ssl"`
URL string `config:"url" validate:"required"`
}
Expand Down Expand Up @@ -95,5 +98,8 @@ func defaultConfig() config {
var c config
c.HTTPMethod = "GET"
c.HTTPClientTimeout = 60 * time.Second
c.RetryWaitMin = 1 * time.Second
c.RetryWaitMax = 60 * time.Second
c.RetryMax = 5
return c
}
124 changes: 100 additions & 24 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/http/httptest"
"reflect"
Expand All @@ -27,6 +28,14 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

const (
HTTPTestServer int = iota
TLSTestServer
RateLimitRetryServer
ErrorRetryServer
ArrayResponseServer
)

var (
once sync.Once
url string
Expand All @@ -39,6 +48,26 @@ func testSetup(t *testing.T) {
})
}

func createTestServer(testServer int) *httptest.Server {
var ts *httptest.Server
newServer := httptest.NewServer
switch testServer {
case HTTPTestServer:
ts = createServer(newServer)
case TLSTestServer:
ts = createServer(httptest.NewTLSServer)
case RateLimitRetryServer:
ts = createCustomServer(newServer)
case ErrorRetryServer:
ts = createCustomRetryServer(newServer)
case ArrayResponseServer:
ts = createCustomServerWithArrayResponse(newServer)
default:
ts = createServer(newServer)
}
return ts
}

func createServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
Expand Down Expand Up @@ -92,6 +121,34 @@ func createCustomServer(newServer func(handler http.Handler) *httptest.Server) *
b, _ := json.Marshal(message)
w.WriteHeader(http.StatusOK)
w.Write(b)
isRetry = false
}
}))
}

func createCustomRetryServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
retryCount := 0
statusCodes := []int{http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout, http.StatusHTTPVersionNotSupported, http.StatusVariantAlsoNegotiates, http.StatusInsufficientStorage, http.StatusLoopDetected, http.StatusNotExtended, http.StatusNetworkAuthenticationRequired}
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
// Test retry for two times
if retryCount < 2 {
rand.Seed(time.Now().Unix())
code := statusCodes[rand.Intn(len(statusCodes))]
w.WriteHeader(code)
w.Write([]byte{})
retryCount++
} else {
message := map[string]interface{}{
"hello": "world",
"embedded": map[string]string{
"hello": "world",
},
}
b, _ := json.Marshal(message)
w.WriteHeader(http.StatusOK)
w.Write(b)
retryCount = 0
}
}))
}
Expand All @@ -111,20 +168,8 @@ func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *h
}))
}

func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, testArrayResponse bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
func runTest(t *testing.T, ts *httptest.Server, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := createServer(newServer)
if testRateLimitRetry {
ts = createCustomServer(newServer)
}
if testArrayResponse {
ts = createCustomServerWithArrayResponse(newServer)
}
defer ts.Close()
m["url"] = ts.URL
cfg := common.MustNewConfigFrom(m)
Expand Down Expand Up @@ -360,7 +405,8 @@ func TestGET(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
ts := createTestServer(HTTPTestServer)
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -382,7 +428,8 @@ func TestGetHTTPS(t *testing.T) {
"interval": 0,
"ssl.verification_mode": "none",
}
runTest(t, true, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
ts := createTestServer(HTTPTestServer)
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -403,7 +450,30 @@ func TestRateLimitRetry(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, true, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
ts := createTestServer(RateLimitRetryServer)
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestErrorRetry(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
}
ts := createTestServer(ErrorRetryServer)
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -425,7 +495,8 @@ func TestArrayResponse(t *testing.T) {
"json_objects_array": "hello",
"interval": 0,
}
runTest(t, false, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
ts := createTestServer(ArrayResponseServer)
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -447,7 +518,8 @@ func TestPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 0,
}
runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
ts := createTestServer(HTTPTestServer)
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -469,7 +541,8 @@ func TestRepeatedPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 10 ^ 9,
}
runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
ts := createTestServer(HTTPTestServer)
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -490,7 +563,8 @@ func TestRunStop(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
ts := createTestServer(HTTPTestServer)
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
input.Run()
input.Stop()
input.Run()
Expand All @@ -499,21 +573,23 @@ func TestRunStop(t *testing.T) {
}

func TestOAuth2(t *testing.T) {
ts := newOAuth2TestServer(t)
oAuth2Server := newOAuth2TestServer(t)
defer oAuth2Server.Close()
ts := createTestServer(HTTPTestServer)
defer ts.Close()
m := map[string]interface{}{
"http_method": "GET",
"oauth2.client.id": "a_client_id",
"oauth2.client.secret": "a_client_secret",
"oauth2.token_url": ts.URL,
"oauth2.token_url": oAuth2Server.URL,
"oauth2.endpoint_params": map[string][]string{
"param1": {"v1", "v2"},
},
"oauth2.scopes": []string{"scope1", "scope2"},
"interval": 0,
}
defer ts.Close()

runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand Down
Loading

0 comments on commit 94a0bdf

Please sign in to comment.