-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add 'expand_keys' option to JSON input/processor #22849
Changes from 9 commits
45dfeaf
28a769a
cf6a340
63693ed
50289ed
b254164
fe630f7
909987b
ce8b7bf
236ee7f
9916f26
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package jsontransform | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
) | ||
|
||
// ExpandFields de-dots the keys in m by expanding them in-place into a | ||
// nested object structure, merging objects as necessary. If there are any | ||
// conflicts (i.e. a common prefix where one field is an object and another | ||
// is a non-object), an error will be returned. | ||
// | ||
// Note that ExpandFields is destructive, and in the case of an error the | ||
// map may be left in a semi-expanded state. | ||
func ExpandFields(m common.MapStr) error { | ||
for k, v := range m { | ||
newMap, newIsMap := getMap(v) | ||
if newIsMap { | ||
if err := ExpandFields(newMap); err != nil { | ||
return errors.Wrapf(err, "error expanding %q", k) | ||
} | ||
} | ||
if dot := strings.IndexRune(k, '.'); dot < 0 { | ||
continue | ||
} | ||
|
||
// Delete the dotted key. | ||
delete(m, k) | ||
|
||
// Put expands k, returning the original value if any. | ||
// | ||
// If v is a map then we will merge with an existing map if any, | ||
// otherwise there must not be an existing value. | ||
old, err := m.Put(k, v) | ||
if err != nil { | ||
// Put will return an error if we attempt to insert into a non-object value. | ||
return fmt.Errorf("cannot expand %q: found conflicting key", k) | ||
} | ||
if old == nil { | ||
continue | ||
} | ||
if !newIsMap { | ||
return fmt.Errorf("cannot expand %q: found existing (%T) value", k, old) | ||
} else { | ||
oldMap, oldIsMap := getMap(old) | ||
if !oldIsMap { | ||
return fmt.Errorf("cannot expand %q: found conflicting key", k) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to happen on type conflict only. I think we have similar cases in metricbeat. In that case we modify the key for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this comment is effectively the same as #22849 (comment) - or is this something else? The intended behaviour is to recursively merge objects, returning an error if there are two matching keys which either both have scalar values, or with one having a scalar value and one having an object value. This is intentionally strict for the first implementation; we could later either relax by default, or add options to relax. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'm ok if we follow up with this one later on. Yeah, the two comments belong together. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've opened #23135 to track this. |
||
} | ||
if err := mergeObjects(newMap, oldMap); err != nil { | ||
axw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return errors.Wrapf(err, "cannot expand %q", k) | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// mergeObjects deep merges the elements of rhs into lhs. | ||
// | ||
// mergeObjects will recursively combine the entries of | ||
// objects with the same key in each object. If there exist | ||
// two entries with the same key in each object which | ||
// are not both objects, then an error will result. | ||
func mergeObjects(lhs, rhs common.MapStr) error { | ||
for k, rhsValue := range rhs { | ||
lhsValue, ok := lhs[k] | ||
if !ok { | ||
lhs[k] = rhsValue | ||
continue | ||
} | ||
lhsMap, ok := getMap(lhsValue) | ||
if !ok { | ||
return fmt.Errorf("cannot merge %q: found (%T) value", k, lhsValue) | ||
} | ||
rhsMap, ok := getMap(rhsValue) | ||
if !ok { | ||
return fmt.Errorf("cannot merge %q: found (%T) value", k, rhsValue) | ||
} | ||
axw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err := mergeObjects(lhsMap, rhsMap); err != nil { | ||
return errors.Wrapf(err, "cannot merge %q", k) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func getMap(v interface{}) (map[string]interface{}, bool) { | ||
switch v := v.(type) { | ||
case map[string]interface{}: | ||
return v, true | ||
case common.MapStr: | ||
return v, true | ||
} | ||
return nil, false | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package jsontransform | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
) | ||
|
||
func TestExpand(t *testing.T) { | ||
type data struct { | ||
Event common.MapStr | ||
Expected common.MapStr | ||
Err string | ||
} | ||
tests := []data{ | ||
{ | ||
Event: common.MapStr{ | ||
"hello.world": 15, | ||
}, | ||
Expected: common.MapStr{ | ||
"hello": common.MapStr{ | ||
"world": 15, | ||
}, | ||
}, | ||
}, | ||
{ | ||
Event: common.MapStr{ | ||
"test": 15, | ||
}, | ||
Expected: common.MapStr{ | ||
"test": 15, | ||
}, | ||
}, | ||
{ | ||
Event: common.MapStr{ | ||
"test": 15, | ||
"hello.there": 1, | ||
"hello.world.ok": "test", | ||
"elastic.for": "search", | ||
}, | ||
Expected: common.MapStr{ | ||
"test": 15, | ||
"hello": common.MapStr{ | ||
"there": 1, | ||
"world": common.MapStr{ | ||
"ok": "test", | ||
}, | ||
}, | ||
"elastic": common.MapStr{ | ||
"for": "search", | ||
}, | ||
}, | ||
}, | ||
{ | ||
Event: common.MapStr{ | ||
"root": common.MapStr{ | ||
"ok": 1, | ||
}, | ||
"root.shared": "yes", | ||
"root.one.two.three": 4, | ||
}, | ||
Expected: common.MapStr{ | ||
"root": common.MapStr{ | ||
"ok": 1, | ||
"shared": "yes", | ||
"one": common.MapStr{"two": common.MapStr{"three": 4}}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
Event: common.MapStr{ | ||
"root": common.MapStr{ | ||
"seven": 1, | ||
}, | ||
"root.seven.eight": 2, | ||
}, | ||
Err: `cannot expand .*`, | ||
}, | ||
{ | ||
Event: common.MapStr{ | ||
"a.b": 1, | ||
"a": common.MapStr{ | ||
"b": 2, | ||
}, | ||
}, | ||
Err: `cannot expand .*`, | ||
}, | ||
{ | ||
Event: common.MapStr{ | ||
"a.b": common.MapStr{ | ||
"c": common.MapStr{ | ||
"d": 1, | ||
}, | ||
}, | ||
"a.b.c": common.MapStr{ | ||
"d": 2, | ||
}, | ||
}, | ||
Err: `cannot expand .*`, | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
err := ExpandFields(test.Event) | ||
if test.Err != "" { | ||
require.Error(t, err) | ||
assert.Regexp(t, test.Err, err.Error()) | ||
continue | ||
} | ||
require.NoError(t, err) | ||
assert.Equal(t, test.Expected, test.Event) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,8 +27,15 @@ import ( | |
) | ||
|
||
// WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey | ||
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) { | ||
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) { | ||
logger := logp.NewLogger("jsonhelper") | ||
if expandKeys { | ||
if err := ExpandFields(keys); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we error here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was expecting the original document to show up under There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I forgot to respond to the first part:
As long as we include the original input (in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The JSON decoder in the log input does not store the original raw line in the Anyways, the new fields are merged into the event after expansion via |
||
logger.Errorf("JSON: failed to expand fields: %s", err) | ||
event.SetErrorWithOption(createJSONError(err.Error()), addErrKey) | ||
return | ||
} | ||
} | ||
if !overwriteKeys { | ||
// @timestamp and @metadata fields are root-level fields. We remove them so they | ||
// don't become part of event.Fields. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible / make sense to make this a method on
common.MapStr
instead?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It certainly is. I gathered from #20489 (comment) that @urso would prefer not to add to MapStr, but I can rearrange if preferred. Unless there's an expectation of reuse I typically avoid adding to common types/packages to avoid creating huge interfaces.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. The
MapStr
interface is too big with redundant functionality at times. I'd rather have a small interface for Events in the future with a set of functions that operate on the public interface.If
ExpandFields
is not used somewhere else I would not export it (keep package interface smaller).For consistency, if it is supposed to be used in other places, move it (as function) to the libbeat/common/mapstr.go. The
libbeat/common
package is where MapStr and helpers for MapStr currently live in.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, it's not needed elsewhere (I originally thought it would be) - so I'll unexport it.