Skip to content

Commit

Permalink
Add 'expand_keys' option to JSON input/processor (elastic#22849)
Browse files Browse the repository at this point in the history
Co-authored-by: Brandon Morelli <[email protected]>
(cherry picked from commit 4f4a553)
  • Loading branch information
axw authored and urso committed Dec 14, 2020
1 parent 9892811 commit f80771f
Show file tree
Hide file tree
Showing 15 changed files with 383 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for ephemeral containers in kubernetes autodiscover and `add_kubernetes_metadata`. {pull}22389[22389] {pull}22439[22439]
- Added support for wildcard fields and keyword fallback in beats setup commands. {pull}22521[22521]
- Fix polling node when it is not ready and monitor by hostname {pull}22666[22666]
- Add `expand_keys` option to `decode_json_fields` processor and `json` input, to recursively de-dot and expand JSON keys into hierarchical object structures. {pull}22849[22849]
- Update k8s client and release k8s leader lock gracefully {pull}22919[22919]
- Add tini as init system in docker images {pull}22137[22137]
- Added "detect_mime_type" processor for detecting mime types {pull}22940[22940]
Expand Down
5 changes: 5 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ filebeat.inputs:
# in case of conflicts.
#json.overwrite_keys: false

# If this setting is enabled, then keys in the decoded JSON object will be recursively
# de-dotted, and expanded into a hierarchical object structure.
# For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
#json.expand_keys: false

# If this setting is enabled, Filebeat adds an "error.message" and "error.type: json" key in case of JSON
# unmarshaling errors or when a text key is defined in the configuration but cannot
# be used.
Expand Down
7 changes: 6 additions & 1 deletion filebeat/docs/inputs/input-common-harvester-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ level in the output document. The default is false.
values from the decoded JSON object overwrite the fields that {beatname_uc}
normally adds (type, source, offset, etc.) in case of conflicts.

*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively
de-dot keys in the decoded JSON, and expand them into a hierarchical object
structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
This setting should be enabled when the input is produced by an
https://github.com/elastic/ecs-logging[ECS logger].

*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds a
"error.message" and "error.type: json" key in case of JSON unmarshalling errors
or when a `message_key` is defined in the configuration but cannot be used.
Expand All @@ -206,4 +212,3 @@ Options that control how {beatname_uc} deals with log messages that span
multiple lines. See <<multiline-examples>> for more information about
configuring multiline options.


5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ filebeat.inputs:
# in case of conflicts.
#json.overwrite_keys: false

# If this setting is enabled, then keys in the decoded JSON object will be recursively
# de-dotted, and expanded into a hierarchical object structure.
# For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
#json.expand_keys: false

# If this setting is enabled, Filebeat adds an "error.message" and "error.type: json" key in case of JSON
# unmarshaling errors or when a text key is defined in the configuration but cannot
# be used.
Expand Down
114 changes: 114 additions & 0 deletions libbeat/common/jsontransform/expand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
)

// expandFields rewrites m in place so that every dotted key, at any depth,
// is replaced by the equivalent chain of nested objects. For example
// {"a.b.c": 1} becomes {"a": {"b": {"c": 1}}}. When the expanded path lands
// on an object that already exists, the two objects are deep-merged.
//
// An error is returned when two keys disagree about a shared prefix, i.e.
// one of them needs an object at a path where the other holds a non-object,
// or when both supply a non-object value for the same path.
//
// The function mutates m as it goes; if it returns an error, m may have been
// only partially expanded.
func expandFields(m common.MapStr) error {
	for key, value := range m {
		// Expand nested objects first, so that whatever is re-inserted
		// below has already been de-dotted.
		valueMap, valueIsMap := getMap(value)
		if valueIsMap {
			if err := expandFields(valueMap); err != nil {
				return errors.Wrapf(err, "error expanding %q", key)
			}
		}
		if !strings.Contains(key, ".") {
			// Nothing to expand for this key.
			continue
		}

		// Remove the flat, dotted entry and re-insert the value through
		// Put, which interprets the dots as path separators and hands
		// back whatever value was previously stored at that path.
		delete(m, key)
		previous, err := m.Put(key, value)
		switch {
		case err != nil:
			// Put fails when an intermediate path element is not an object.
			return fmt.Errorf("cannot expand %q: found conflicting key", key)
		case previous == nil:
			// The path was free, so there is nothing to reconcile.
			continue
		case !valueIsMap:
			// A non-object value may not replace an existing value.
			return fmt.Errorf("cannot expand %q: found existing (%T) value", key, previous)
		}

		// Both sides must be objects: fold the previous contents into the
		// object that Put has just stored at the expanded path.
		previousMap, previousIsMap := getMap(previous)
		if !previousIsMap {
			return fmt.Errorf("cannot expand %q: found conflicting key", key)
		}
		if err := mergeObjects(valueMap, previousMap); err != nil {
			return errors.Wrapf(err, "cannot expand %q", key)
		}
	}
	return nil
}

