Skip to content

Commit

Permalink
Relax validation of subobject leaves for flattened fields
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano committed Sep 9, 2024
1 parent 25dfab5 commit d115a79
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 12 deletions.
67 changes: 55 additions & 12 deletions internal/fields/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,11 @@ func (v *Validator) validateMapElement(root string, elem common.MapStr, doc comm
errs = append(errs, err...)
}
default:
if skipLeafOfObject(root, name, v.specVersion, v.Schema) {
// Till some versions we skip some validations on leaf of objects, check if it is the case.
break
}

err := v.validateScalarElement(key, val, doc)
if err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -820,6 +825,36 @@ func skipValidationForField(key string) bool {
isFieldFamilyMatching("event.module", key) // field is deprecated
}

// skipLeafOfObject checks if the element is a child of an object that was skipped in some previous
// version of the spec. This is relevant in documents that store fields without subobjects.
func skipLeafOfObject(root, name string, specVersion semver.Version, schema []FieldDefinition) bool {
if specVersion.LessThan(semver3_0_1) {
// Check if this is a subobject of an object we didn't traverse.
if !strings.Contains(name, ".") {
return false
}
key := name
if root != "" {
key = root + "." + name
}
_, ancestor := findAncestorElementDefinition(key, schema, func(key string, def *FieldDefinition) bool {
// Don't look for ancestors beyond root, these objects have been already traversed.
if len(key) < len(root) {
return false
}
if !slices.Contains([]string{"group", "object", "nested", "flattened"}, def.Type) {
return false
}
return true
})
if ancestor != nil {
return true
}
}

return false
}

func isFieldFamilyMatching(family, key string) bool {
return key == family || strings.HasPrefix(key, family+".")
}
Expand Down Expand Up @@ -858,19 +893,11 @@ func isArrayOfObjects(val any) bool {
}

func isFlattenedSubfield(key string, schema []FieldDefinition) bool {
for strings.Contains(key, ".") {
i := strings.LastIndex(key, ".")
key = key[:i]
ancestor := FindElementDefinition(key, schema)
if ancestor == nil {
continue
}
if ancestor.Type == "flattened" {
return true
}
}
_, ancestor := findAncestorElementDefinition(key, schema, func(_ string, def *FieldDefinition) bool {
return def.Type == "flattened"
})

return false
return ancestor != nil
}

func findElementDefinitionForRoot(root, searchedKey string, fieldDefinitions []FieldDefinition) *FieldDefinition {
Expand Down Expand Up @@ -921,6 +948,22 @@ func findParentElementDefinition(key string, fieldDefinitions []FieldDefinition)
return FindElementDefinition(parentKey, fieldDefinitions)
}

func findAncestorElementDefinition(key string, fieldDefinitions []FieldDefinition, cond func(string, *FieldDefinition) bool) (string, *FieldDefinition) {
for strings.Contains(key, ".") {
i := strings.LastIndex(key, ".")
key = key[:i]
ancestor := FindElementDefinition(key, fieldDefinitions)
if ancestor == nil {
continue
}
if cond(key, ancestor) {
return key, ancestor
}
}

return "", nil
}

// compareKeys checks if `searchedKey` matches with the given `key`. `key` can contain
// wildcards (`*`), that match any sequence of characters in `searchedKey` different to dots.
func compareKeys(key string, def FieldDefinition, searchedKey string) bool {
Expand Down
92 changes: 92 additions & 0 deletions internal/fields/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,98 @@ func TestValidateStackVersionsWithEcsMappings(t *testing.T) {
}
}

func TestSkipLeafOfObject(t *testing.T) {
schema := []FieldDefinition{
{
Name: "foo",
Type: "keyword",
},
{
Name: "flattened",
Type: "flattened",
},
{
Name: "object",
Type: "object",
},
{
Name: "nested",
Type: "nested",
},
{
Name: "group",
Type: "group",
Fields: []FieldDefinition{
{
Name: "subgroup",
Type: "object",
},
},
},
}

cases := []struct {
name string
version *semver.Version
expected bool
}{
{
name: "foo.bar",
version: semver.MustParse("3.0.0"),
expected: true,
},
{
name: "subgroup.bar",
version: semver.MustParse("3.0.0"),
expected: true,
},
{
name: "foo.bar",
version: semver.MustParse("3.0.1"),
expected: false,
},
{
name: "subgroup.bar",
version: semver.MustParse("3.0.1"),
expected: false,
},
}

// Cases we expect to skip depending on the version.
okRoots := []string{"flattened", "object", "group", "nested"}
for _, root := range okRoots {
t.Run("(empty root)", func(t *testing.T) {
for _, c := range cases {
t.Run(c.name+"_"+c.version.String(), func(t *testing.T) {
found := skipLeafOfObject("", root+"."+c.name, *c.version, schema)
assert.Equal(t, c.expected, found)
})
}
})
t.Run(root, func(t *testing.T) {
for _, c := range cases {
t.Run(c.name+"_"+c.version.String(), func(t *testing.T) {
found := skipLeafOfObject(root, c.name, *c.version, schema)
assert.Equal(t, c.expected, found)
})
}
})
}

// Cases we never expect to skip.
notOkRoots := []string{"foo", "notexists", "group.subgroup.other"}
for _, root := range notOkRoots {
t.Run("not ok "+root, func(t *testing.T) {
for _, c := range cases {
t.Run(c.name+"_"+c.version.String(), func(t *testing.T) {
found := skipLeafOfObject(root, c.name, *c.version, schema)
assert.Equal(t, false, found)
})
}
})
}
}

func readTestResults(t *testing.T, path string) (f results) {
c, err := os.ReadFile(path)
require.NoError(t, err)
Expand Down

0 comments on commit d115a79

Please sign in to comment.