Skip to content

Commit

Permalink
[Filebeat][httpjson] Add split_events_by config setting (elastic#19246)
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-gr authored and melchiormoulin committed Oct 14, 2020
1 parent 9910f4a commit 790bb19
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Adds check on `<no value>` config option value for the azure input `resource_manager_endpoint`. {pull}18890[18890]
- Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953]
- Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892]
- Adds `split_events_by` option to httpjson input. {pull}19246[19246]

*Heartbeat*

Expand Down
67 changes: 67 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,73 @@ The config needs to specify `events` as the `json_objects_array` value.
json_objects_array: events
----

[float]
==== `split_events_by`

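If the response body is a JSON object that contains an array of objects, this
option specifies the key that holds that array. One event is generated for each
object in the array; every event keeps the other fields of the document, and the
key itself holds that single object instead of the whole array. If the key is
missing, or its value is not a non-empty array of objects, the document is
published unchanged as a single event.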

["source","json",subs="attributes"]
----
{
"time": "2020-06-02 23:22:32 UTC",
"user": "Bob",
"events": [
{
"timestamp": "2020-05-02 11:10:03 UTC",
"event": {
"category": "authorization"
}
},
{
"timestamp": "2020-05-05 13:03:11 UTC",
"event": {
"category": "authorization"
}
}
]
}
----

The config needs to specify `events` as the `split_events_by` value.

["source","yaml",subs="attributes"]
----
- type: httpjson
split_events_by: events
----

With this configuration, the input outputs the following events:

["source","json",subs="attributes"]
----
[
{
"time": "2020-06-02 23:22:32 UTC",
"user": "Bob",
"events": {
"timestamp": "2020-05-02 11:10:03 UTC",
"event": {
"category": "authorization"
}
}
},
{
"time": "2020-06-02 23:22:32 UTC",
"user": "Bob",
"events": {
"timestamp": "2020-05-05 13:03:11 UTC",
"event": {
"category": "authorization"
}
}
}
]
----

It can be used in combination with `json_objects_array`. In that case the `split_events_by` key is looked up inside each element of the `json_objects_array` array, and each element is split independently.

[float]
==== `no_http_body`

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type config struct {
HTTPRequestBody common.MapStr `config:"http_request_body"`
Interval time.Duration `config:"interval"`
JSONObjects string `config:"json_objects_array"`
SplitEventsBy string `config:"split_events_by"`
NoHTTPBody bool `config:"no_http_body"`
Pagination *Pagination `config:"pagination"`
RateLimit *RateLimit `config:"rate_limit"`
Expand Down
129 changes: 126 additions & 3 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"golang.org/x/sync/errgroup"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -38,7 +40,6 @@ const (

var (
once sync.Once
url string
)

func testSetup(t *testing.T) {
Expand Down Expand Up @@ -91,6 +92,10 @@ func createServer(newServer func(handler http.Handler) *httptest.Server) *httpte
"embedded": map[string]string{
"hello": "world",
},
"list": []map[string]interface{}{
{"foo": "bar"},
{"hello": "world"},
},
}
b, _ := json.Marshal(message)
w.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -157,8 +162,24 @@ func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *h
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
message := map[string]interface{}{
"hello": []map[string]string{
{"foo": "bar"},
"hello": []map[string]interface{}{
{
"foo": "bar",
"list": []map[string]interface{}{
{"foo": "bar"},
{"hello": "world"},
},
},
{
"foo": "bar",
"list": []map[string]interface{}{
{"foo": "bar"},
},
},
{
"bar": "foo",
"list": []map[string]interface{}{},
},
{"bar": "foo"},
},
}
Expand Down Expand Up @@ -604,3 +625,105 @@ func TestOAuth2(t *testing.T) {
}
})
}

// TestSplitResponseWithKey runs the input with split_events_by set to "list"
// and requires that waiting for two published events succeeds.
func TestSplitResponseWithKey(t *testing.T) {
	cfg := map[string]interface{}{
		"interval":        0,
		"split_events_by": "list",
		"http_method":     "GET",
	}
	server := createTestServer(HTTPTestServer)

	check := func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
		// Run the input in the background so the test can wait on its output.
		g, _ := errgroup.WithContext(context.Background())
		g.Go(input.run)

		got, ok := out.waitForEvents(2)
		if !ok {
			t.Fatalf("Expected 2 events, but got %d.", len(got))
		}
		input.Stop()

		// The run loop must terminate cleanly once the input is stopped.
		if err := g.Wait(); err != nil {
			t.Fatal(err)
		}
	}

	runTest(t, server, cfg, check)
}

// TestSplitResponseWithoutKey runs the input with split_events_by set to the
// key "not_found" and requires that waiting for a single published event
// succeeds.
func TestSplitResponseWithoutKey(t *testing.T) {
	cfg := map[string]interface{}{
		"interval":        0,
		"split_events_by": "not_found",
		"http_method":     "GET",
	}
	server := createTestServer(HTTPTestServer)

	check := func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
		// Run the input in the background so the test can wait on its output.
		g, _ := errgroup.WithContext(context.Background())
		g.Go(input.run)

		got, ok := out.waitForEvents(1)
		if !ok {
			t.Fatalf("Expected 1 events, but got %d.", len(got))
		}
		input.Stop()

		// The run loop must terminate cleanly once the input is stopped.
		if err := g.Wait(); err != nil {
			t.Fatal(err)
		}
	}

	runTest(t, server, cfg, check)
}

// TestArrayWithSplitResponse combines json_objects_array ("hello") with
// split_events_by ("list") and compares the "message" field of every published
// event, in publication order, against the expected JSON documents.
func TestArrayWithSplitResponse(t *testing.T) {
	m := map[string]interface{}{
		"http_method":        "GET",
		"json_objects_array": "hello",
		"split_events_by":    "list",
		"interval":           0,
	}

	expectedFields := []string{
		`{
			"foo": "bar",
			"list": {
				"foo": "bar"
			}
		}`,
		`{
			"foo": "bar",
			"list": {
				"hello": "world"
			}
		}`,
		`{
			"foo": "bar",
			"list": {
				"foo": "bar"
			}
		}`,
		// An empty list is not split: the object is published as it is.
		`{
			"bar": "foo",
			"list": []
		}`,
		// An object without the split key is published as it is.
		`{"bar": "foo"}`,
	}

	ts := createTestServer(ArrayResponseServer)
	runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
		group, _ := errgroup.WithContext(context.Background())
		group.Go(input.run)

		events, ok := out.waitForEvents(5)
		if !ok {
			t.Fatalf("Expected 5 events, but got %d.", len(events))
		}
		input.Stop()

		if err := group.Wait(); err != nil {
			t.Fatal(err)
		}

		// Guard the indexing below: a different number of events would
		// otherwise surface as an index-out-of-range panic or go unnoticed.
		if len(events) != len(expectedFields) {
			t.Fatalf("Expected %d events, but got %d.", len(expectedFields), len(events))
		}

		for i, e := range events {
			v, err := e.GetValue("message")
			if err != nil {
				t.Fatalf("event %d: reading the message field: %v", i, err)
			}
			// Fail with a readable message instead of panicking on a bad type.
			message, isString := v.(string)
			if !isString {
				t.Fatalf("event %d: message is a %T, expected a string", i, v)
			}
			assert.JSONEq(t, expectedFields[i], message)
		}
	})
}
57 changes: 47 additions & 10 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,24 +181,61 @@ func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo)

// processEventArray publishes the JSON objects contained in the array. Each object
// is first passed through splitEvent, so a single object may be published as
// several events. It returns the last event that was published and an error if
// any; a non-object element, a marshalling failure or a rejected OnEvent call
// aborts processing with an error.
// NOTE(review): this listing appears to interleave the removed and the added
// lines of the diff hunk (both `m` and `last`, two return statements) — confirm
// against the actual file before editing the code.
func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) {
var m map[string]interface{}
var last map[string]interface{}
for _, t := range events {
switch v := t.(type) {
case map[string]interface{}:
m = v
d, err := json.Marshal(v)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal %+v", v)
}
ok := in.outlet.OnEvent(makeEvent(string(d)))
if !ok {
return nil, errors.New("function OnEvent returned false")
// Publish one event per map returned by splitEvent for this object.
for _, e := range in.splitEvent(v) {
// Remember the most recent event so it can be returned to the caller.
last = e
d, err := json.Marshal(e)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal %+v", e)
}
ok := in.outlet.OnEvent(makeEvent(string(d)))
if !ok {
return nil, errors.New("function OnEvent returned false")
}
}
default:
return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v)
}
}
return m, nil
return last, nil
}

// splitEvent expands event into one event per element of the array stored under
// the configured split_events_by key. Every resulting event is a clone of the
// original in which that key holds a single element of the array instead of the
// whole array, so the remaining fields are present in each of them.
//
// The original event is returned untouched, as the only element of the result,
// when split_events_by is not configured, when the key cannot be read, when its
// value is not a non-empty array, when any element of the array is not a JSON
// object, or when an element cannot be written back into the clone.
func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]interface{} {
	unsplit := []map[string]interface{}{event}

	// Check the configuration first so that no lookup is done when the
	// feature is disabled.
	key := in.config.SplitEventsBy
	if key == "" {
		return unsplit
	}

	m := common.MapStr(event)

	// One lookup is enough: a key that cannot be read yields an error here, and
	// the type assertion below rejects anything that is not an array.
	value, err := m.GetValue(key)
	if err != nil {
		return unsplit
	}

	elems, ok := value.([]interface{})
	// if not an array or is empty, we do nothing
	if !ok || len(elems) == 0 {
		return unsplit
	}

	events := make([]map[string]interface{}, 0, len(elems))
	for _, elem := range elems {
		obj, ok := elem.(map[string]interface{})
		// if not an object, we do nothing: the splits built so far are
		// discarded and the event is published as it came
		if !ok {
			return unsplit
		}

		// Clone per element so that each event owns its own copy of the
		// common fields.
		clone := m.Clone()
		if _, err := clone.Put(key, obj); err != nil {
			return unsplit
		}

		events = append(events, clone)
	}

	return events
}

// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response
Expand Down

0 comments on commit 790bb19

Please sign in to comment.