[Filebeat][httpjson] Add split_events_by config setting #19246

Merged
merged 4 commits into from
Jun 29, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Adds check on `<no value>` config option value for the azure input `resource_manager_endpoint`. {pull}18890[18890]
- Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953]
- Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892]
- Adds `split_events_by` option to httpjson input. {pull}19246[19246]

*Heartbeat*

67 changes: 67 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
@@ -178,6 +178,73 @@ The config needs to specify `events` as the `json_objects_array` value.
json_objects_array: events
----

[float]
==== `split_events_by`

If the response body is a JSON object that contains an array, this option
specifies the key that holds that array. Each object in the array generates its
own event, and every event keeps the other (common) fields of the document.

["source","json",subs="attributes"]
----
{
  "time": "2020-06-02 23:22:32 UTC",
  "user": "Bob",
  "events": [
    {
      "timestamp": "2020-05-02 11:10:03 UTC",
      "event": {
        "category": "authorization"
      }
    },
    {
      "timestamp": "2020-05-05 13:03:11 UTC",
      "event": {
        "category": "authorization"
      }
    }
  ]
}
----

The config needs to specify `events` as the `split_events_by` value.

["source","yaml",subs="attributes"]
----
- type: httpjson
  split_events_by: events
----

This configuration produces the following events:

["source","json",subs="attributes"]
----
[
  {
    "time": "2020-06-02 23:22:32 UTC",
    "user": "Bob",
    "events": {
      "timestamp": "2020-05-02 11:10:03 UTC",
      "event": {
        "category": "authorization"
      }
    }
  },
  {
    "time": "2020-06-02 23:22:32 UTC",
    "user": "Bob",
    "events": {
      "timestamp": "2020-05-05 13:03:11 UTC",
      "event": {
        "category": "authorization"
      }
    }
  }
]
----

It can be used in combination with `json_objects_array`. In that case, the `split_events_by` key is looked up inside each element of the `json_objects_array` array.
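
As a rough illustration of the behavior documented above, the following Go sketch splits the example document on its `events` key. It is not the input's implementation: `splitByKey` is a hypothetical helper, it uses plain maps rather than libbeat types, and it only handles top-level keys.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitByKey is a hypothetical stand-in for the input's splitting step.
// It returns one copy of doc per object found in the array at doc[key],
// with the array replaced by that single object. If the key is missing, or
// the value is not a non-empty array of objects, doc is returned unchanged.
func splitByKey(doc map[string]interface{}, key string) []map[string]interface{} {
	items, ok := doc[key].([]interface{})
	if key == "" || !ok || len(items) == 0 {
		return []map[string]interface{}{doc}
	}
	out := make([]map[string]interface{}, 0, len(items))
	for _, item := range items {
		obj, ok := item.(map[string]interface{})
		if !ok {
			return []map[string]interface{}{doc}
		}
		// Copy the top-level fields so every event owns its own map.
		ev := make(map[string]interface{}, len(doc))
		for k, v := range doc {
			ev[k] = v
		}
		ev[key] = obj
		out = append(out, ev)
	}
	return out
}

func main() {
	body := []byte(`{
		"time": "2020-06-02 23:22:32 UTC",
		"user": "Bob",
		"events": [
			{"timestamp": "2020-05-02 11:10:03 UTC", "event": {"category": "authorization"}},
			{"timestamp": "2020-05-05 13:03:11 UTC", "event": {"category": "authorization"}}
		]
	}`)

	var doc map[string]interface{}
	if err := json.Unmarshal(body, &doc); err != nil {
		panic(err)
	}
	// One JSON line is printed per resulting event.
	for _, ev := range splitByKey(doc, "events") {
		b, err := json.Marshal(ev)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(b))
	}
}
```

Each printed line carries the shared `time` and `user` fields plus a single object under `events`.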

[float]
==== `no_http_body`

1 change: 1 addition & 0 deletions x-pack/filebeat/input/httpjson/config.go
@@ -26,6 +26,7 @@ type config struct {
HTTPRequestBody common.MapStr `config:"http_request_body"`
Interval time.Duration `config:"interval"`
JSONObjects string `config:"json_objects_array"`
SplitEventsBy string `config:"split_events_by"`
NoHTTPBody bool `config:"no_http_body"`
Pagination *Pagination `config:"pagination"`
RateLimit *RateLimit `config:"rate_limit"`
129 changes: 126 additions & 3 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
@@ -21,6 +21,8 @@ import (

"golang.org/x/sync/errgroup"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
@@ -38,7 +40,6 @@ const (

 var (
 	once sync.Once
-	url  string
 )

func testSetup(t *testing.T) {
@@ -91,6 +92,10 @@ func createServer(newServer func(handler http.Handler) *httptest.Server) *httpte
"embedded": map[string]string{
"hello": "world",
},
"list": []map[string]interface{}{
{"foo": "bar"},
{"hello": "world"},
},
}
b, _ := json.Marshal(message)
w.Header().Set("Content-Type", "application/json")
@@ -157,8 +162,24 @@ func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *h
 	return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.Header().Set("Content-Type", "application/json")
 		message := map[string]interface{}{
-			"hello": []map[string]string{
-				{"foo": "bar"},
+			"hello": []map[string]interface{}{
+				{
+					"foo": "bar",
+					"list": []map[string]interface{}{
+						{"foo": "bar"},
+						{"hello": "world"},
+					},
+				},
+				{
+					"foo": "bar",
+					"list": []map[string]interface{}{
+						{"foo": "bar"},
+					},
+				},
+				{
+					"bar":  "foo",
+					"list": []map[string]interface{}{},
+				},
 				{"bar": "foo"},
 			},
 		}
@@ -604,3 +625,105 @@ func TestOAuth2(t *testing.T) {
}
})
}

func TestSplitResponseWithKey(t *testing.T) {
Member

Testing is not really my field of expertise, but since we have one test with a key and one without, what happens if a key is empty? And what if we only have one object in the array? I am unsure whether that needs coverage, but I wanted to mention it.

	m := map[string]interface{}{
		"http_method":     "GET",
		"split_events_by": "list",
		"interval":        0,
	}
	ts := createTestServer(HTTPTestServer)
	runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
		group, _ := errgroup.WithContext(context.Background())
		group.Go(input.run)

		events, ok := out.waitForEvents(2)
		if !ok {
			t.Fatalf("Expected 2 events, but got %d.", len(events))
		}
		input.Stop()

		if err := group.Wait(); err != nil {
			t.Fatal(err)
		}
	})
}
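
The edge cases raised in the review comment (an empty array, a single object) can be explored with a small standalone sketch. This is only an approximation under stated assumptions: `splitByKey` is a hypothetical helper built on plain maps that mirrors the fallback rules of the PR's `splitEvent`, and it does not exercise the real input or its test helpers.

```go
package main

import "fmt"

// splitByKey is a hypothetical, simplified model of the splitting rules:
// anything other than a non-empty array of objects leaves doc untouched.
func splitByKey(doc map[string]interface{}, key string) []map[string]interface{} {
	items, ok := doc[key].([]interface{})
	if key == "" || !ok || len(items) == 0 {
		return []map[string]interface{}{doc}
	}
	out := make([]map[string]interface{}, 0, len(items))
	for _, item := range items {
		obj, ok := item.(map[string]interface{})
		if !ok {
			return []map[string]interface{}{doc}
		}
		ev := make(map[string]interface{}, len(doc))
		for k, v := range doc {
			ev[k] = v
		}
		ev[key] = obj
		out = append(out, ev)
	}
	return out
}

func main() {
	cases := []struct {
		name string
		doc  map[string]interface{}
		want int // expected number of events
	}{
		{"empty array", map[string]interface{}{"list": []interface{}{}}, 1},
		{"single object", map[string]interface{}{
			"list": []interface{}{map[string]interface{}{"foo": "bar"}},
		}, 1},
		{"non-object element", map[string]interface{}{"list": []interface{}{"foo"}}, 1},
	}
	for _, c := range cases {
		got := len(splitByKey(c.doc, "list"))
		fmt.Printf("%s: got %d event(s), want %d\n", c.name, got, c.want)
	}
}
```

In this model all three cases yield exactly one event. Only the single-object case rewrites the `list` field, replacing the array with the object it contained.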

func TestSplitResponseWithoutKey(t *testing.T) {
	m := map[string]interface{}{
		"http_method":     "GET",
		"split_events_by": "not_found",
		"interval":        0,
	}
	ts := createTestServer(HTTPTestServer)
	runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
		group, _ := errgroup.WithContext(context.Background())
		group.Go(input.run)

		events, ok := out.waitForEvents(1)
		if !ok {
			t.Fatalf("Expected 1 event, but got %d.", len(events))
		}
		input.Stop()

		if err := group.Wait(); err != nil {
			t.Fatal(err)
		}
	})
}

func TestArrayWithSplitResponse(t *testing.T) {
	m := map[string]interface{}{
		"http_method":        "GET",
		"json_objects_array": "hello",
		"split_events_by":    "list",
		"interval":           0,
	}

	expectedFields := []string{
		`{
			"foo": "bar",
			"list": {
				"foo": "bar"
			}
		}`,
		`{
			"foo": "bar",
			"list": {
				"hello": "world"
			}
		}`,
		`{
			"foo": "bar",
			"list": {
				"foo": "bar"
			}
		}`,
		`{
			"bar": "foo",
			"list": []
		}`,
		`{"bar": "foo"}`,
	}

	ts := createTestServer(ArrayResponseServer)
	runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
		group, _ := errgroup.WithContext(context.Background())
		group.Go(input.run)

		events, ok := out.waitForEvents(5)
		if !ok {
			t.Fatalf("Expected 5 events, but got %d.", len(events))
		}
		input.Stop()

		if err := group.Wait(); err != nil {
			t.Fatal(err)
		}

		for i, e := range events {
			message, _ := e.GetValue("message")
			assert.JSONEq(t, expectedFields[i], message.(string))
		}
	})
}
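
The event count of 5 in the test above follows from applying the split to each element of the `hello` array. A minimal sketch of that composition, assuming hypothetical helpers `expand` and `splitByKey` and plain maps instead of the input's own types, could look like this:

```go
package main

import "fmt"

// M is a local shorthand for a decoded JSON object.
type M = map[string]interface{}

// splitByKey is a hypothetical model of split_events_by for top-level keys.
func splitByKey(doc M, key string) []M {
	items, ok := doc[key].([]interface{})
	if key == "" || !ok || len(items) == 0 {
		return []M{doc}
	}
	out := make([]M, 0, len(items))
	for _, item := range items {
		obj, ok := item.(M)
		if !ok {
			return []M{doc}
		}
		ev := make(M, len(doc))
		for k, v := range doc {
			ev[k] = v
		}
		ev[key] = obj
		out = append(out, ev)
	}
	return out
}

// expand is a hypothetical helper modelling json_objects_array followed by
// split_events_by: it walks the array under arrayKey and splits each object.
// Non-object elements are skipped here for brevity; the real input reports
// an error for them.
func expand(body M, arrayKey, splitKey string) []M {
	var out []M
	elems, _ := body[arrayKey].([]interface{})
	for _, el := range elems {
		if obj, ok := el.(M); ok {
			out = append(out, splitByKey(obj, splitKey)...)
		}
	}
	return out
}

func main() {
	// Same shape as the response served by the array test server.
	body := M{
		"hello": []interface{}{
			M{"foo": "bar", "list": []interface{}{M{"foo": "bar"}, M{"hello": "world"}}},
			M{"foo": "bar", "list": []interface{}{M{"foo": "bar"}}},
			M{"bar": "foo", "list": []interface{}{}},
			M{"bar": "foo"},
		},
	}
	fmt.Println(len(expand(body, "hello", "list"))) // prints 5
}
```

The first element contributes two events, and the remaining three contribute one each, because an empty or missing `list` leaves the element unchanged.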
57 changes: 47 additions & 10 deletions x-pack/filebeat/input/httpjson/input.go
@@ -181,24 +181,61 @@ func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo)

 // processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any.
 func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) {
-	var m map[string]interface{}
+	var last map[string]interface{}
 	for _, t := range events {
 		switch v := t.(type) {
 		case map[string]interface{}:
-			m = v
-			d, err := json.Marshal(v)
-			if err != nil {
-				return nil, errors.Wrapf(err, "failed to marshal %+v", v)
-			}
-			ok := in.outlet.OnEvent(makeEvent(string(d)))
-			if !ok {
-				return nil, errors.New("function OnEvent returned false")
+			for _, e := range in.splitEvent(v) {

Member

Leaving a comment about our previous discussion; maybe @andrewkroh can pitch in on this. Do we want splitEvent to run on every single event even when splitting is not configured, with the config checks handled inside the splitEvent method, or should the check happen here instead?

Contributor Author
@marc-gr Jun 17, 2020

Maybe renaming splitEvent to eventAsList or similar makes for a better name here? I wanted to avoid having two branches to handle the single event and the list inside the same case, but if it makes the code easier to understand, I am up for it.

Member

I think splitEvent makes more sense because it's the term that has been used over time in Logstash, so it's easier to think "yeah, split filters, just like splitEvent" :D

In terms of having two branches, the only reason I commented is that, as a matter of personal taste, I don't like to run a function that is made for a specific use case if the conditions are not met.
When reading the code now, it seems that splitEvent should always happen, until you scroll further down and see that it handles another if condition.

Let's wait and see if we can get another comment before making a decision.

Contributor

FYI, the s3 input has "expand_event_list_from_field", which does something very similar. It might be worth comparing the implementations.

+				last = e
+				d, err := json.Marshal(e)
+				if err != nil {
+					return nil, errors.Wrapf(err, "failed to marshal %+v", e)
+				}
+				ok := in.outlet.OnEvent(makeEvent(string(d)))
+				if !ok {
+					return nil, errors.New("function OnEvent returned false")
+				}
 			}
 		default:
 			return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v)
 		}
 	}
-	return m, nil
+	return last, nil
 }
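
The alternative raised in the review thread, checking the configuration at the call site instead of inside `splitEvent`, might look roughly like the sketch below. It is not what the PR does: the `config` and `input` types are pared-down hypothetical stand-ins, and `eventsFor` is an invented name for the call-site branch.

```go
package main

import "fmt"

// config and input are hypothetical stand-ins for the input's own types.
type config struct{ SplitEventsBy string }
type input struct{ config config }

// splitEvent assumes the caller has already checked that splitting is
// configured, so it only deals with the shape of the data.
func (in *input) splitEvent(ev map[string]interface{}) []map[string]interface{} {
	items, ok := ev[in.config.SplitEventsBy].([]interface{})
	if !ok || len(items) == 0 {
		return []map[string]interface{}{ev}
	}
	out := make([]map[string]interface{}, 0, len(items))
	for _, item := range items {
		obj, ok := item.(map[string]interface{})
		if !ok {
			return []map[string]interface{}{ev}
		}
		c := make(map[string]interface{}, len(ev))
		for k, v := range ev {
			c[k] = v
		}
		c[in.config.SplitEventsBy] = obj
		out = append(out, c)
	}
	return out
}

// eventsFor makes the configuration check explicit at the call site:
// splitEvent is only invoked when split_events_by is set.
func (in *input) eventsFor(ev map[string]interface{}) []map[string]interface{} {
	if in.config.SplitEventsBy == "" {
		return []map[string]interface{}{ev}
	}
	return in.splitEvent(ev)
}

func main() {
	ev := map[string]interface{}{
		"list": []interface{}{
			map[string]interface{}{"a": 1},
			map[string]interface{}{"b": 2},
		},
	}
	off := &input{}
	on := &input{config: config{SplitEventsBy: "list"}}
	fmt.Println(len(off.eventsFor(ev)), len(on.eventsFor(ev))) // prints 1 2
}
```

The trade-off is the one discussed in the thread: the call site gains a branch, while `splitEvent` no longer runs for inputs that never configured it.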

+func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]interface{} {
+	m := common.MapStr(event)
+
+	hasSplitKey, _ := m.HasKey(in.config.SplitEventsBy)
+	if in.config.SplitEventsBy == "" || !hasSplitKey {
+		return []map[string]interface{}{event}
+	}
+
+	splitOnIfc, _ := m.GetValue(in.config.SplitEventsBy)
+	splitOn, ok := splitOnIfc.([]interface{})
+	// if not an array or is empty, we do nothing
+	if !ok || len(splitOn) == 0 {
+		return []map[string]interface{}{event}
+	}
+
+	var events []map[string]interface{}
+	for _, split := range splitOn {
+		s, ok := split.(map[string]interface{})
+		// if not an object, we do nothing
+		if !ok {
+			return []map[string]interface{}{event}
+		}
+
+		mm := m.Clone()

@alakahakai Jun 23, 2020

It seems that Clone() is not needed here since the field always gets overridden below, so using m should work.

Member
@P1llus Jun 25, 2020

Isn't m storing the JSON object while mm is storing the array inside? mm is creating a shallow copy, to ensure that m is not modified while iterating over it here:

for _, split := range splitOn {

+		_, err := mm.Put(in.config.SplitEventsBy, s)
+		if err != nil {
+			return []map[string]interface{}{event}
+		}
+
+		events = append(events, mm)
+	}
+
+	return events
+}
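
The question about `Clone()` in the thread above comes down to map aliasing in Go: a map value is a reference, so appending the same map on every iteration yields a slice whose entries all point at one object. The sketch below illustrates this with plain maps and a manual top-level copy. It does not use libbeat's `common.MapStr`, and `withoutCopy` and `withCopy` are hypothetical names.

```go
package main

import "fmt"

// withoutCopy reuses doc on every iteration, so all returned entries are the
// same map and only the last assignment to "events" is visible.
func withoutCopy(doc map[string]interface{}, items []map[string]interface{}) []map[string]interface{} {
	var out []map[string]interface{}
	for _, it := range items {
		doc["events"] = it
		out = append(out, doc)
	}
	return out
}

// withCopy duplicates the top-level fields of doc for each item, so every
// returned entry keeps its own "events" value.
func withCopy(doc map[string]interface{}, items []map[string]interface{}) []map[string]interface{} {
	var out []map[string]interface{}
	for _, it := range items {
		c := make(map[string]interface{}, len(doc))
		for k, v := range doc {
			c[k] = v
		}
		c["events"] = it
		out = append(out, c)
	}
	return out
}

func main() {
	items := []map[string]interface{}{{"id": 1}, {"id": 2}}

	shared := withoutCopy(map[string]interface{}{"user": "Bob"}, items)
	fmt.Println(shared[0]["events"], shared[1]["events"]) // prints map[id:2] map[id:2]

	copied := withCopy(map[string]interface{}{"user": "Bob"}, items)
	fmt.Println(copied[0]["events"], copied[1]["events"]) // prints map[id:1] map[id:2]
}
```

Under this model, dropping the per-iteration copy would make every published event carry the last array element, which is why a fresh map per split is useful even though the split key is overwritten each time.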

// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response