Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'expand_keys' option to JSON input/processor #22849

Merged
merged 11 commits into from
Dec 14, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for ephemeral containers in kubernetes autodiscover and `add_kubernetes_metadata`. {pull}22389[22389] {pull}22439[22439]
- Added support for wildcard fields and keyword fallback in beats setup commands. {pull}22521[22521]
- Fix polling node when it is not ready and monitor by hostname {pull}22666[22666]
- Add `expand_keys` option to `decode_json_fields` processor and `json` input, to recusively de-dot and expand json keys into hierarchical object structures {pull}22849[22849]
- Improve equals check. {pull}22778[22778]

*Auditbeat*
Expand Down
5 changes: 5 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ filebeat.inputs:
# in case of conflicts.
#json.overwrite_keys: false

# If this setting is enabled, then keys in the decoded JSON object will be recursively
# de-dotted, and expanded into a hierarchical object structure.
# For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
#json.expand_keys: false

# If this setting is enabled, Filebeat adds a "error.message" and "error.key: json" key in case of JSON
# unmarshaling errors or when a text key is defined in the configuration but cannot
# be used.
Expand Down
7 changes: 6 additions & 1 deletion filebeat/docs/inputs/input-common-harvester-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ level in the output document. The default is false.
values from the decoded JSON object overwrite the fields that {beatname_uc}
normally adds (type, source, offset, etc.) in case of conflicts.

*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively
de-dot keys in the decoded JSON, and expand them into a hierarchical object
structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
This setting should be enabled when the input is produced by an
https://github.com/elastic/ecs-logging[ECS logger].

*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds a
"error.message" and "error.type: json" key in case of JSON unmarshalling errors
or when a `message_key` is defined in the configuration but cannot be used.
Expand All @@ -206,4 +212,3 @@ Options that control how {beatname_uc} deals with log messages that span
multiple lines. See <<multiline-examples>> for more information about
configuring multiline options.


5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ filebeat.inputs:
# in case of conflicts.
#json.overwrite_keys: false

# If this setting is enabled, then keys in the decoded JSON object will be recursively
# de-dotted, and expanded into a hierarchical object structure.
# For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
#json.expand_keys: false

# If this setting is enabled, Filebeat adds a "error.message" and "error.key: json" key in case of JSON
# unmarshaling errors or when a text key is defined in the configuration but cannot
# be used.
Expand Down
114 changes: 114 additions & 0 deletions libbeat/common/jsontransform/expand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
)

// ExpandFields de-dots the keys in m by expanding them in-place into a
// nested object structure, merging objects as necessary. If there are any
// conflicts (i.e. a common prefix where one field is an object and another
// is a non-object), an error will be returned.
//
// Note that ExpandFields is destructive, and in the case of an error the
// map may be left in a semi-expanded state.
func ExpandFields(m common.MapStr) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible / make sense to make this a method on common.MapStr instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It certainly is. I gathered from #20489 (comment) that @urso would prefer not to add to MapStr, but I can rearrange if preferred. Unless there's an expectation of reuse I typically avoid adding to common types/packages to avoid creating huge interfaces.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless there's an expectation of reuse I typically avoid adding to common types/packages to avoid creating huge interfaces.

Agreed. The MapStr interface is too big with redundant functionality at times. I'd rather have a small interface for Events in the future with a set of functions that operate on the public interface.

If ExpandFields is not used somewhere else I would not export it (keep package interface smaller).
For consistency, if it is supposed to be used in other places, move it (as function) to the libbeat/common/mapstr.go. The libbeat/common package is where MapStr and helpers for MapStr currently live in.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, it's not needed elsewhere (I originally thought it would be) - so I'll unexport it.

for k, v := range m {
newMap, newIsMap := getMap(v)
if newIsMap {
if err := ExpandFields(newMap); err != nil {
return errors.Wrapf(err, "error expanding %q", k)
}
}
if dot := strings.IndexRune(k, '.'); dot < 0 {
continue
}

// Delete the dotted key.
delete(m, k)

// Put expands k, returning the original value if any.
//
// If v is a map then we will merge with an existing map if any,
// otherwise there must not be an existing value.
old, err := m.Put(k, v)
if err != nil {
// Put will return an error if we attempt to insert into a non-object value.
return fmt.Errorf("cannot expand %q: found conflicting key", k)
}
if old == nil {
continue
}
if !newIsMap {
return fmt.Errorf("cannot expand %q: found existing (%T) value", k, old)
} else {
oldMap, oldIsMap := getMap(old)
if !oldIsMap {
return fmt.Errorf("cannot expand %q: found conflicting key", k)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to happen on type conflict only. I think we have similar cases in metricbeat. In that case we modify the key for old to be <k>.value. If the new object has a field named value we can drop old (because it is overwritten).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment is effectively the same as #22849 (comment) - or is this something else?

The intended behaviour is to recursively merge objects, returning an error if there are two matching keys which either both have scalar values, or with one having a scalar value and one having an object value. This is intentionally strict for the first implementation; we could later either relax by default, or add options to relax.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentionally strict for the first implementation; we could later either relax by default, or add options to relax.

I'm ok if we follow up with this one later on.

Yeah, the two comments belong together.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened #23135 to track this.

}
if err := mergeObjects(newMap, oldMap); err != nil {
axw marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrapf(err, "cannot expand %q", k)
}
}
}
return nil
}

