-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Filebeat][httpjson] Add split_events_by config setting #19246
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -181,24 +181,61 @@ func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo) | |
|
||
// processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any. | ||
func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) { | ||
var m map[string]interface{} | ||
var last map[string]interface{} | ||
for _, t := range events { | ||
switch v := t.(type) { | ||
case map[string]interface{}: | ||
m = v | ||
d, err := json.Marshal(v) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "failed to marshal %+v", v) | ||
} | ||
ok := in.outlet.OnEvent(makeEvent(string(d))) | ||
if !ok { | ||
return nil, errors.New("function OnEvent returned false") | ||
for _, e := range in.splitEvent(v) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Leaving a comment about our previous discussion, maybe @andrewkroh can pitch in on this, if we want splitEvent to happen on every single event even when splitEvent is not configured, and handle the config checks in the splitEvent method, or do it here instead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe renaming from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think splitEvent makes more sense because it's the term that has been used over time regarding logstash, then it's easier to think "yeah split filters, just like splitEvent" :D In terms of having to branches, the only reason I comment is that based on my personal taste, I don't like to run functions that is made for a specific usecase if the conditions are not met. Let's wait and see if we can get another comment before making a decision. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI the s3 input has "expand_event_list_from_field" which does something very similar, might be worth comparing implementation. |
||
last = e | ||
d, err := json.Marshal(e) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "failed to marshal %+v", e) | ||
} | ||
ok := in.outlet.OnEvent(makeEvent(string(d))) | ||
if !ok { | ||
return nil, errors.New("function OnEvent returned false") | ||
} | ||
} | ||
default: | ||
return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v) | ||
} | ||
} | ||
return m, nil | ||
return last, nil | ||
} | ||
|
||
func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]interface{} { | ||
m := common.MapStr(event) | ||
|
||
hasSplitKey, _ := m.HasKey(in.config.SplitEventsBy) | ||
if in.config.SplitEventsBy == "" || !hasSplitKey { | ||
return []map[string]interface{}{event} | ||
} | ||
|
||
splitOnIfc, _ := m.GetValue(in.config.SplitEventsBy) | ||
splitOn, ok := splitOnIfc.([]interface{}) | ||
// if not an array or is empty, we do nothing | ||
if !ok || len(splitOn) == 0 { | ||
return []map[string]interface{}{event} | ||
} | ||
|
||
var events []map[string]interface{} | ||
for _, split := range splitOn { | ||
s, ok := split.(map[string]interface{}) | ||
// if not an object, we do nothing | ||
if !ok { | ||
return []map[string]interface{}{event} | ||
} | ||
|
||
mm := m.Clone() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that Clone() is not needed here since the field always gets overridden below. so using m should work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't m storing the JSON object while mm is storing the array inside? mm is creating a shallow copy, to ensure that m is not modified while iterating over it here:
|
||
_, err := mm.Put(in.config.SplitEventsBy, s) | ||
if err != nil { | ||
return []map[string]interface{}{event} | ||
} | ||
|
||
events = append(events, mm) | ||
} | ||
|
||
return events | ||
} | ||
|
||
// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing is not really my field of expertise, but since we have one with key, one without, what happened if a key is empty? And if we only have one object in the array? I am unsure if that needs coverage or not, but just wanted to mention it.