From 08923cfc2074fa2592cd6d911ceb47eb5970d764 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 17 Jun 2020 14:47:30 +0200 Subject: [PATCH 1/4] [Filebeat][httpjson] Add split_events_by config setting --- x-pack/filebeat/input/httpjson/config.go | 1 + .../filebeat/input/httpjson/httpjson_test.go | 82 ++++++++++++++++++- x-pack/filebeat/input/httpjson/input.go | 55 ++++++++++--- 3 files changed, 125 insertions(+), 13 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 0d677abd5616..2fcc2fc8941c 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -26,6 +26,7 @@ type config struct { HTTPRequestBody common.MapStr `config:"http_request_body"` Interval time.Duration `config:"interval"` JSONObjects string `config:"json_objects_array"` + SplitEventsBy string `config:"split_events_by"` NoHTTPBody bool `config:"no_http_body"` Pagination *Pagination `config:"pagination"` RateLimit *RateLimit `config:"rate_limit"` diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 416836782bac..c2a3387c43fb 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -38,7 +38,6 @@ const ( var ( once sync.Once - url string ) func testSetup(t *testing.T) { @@ -91,6 +90,10 @@ func createServer(newServer func(handler http.Handler) *httptest.Server) *httpte "embedded": map[string]string{ "hello": "world", }, + "list": []map[string]interface{}{ + {"foo": "bar"}, + {"hello": "world"}, + }, } b, _ := json.Marshal(message) w.Header().Set("Content-Type", "application/json") @@ -157,8 +160,14 @@ func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *h return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") message := map[string]interface{}{ - "hello": []map[string]string{ - {"foo": "bar"}, + "hello": []map[string]interface{}{ + { + "foo": "bar", + "list": []map[string]interface{}{ + {"foo": "bar"}, + {"hello": "world"}, + }, + }, {"bar": "foo"}, }, } @@ -604,3 +613,70 @@ func TestOAuth2(t *testing.T) { } }) } + +func TestSplitResponseWithKey(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "split_events_by": "list", + "interval": 0, + } + runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(2) + if !ok { + t.Fatalf("Expected 2 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} + +func TestSplitResponseWithoutKey(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "split_events_by": "not_found", + "interval": 0, + } + runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(1) + if !ok { + t.Fatalf("Expected 1 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} + +func TestArrayWithSplitResponse(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "json_objects_array": "hello", + "split_events_by": "list", + "interval": 0, + } + runTest(t, false, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(3) + if !ok { + t.Fatalf("Expected 3 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +} diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 34180591ee09..160ef930a0f3 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -181,24 +181,59 @@ func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo) // processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) { - var m map[string]interface{} for _, t := range events { switch v := t.(type) { case map[string]interface{}: - m = v - d, err := json.Marshal(v) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal %+v", v) - } - ok := in.outlet.OnEvent(makeEvent(string(d))) - if !ok { - return nil, errors.New("function OnEvent returned false") + for _, e := range in.splitEvent(v) { + d, err := json.Marshal(e) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal %+v", e) + } + ok := in.outlet.OnEvent(makeEvent(string(d))) + if !ok { + return nil, errors.New("function OnEvent returned false") + } } default: return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v) } } - return m, nil + return nil, nil +} + +func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]interface{} { + m := common.MapStr(event) + + hasSplitKey, _ := m.HasKey(in.config.SplitEventsBy) + if in.config.SplitEventsBy == "" || !hasSplitKey { + return []map[string]interface{}{event} + } + + splitOnIfc, _ := m.GetValue(in.config.SplitEventsBy) + splitOn, ok := splitOnIfc.([]interface{}) + // if not an array, we do nothing + if !ok { + return []map[string]interface{}{event} + } + + var events []map[string]interface{} + for _, split := range splitOn { + s, ok := split.(map[string]interface{}) + // if not an object, we do nothing + if !ok { + return []map[string]interface{}{event} + } + + mm := m.Clone() + _, err := mm.Put(in.config.SplitEventsBy, s) + if err != nil { + return []map[string]interface{}{event} + } + + events = append(events, mm) + } + + return events } // getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response From a6597fe0170f4ef1d5213790ab90ccb31758f1b3 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 18 Jun 2020 09:42:18 +0200 Subject: [PATCH 2/4] Add more tests --- .../filebeat/input/httpjson/httpjson_test.go | 48 ++++++++++++++++++- x-pack/filebeat/input/httpjson/input.go | 8 ++-- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index c2a3387c43fb..7d9b2c49f848 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -21,6 +21,8 @@ import ( "golang.org/x/sync/errgroup" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" @@ -168,6 +170,16 @@ func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *h {"hello": "world"}, }, }, + { + "foo": "bar", + "list": []map[string]interface{}{ + {"foo": "bar"}, + }, + }, + { + "bar": "foo", + "list": []map[string]interface{}{}, + }, {"bar": "foo"}, }, } @@ -665,18 +677,50 @@ func TestArrayWithSplitResponse(t *testing.T) { "split_events_by": "list", "interval": 0, } + + expectedFields := []string{ + `{ + "foo": "bar", + "list": { + "foo": "bar" + } + }`, + `{ + "foo": "bar", + "list": { + "hello": "world" + } + }`, + `{ + "foo": "bar", + "list": { + "foo": "bar" + } + }`, + `{ + "bar": "foo", + "list": [] + }`, + `{"bar": "foo"}`, + } + runTest(t, false, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) - events, ok := out.waitForEvents(3) + events, ok := out.waitForEvents(5) if !ok { - t.Fatalf("Expected 3 events, but got %d.", len(events)) + t.Fatalf("Expected 5 events, but got %d.", len(events)) } input.Stop() if err := group.Wait(); err != nil { t.Fatal(err) } + + for i, e := range events { + message, _ := e.GetValue("message") + assert.JSONEq(t, expectedFields[i], message.(string)) + } }) } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 160ef930a0f3..78f5eba5344e 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -181,10 +181,12 @@ func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo) // processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) { + var last map[string]interface{} for _, t := range events { switch v := t.(type) { case map[string]interface{}: for _, e := range in.splitEvent(v) { + last = e d, err := json.Marshal(e) if err != nil { return nil, errors.Wrapf(err, "failed to marshal %+v", e) @@ -198,7 +200,7 @@ func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]int return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v) } } - return nil, nil + return last, nil } func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]interface{} { @@ -211,8 +213,8 @@ func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]i splitOnIfc, _ := m.GetValue(in.config.SplitEventsBy) splitOn, ok := splitOnIfc.([]interface{}) - // if not an array, we do nothing - if !ok { + // if not an array or is empty, we do nothing + if !ok || len(splitOn) == 0 { return []map[string]interface{}{event} } From c531508eac1cdde1336eef3db9a11451671c29ef Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 25 Jun 2020 11:38:39 +0200 Subject: [PATCH 3/4] Add docs and entry in CHANGELOG --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 67 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 812b6ff5a5ec..7a9746fa51ab 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Adds check on `` config option value for the azure input `resource_manager_endpoint`. {pull}18890[18890] - Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953] - Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892] +- Adds `split_events_by` option to httpjson input. {pull}19246[19246] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index cdb24b513605..24e673d09e6d 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -178,6 +178,73 @@ The config needs to specify `events` as the `json_objects_array` value. json_objects_array: events ---- +[float] +==== `split_events_by` + +If the response body contains a JSON object containing an array then this option +specifies the key containing that array. Each object in that array will generate +an event, but will maintain the common fields of the document as well. + +["source","json",subs="attributes"] +---- +{ + "time": "2020-06-02 23:22:32 UTC", + "user": "Bob", + "events": [ + { + "timestamp": "2020-05-02 11:10:03 UTC", + "event": { + "category": "authorization" + } + }, + { + "timestamp": "2020-05-05 13:03:11 UTC", + "event": { + "category": "authorization" + } + } + ] +} +---- + +The config needs to specify `events` as the `split_events_by` value. + +["source","yaml",subs="attributes"] +---- +- type: httpjson + split_events_by: events +---- + +And will output the following events: + +["source","json",subs="attributes"] +---- +[ + { + "time": "2020-06-02 23:22:32 UTC", + "user": "Bob", + "events": { + "timestamp": "2020-05-02 11:10:03 UTC", + "event": { + "category": "authorization" + } + } + }, + { + "time": "2020-06-02 23:22:32 UTC", + "user": "Bob", + "events": { + "timestamp": "2020-05-05 13:03:11 UTC", + "event": { + "category": "authorization" + } + } + } +] +---- + +It can be used in combination with `json_objects_array`, which will look for the field inside each element. + [float] ==== `no_http_body` From c97f6e665fb0fcbd530567651b856ec53f7d4c65 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 25 Jun 2020 12:00:58 +0200 Subject: [PATCH 4/4] Fix tests --- x-pack/filebeat/input/httpjson/httpjson_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 7d9b2c49f848..4e70fe72472f 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -632,7 +632,8 @@ func TestSplitResponseWithKey(t *testing.T) { "split_events_by": "list", "interval": 0, } - runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + ts := createTestServer(HTTPTestServer) + runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -654,7 +655,8 @@ func TestSplitResponseWithoutKey(t *testing.T) { "split_events_by": "not_found", "interval": 0, } - runTest(t, false, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + ts := createTestServer(HTTPTestServer) + runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) @@ -704,7 +706,8 @@ func TestArrayWithSplitResponse(t *testing.T) { `{"bar": "foo"}`, } - runTest(t, false, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + ts := createTestServer(ArrayResponseServer) + runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { group, _ := errgroup.WithContext(context.Background()) group.Go(input.run)