// mergeObjects deep merges the elements of rhs into lhs.
//
// mergeObjects will recursively combine the entries of
// objects with the same key in each object. If there exist
// two entries with the same key in each object which
// are not both objects, then an error will result.
func mergeObjects(lhs, rhs common.MapStr) error {
for k, rhsValue := range rhs {
lhsValue, ok := lhs[k]
if !ok {
lhs[k] = rhsValue
continue
}
lhsMap, ok := getMap(lhsValue)
if !ok {
return fmt.Errorf("cannot merge %q: found (%T) value", k, lhsValue)
}
rhsMap, ok := getMap(rhsValue)
if !ok {
return fmt.Errorf("cannot merge %q: found (%T) value", k, rhsValue)
}
axw marked this conversation as resolved.
Show resolved Hide resolved
if err := mergeObjects(lhsMap, rhsMap); err != nil {
return errors.Wrapf(err, "cannot merge %q", k)
}
}
return nil
}

func getMap(v interface{}) (map[string]interface{}, bool) {
switch v := v.(type) {
case map[string]interface{}:
return v, true
case common.MapStr:
return v, true
}
return nil, false
}
133 changes: 133 additions & 0 deletions libbeat/common/jsontransform/expand_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

func TestExpand(t *testing.T) {
type data struct {
Event common.MapStr
Expected common.MapStr
Err string
}
tests := []data{
{
Event: common.MapStr{
"hello.world": 15,
},
Expected: common.MapStr{
"hello": common.MapStr{
"world": 15,
},
},
},
{
Event: common.MapStr{
"test": 15,
},
Expected: common.MapStr{
"test": 15,
},
},
{
Event: common.MapStr{
"test": 15,
"hello.there": 1,
"hello.world.ok": "test",
"elastic.for": "search",
},
Expected: common.MapStr{
"test": 15,
"hello": common.MapStr{
"there": 1,
"world": common.MapStr{
"ok": "test",
},
},
"elastic": common.MapStr{
"for": "search",
},
},
},
{
Event: common.MapStr{
"root": common.MapStr{
"ok": 1,
},
"root.shared": "yes",
"root.one.two.three": 4,
},
Expected: common.MapStr{
"root": common.MapStr{
"ok": 1,
"shared": "yes",
"one": common.MapStr{"two": common.MapStr{"three": 4}},
},
},
},
{
Event: common.MapStr{
"root": common.MapStr{
"seven": 1,
},
"root.seven.eight": 2,
},
Err: `cannot expand .*`,
},
{
Event: common.MapStr{
"a.b": 1,
"a": common.MapStr{
"b": 2,
},
},
Err: `cannot expand .*`,
},
{
Event: common.MapStr{
"a.b": common.MapStr{
"c": common.MapStr{
"d": 1,
},
},
"a.b.c": common.MapStr{
"d": 2,
},
},
Err: `cannot expand .*`,
},
}

for _, test := range tests {
err := ExpandFields(test.Event)
if test.Err != "" {
require.Error(t, err)
assert.Regexp(t, test.Err, err.Error())
continue
}
require.NoError(t, err)
assert.Equal(t, test.Expected, test.Event)
}
}
9 changes: 8 additions & 1 deletion libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ import (
)

// WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) {
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) {
logger := logp.NewLogger("jsonhelper")
if expandKeys {
if err := ExpandFields(keys); err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we error here keys is in an unknown state. Do we need to clone keys before this call in order to keep it intact on error? Logging the original document would be needed for users to understand why things did go wrong.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting the original document to show up under message, like when a JSON decoding error occurs. That doesn't happen though. Is there any reason why we shouldn't do that, for a consistent debugging experience, instead of logging?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to respond to the first part:

if we error here keys is in an unknown state. Do we need to clone keys before this call in order to keep it intact on error?

As long as we include the original input (in message), I don't see a need. I'm not intimately familiar with Filebeat though.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JSON decoder in the log input does not store the original raw line in the message field. One can configure a custom message field (which is extracted from the json document, not the original line), but by default message will not be set.

Anyways, the new fields are merged into the event after expansion via WriteJSONFields. Neither the processor nor the json decoder in the log input reference to any fields in keys (or keys itself). This should make the operation safe. No need to clone.

logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrKey)
return
}
}
if !overwriteKeys {
// @timestamp and @metadata fields are root-level fields. We remove them so they
// don't become part of event.Fields.
Expand Down
42 changes: 41 additions & 1 deletion libbeat/common/jsontransform/jsonhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestWriteJSONKeys(t *testing.T) {

tests := map[string]struct {
keys map[string]interface{}
expandKeys bool
overwriteKeys bool
expectedMetadata common.MapStr
expectedTimestamp time.Time
Expand Down Expand Up @@ -117,6 +118,45 @@ func TestWriteJSONKeys(t *testing.T) {
"top_c": "COMPLETELY_NEW_c",
},
},
"expand_true": {
expandKeys: true,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": common.MapStr{
"inner_e": "COMPLETELY_NEW_e",
},
},
},
},
"expand_false": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
},
}

for name, test := range tests {
Expand All @@ -127,7 +167,7 @@ func TestWriteJSONKeys(t *testing.T) {
Fields: eventFields.Clone(),
}

WriteJSONKeys(event, test.keys, test.overwriteKeys, false)
WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, false)
require.Equal(t, test.expectedMetadata, event.Meta)
require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano())
require.Equal(t, test.expectedFields, event.Fields)
Expand Down
Loading