// mergeObjects copies every entry of rhs into lhs, descending into nested
// objects that exist under the same key on both sides.
//
// Keys present only in rhs are added to lhs unchanged. When a key exists on
// both sides, both values must be objects and are merged recursively;
// otherwise an error naming the key and the offending value's type is
// returned. lhs is modified in place, so entries merged before an error
// occurred remain in lhs.
func mergeObjects(lhs, rhs common.MapStr) error {
	for key, incoming := range rhs {
		existing, present := lhs[key]
		if !present {
			// No counterpart in lhs: adopt the rhs value as is.
			lhs[key] = incoming
			continue
		}

		// The key exists on both sides, so each value has to be an
		// object. The lhs value is checked first.
		existingMap, existingIsMap := getMap(existing)
		if !existingIsMap {
			return fmt.Errorf("cannot merge %q: found (%T) value", key, existing)
		}
		incomingMap, incomingIsMap := getMap(incoming)
		if !incomingIsMap {
			return fmt.Errorf("cannot merge %q: found (%T) value", key, incoming)
		}

		if err := mergeObjects(existingMap, incomingMap); err != nil {
			return errors.Wrapf(err, "cannot merge %q", key)
		}
	}
	return nil
}

// getMap returns v as a plain map[string]interface{} when v is either a
// map[string]interface{} or a common.MapStr, together with true. For any
// other dynamic type it returns nil and false.
func getMap(v interface{}) (map[string]interface{}, bool) {
	if plain, ok := v.(map[string]interface{}); ok {
		return plain, true
	}
	if mapStr, ok := v.(common.MapStr); ok {
		return mapStr, true
	}
	return nil, false
}
133 changes: 133 additions & 0 deletions libbeat/common/jsontransform/expand_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

// TestExpand runs expandFields over a table of events. For each case it
// checks either the fully expanded result or, when wantErr is set, that an
// error matching that regular expression is returned.
func TestExpand(t *testing.T) {
	type testCase struct {
		input   common.MapStr // event handed to expandFields (mutated in place)
		want    common.MapStr // expected content of input after expansion
		wantErr string        // regexp the error must match; empty means success
	}

	cases := []testCase{
		// A single dotted key becomes a nested object.
		{
			input: common.MapStr{
				"hello.world": 15,
			},
			want: common.MapStr{
				"hello": common.MapStr{
					"world": 15,
				},
			},
		},
		// A key without dots is left untouched.
		{
			input: common.MapStr{
				"test": 15,
			},
			want: common.MapStr{
				"test": 15,
			},
		},
		// Dotted keys sharing a prefix end up in the same object.
		{
			input: common.MapStr{
				"test":           15,
				"hello.there":    1,
				"hello.world.ok": "test",
				"elastic.for":    "search",
			},
			want: common.MapStr{
				"test": 15,
				"hello": common.MapStr{
					"there": 1,
					"world": common.MapStr{
						"ok": "test",
					},
				},
				"elastic": common.MapStr{
					"for": "search",
				},
			},
		},
		// Dotted keys are merged into an object that already exists.
		{
			input: common.MapStr{
				"root": common.MapStr{
					"ok": 1,
				},
				"root.shared":        "yes",
				"root.one.two.three": 4,
			},
			want: common.MapStr{
				"root": common.MapStr{
					"ok":     1,
					"shared": "yes",
					"one":    common.MapStr{"two": common.MapStr{"three": 4}},
				},
			},
		},
		// "root.seven" is a number, so "root.seven.eight" cannot nest under it.
		{
			input: common.MapStr{
				"root": common.MapStr{
					"seven": 1,
				},
				"root.seven.eight": 2,
			},
			wantErr: `cannot expand .*`,
		},
		// The dotted and the nested form both assign a value to a.b.
		{
			input: common.MapStr{
				"a.b": 1,
				"a": common.MapStr{
					"b": 2,
				},
			},
			wantErr: `cannot expand .*`,
		},
		// Both keys resolve to objects, but they disagree on a.b.c.d.
		{
			input: common.MapStr{
				"a.b": common.MapStr{
					"c": common.MapStr{
						"d": 1,
					},
				},
				"a.b.c": common.MapStr{
					"d": 2,
				},
			},
			wantErr: `cannot expand .*`,
		},
	}

	for _, tc := range cases {
		err := expandFields(tc.input)
		if tc.wantErr == "" {
			require.NoError(t, err)
			assert.Equal(t, tc.want, tc.input)
			continue
		}
		require.Error(t, err)
		assert.Regexp(t, tc.wantErr, err.Error())
	}
}
9 changes: 8 additions & 1 deletion libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ import (
)

// WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) {
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) {
logger := logp.NewLogger("jsonhelper")
if expandKeys {
if err := expandFields(keys); err != nil {
logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrKey)
return
}
}
if !overwriteKeys {
// @timestamp and @metadata fields are root-level fields. We remove them so they
// don't become part of event.Fields.
Expand Down
42 changes: 41 additions & 1 deletion libbeat/common/jsontransform/jsonhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestWriteJSONKeys(t *testing.T) {

tests := map[string]struct {
keys map[string]interface{}
expandKeys bool
overwriteKeys bool
expectedMetadata common.MapStr
expectedTimestamp time.Time
Expand Down Expand Up @@ -117,6 +118,45 @@ func TestWriteJSONKeys(t *testing.T) {
"top_c": "COMPLETELY_NEW_c",
},
},
"expand_true": {
expandKeys: true,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": common.MapStr{
"inner_e": "COMPLETELY_NEW_e",
},
},
},
},
"expand_false": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
},
}

for name, test := range tests {
Expand All @@ -127,7 +167,7 @@ func TestWriteJSONKeys(t *testing.T) {
Fields: eventFields.Clone(),
}

WriteJSONKeys(event, test.keys, test.overwriteKeys, false)
WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, false)
require.Equal(t, test.expectedMetadata, event.Meta)
require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano())
require.Equal(t, test.expectedFields, event.Fields)
Expand Down
Loading

0 comments on commit f80771f

Please sign in to comment.