From b36610a2824aef232fe12de933d31699095eb52d Mon Sep 17 00:00:00 2001 From: Silvia Mitter Date: Tue, 5 Mar 2019 20:57:16 +0100 Subject: [PATCH] Allow ConcatFields for nodes of type object (#10959) (#11094) Checks if given field conflicts with current fields. Fixes an error where `append_fields` could not add valid field. --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/common/field.go | 63 ++++++++ libbeat/common/field_test.go | 240 ++++++++++++++++++++++++++++++ libbeat/template/template.go | 18 +-- libbeat/template/template_test.go | 2 +- 5 files changed, 306 insertions(+), 18 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index dadbbd7108ce..a471818f0d77 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -21,5 +21,6 @@ The list below covers the major changes between 6.6.0 and 6.6 head only. ==== Breaking changes ==== Bugfixes +- Fix duplication check for `append_fields` option. {pull}10959[10959] ==== Added diff --git a/libbeat/common/field.go b/libbeat/common/field.go index d69a75e90bf8..687d431715e5 100644 --- a/libbeat/common/field.go +++ b/libbeat/common/field.go @@ -21,6 +21,9 @@ import ( "fmt" "strings" + "github.com/joeshaw/multierror" + "github.com/pkg/errors" + "github.com/elastic/go-ucfg/yaml" ) @@ -216,3 +219,63 @@ func (f Fields) getKeys(namespace string) []string { return keys } + +// ConcatFields concatenates two Fields lists into a new list. +// The operation fails if the input definitions define the same keys. +func ConcatFields(a, b Fields) (Fields, error) { + if len(b) == 0 { + return a, nil + } + if len(a) == 0 { + return b, nil + } + + // check for duplicates + if err := a.conflicts(b); err != nil { + return nil, err + } + + // concat a+b into new array + fields := make(Fields, 0, len(a)+len(b)) + return append(append(fields, a...), b...), nil +} + +func (f Fields) conflicts(fields Fields) error { + var errs multierror.Errors + for _, key := range fields.GetKeys() { + keys := strings.Split(key, ".") + if err := f.canConcat(key, keys); err != nil { + errs = append(errs, err) + } + } + return errs.Err() +} + +// canConcat checks if the given string can be concatenated to the existing fields f +// a key cannot be concatenated if +// - f has a node with name key +// - f has a leaf with key's parent name and the leaf's type is not `object` +func (f Fields) canConcat(k string, keys []string) error { + if len(keys) == 0 { + return nil + } + key := keys[0] + keys = keys[1:] + for _, field := range f { + if field.Name != key { + continue + } + // last key to compare + if len(keys) == 0 { + return errors.Errorf("fields contain key <%s>", k) + } + // last field to compare, only valid if it is of type object + if len(field.Fields) == 0 { + if field.Type != "object" { + return errors.Errorf("fields contain non object node conflicting with key <%s>", k) + } + } + return field.Fields.canConcat(k, keys) + } + return nil +} diff --git a/libbeat/common/field_test.go b/libbeat/common/field_test.go index ea1fe2729c00..3cc61b610954 100644 --- a/libbeat/common/field_test.go +++ b/libbeat/common/field_test.go @@ -18,6 +18,7 @@ package common import ( + "strings" "testing" "github.com/stretchr/testify/assert" @@ -25,6 +26,58 @@ import ( "github.com/elastic/go-ucfg/yaml" ) +func TestFieldsHasNode(t *testing.T) { + tests := map[string]struct { + node string + fields Fields + hasNode bool + }{ + "empty fields": { + node: "a.b", + fields: Fields{}, + hasNode: false, + }, + "no node": { + node: "", + fields: Fields{Field{Name: "a"}}, + hasNode: false, + }, + "key not in fields, but node in fields": { + node: "a.b.c", + fields: Fields{ + Field{Name: "a", Fields: Fields{Field{Name: "b"}}}, + }, + hasNode: true, + }, + "last node in fields": { + node: "a.b.c", + fields: Fields{ + Field{Name: "a", Fields: Fields{ + Field{Name: "b", Fields: Fields{ + Field{Name: "c"}, + }}}}, + }, + hasNode: true, + }, + "node in fields": { + node: "a.b", + fields: Fields{ + Field{Name: "a", Fields: Fields{ + Field{Name: "b", Fields: Fields{ + Field{Name: "c"}, + }}}}, + }, + hasNode: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.Equal(t, test.hasNode, test.fields.HasNode(test.node)) + }) + } +} + func TestFieldsHasKey(t *testing.T) { tests := []struct { key string @@ -189,3 +242,190 @@ func TestGetKeys(t *testing.T) { assert.Equal(t, test.keys, test.fields.GetKeys()) } } + +func TestFieldConcat(t *testing.T) { + tests := map[string]struct { + a, b Fields + want Fields + err string + }{ + "empty lists": {}, + "first list only": { + a: Fields{{Name: "a"}}, + want: Fields{{Name: "a"}}, + }, + "second list only": { + b: Fields{{Name: "a"}}, + want: Fields{{Name: "a"}}, + }, + "concat": { + a: Fields{{Name: "a"}}, + b: Fields{{Name: "b"}}, + want: Fields{{Name: "a"}, {Name: "b"}}, + }, + "duplicates fail": { + a: Fields{{Name: "a"}}, + b: Fields{{Name: "a"}}, + err: "1 error: fields contain key ", + }, + "nested with common prefix": { + a: Fields{{ + Name: "a", + Fields: Fields{{Name: "b"}}, + }}, + b: Fields{{ + Name: "a", + Fields: Fields{{Name: "c"}}, + }}, + want: Fields{ + {Name: "a", Fields: Fields{{Name: "b"}}}, + {Name: "a", Fields: Fields{{Name: "c"}}}, + }, + }, + "deep nested with common prefix": { + a: Fields{{ + Name: "a", + Fields: Fields{{Name: "b"}}, + }}, + b: Fields{{ + Name: "a", + Fields: Fields{{Name: "c", Fields: Fields{ + {Name: "d"}, + }}}, + }}, + want: Fields{ + {Name: "a", Fields: Fields{{Name: "b"}}}, + {Name: "a", Fields: Fields{{Name: "c", Fields: Fields{{Name: "d"}}}}}, + }, + }, + "nested duplicates fail": { + a: Fields{{ + Name: "a", + Fields: Fields{{Name: "b"}, {Name: "c"}}, + }}, + b: Fields{{ + Name: "a", + Fields: Fields{{Name: "c"}}, + }}, + err: "1 error: fields contain key ", + }, + "a is prefix of b": { + a: Fields{{Name: "a"}}, + b: Fields{{ + Name: "a", + Fields: Fields{{Name: "b"}}, + }}, + err: "1 error: fields contain non object node conflicting with key ", + }, + "a is object and prefix of b": { + a: Fields{{Name: "a", Type: "object"}}, + b: Fields{{ + Name: "a", + Fields: Fields{{Name: "b"}}, + }}, + want: Fields{ + {Name: "a", Type: "object"}, + {Name: "a", Fields: Fields{{Name: "b"}}}, + }, + }, + "b is prefix of a": { + a: Fields{{ + Name: "a", + Fields: Fields{{Name: "b"}}, + }}, + b: Fields{{Name: "a"}}, + err: "1 error: fields contain key ", + }, + "multiple errors": { + a: Fields{ + {Name: "a", Fields: Fields{{Name: "b"}}}, + {Name: "foo", Fields: Fields{{Name: "b"}}}, + {Name: "bar", Type: "object"}, + }, + b: Fields{ + {Name: "bar", Fields: Fields{{Name: "foo"}}}, + {Name: "a"}, + {Name: "foo", Fields: Fields{{Name: "b", Fields: Fields{{Name: "c"}}}}}, + }, + + err: "2 errors: fields contain key ; fields contain non object node conflicting with key ", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + fs, err := ConcatFields(test.a, test.b) + if test.err == "" { + assert.NoError(t, err) + assert.Equal(t, test.want, fs) + return + } + if assert.Error(t, err) { + assert.Equal(t, test.err, err.Error()) + } + }) + } +} + +func TestFieldsCanConcat(t *testing.T) { + tests := map[string]struct { + key string + fields Fields + err string + }{ + "empty fields": { + key: "a.b", + fields: Fields{}, + }, + "no key": { + key: "", + fields: Fields{Field{Name: "a"}}, + }, + "key not in fields, but parent node in fields": { + key: "a.b.c", + fields: Fields{ + Field{Name: "a", Fields: Fields{Field{Name: "b"}}}, + }, + err: "fields contain non object node conflicting with key ", + }, + "key not in fields, but parent node in fields and of type object": { + key: "a.b.c", + fields: Fields{ + Field{Name: "a", Fields: Fields{Field{Name: "b", Type: "object"}}}, + }, + }, + "last node in fields": { + key: "a.b.c", + fields: Fields{ + Field{Name: "a", Fields: Fields{ + Field{Name: "b", Fields: Fields{ + Field{Name: "c"}, + }}}}, + }, + err: "fields contain key ", + }, + "node in fields": { + key: "a.b", + fields: Fields{ + Field{Name: "a", Fields: Fields{ + Field{Name: "b", Fields: Fields{ + Field{Name: "c"}, + }}}}, + }, + err: "fields contain key ", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + err := test.fields.canConcat(test.key, strings.Split(test.key, ".")) + if test.err == "" { + assert.Nil(t, err) + return + } + if assert.Error(t, err) { + assert.Equal(t, test.err, err.Error()) + } + }) + } +} diff --git a/libbeat/template/template.go b/libbeat/template/template.go index 1e7070f13aa6..98fff7cfb795 100644 --- a/libbeat/template/template.go +++ b/libbeat/template/template.go @@ -121,7 +121,7 @@ func (t *Template) load(fields common.Fields) (common.MapStr, error) { var err error if len(t.config.AppendFields) > 0 { cfgwarn.Experimental("append_fields is used.") - fields, err = appendFields(fields, t.config.AppendFields) + fields, err = common.ConcatFields(fields, t.config.AppendFields) if err != nil { return nil, err } @@ -256,22 +256,6 @@ func (t *Template) Generate(properties common.MapStr, dynamicTemplates []common. return basicStructure } -func appendFields(fields, appendFields common.Fields) (common.Fields, error) { - if len(appendFields) > 0 { - appendFieldKeys := appendFields.GetKeys() - - // Append is only allowed to add fields, not overwrite - for _, key := range appendFieldKeys { - if fields.HasNode(key) { - return nil, fmt.Errorf("append_fields contains an already existing key: %s", key) - } - } - // Appends fields to existing fields - fields = append(fields, appendFields...) - } - return fields, nil -} - func loadYamlByte(data []byte) (common.Fields, error) { var keys []common.Field diff --git a/libbeat/template/template_test.go b/libbeat/template/template_test.go index 701f692737f6..15834211dc3f 100644 --- a/libbeat/template/template_test.go +++ b/libbeat/template/template_test.go @@ -171,7 +171,7 @@ func TestAppendFields(t *testing.T) { } for _, test := range tests { - _, err := appendFields(test.fields, test.appendFields) + _, err := common.ConcatFields(test.fields, test.appendFields) if test.error { assert.Error(t, err) } else {