validate.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package fields
import (
"bufio"
_ "embed"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"regexp"
"slices"
"sort"
"strconv"
"strings"
"github.com/Masterminds/semver/v3"
"github.com/cbroglie/mustache"
"gopkg.in/yaml.v3"
"github.com/elastic/elastic-package/internal/common"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/multierror"
"github.com/elastic/elastic-package/internal/packages"
"github.com/elastic/elastic-package/internal/packages/buildmanifest"
)
var (
semver2_0_0 = semver.MustParse("2.0.0")
semver2_3_0 = semver.MustParse("2.3.0")
semver3_0_1 = semver.MustParse("3.0.1")
// List of stack releases that do not
// include ECS mappings (all versions before 8.13.0).
stackVersionsWithoutECS = []*semver.Version{
semver.MustParse("8.12.2"),
semver.MustParse("8.12.1"),
semver.MustParse("8.12.0"),
semver.MustParse("8.11.4"),
semver.MustParse("8.11.3"),
semver.MustParse("8.11.2"),
semver.MustParse("8.11.1"),
semver.MustParse("8.11.0"),
semver.MustParse("8.10.4"),
semver.MustParse("8.10.3"),
semver.MustParse("8.10.2"),
semver.MustParse("8.10.1"),
semver.MustParse("8.10.0"),
semver.MustParse("8.9.2"),
semver.MustParse("8.9.1"),
semver.MustParse("8.9.0"),
semver.MustParse("8.8.2"),
semver.MustParse("8.8.1"),
semver.MustParse("8.8.0"),
semver.MustParse("8.7.1"),
semver.MustParse("8.7.0"),
semver.MustParse("8.6.2"),
semver.MustParse("8.6.1"),
semver.MustParse("8.6.0"),
semver.MustParse("8.5.3"),
semver.MustParse("8.5.2"),
semver.MustParse("8.5.1"),
semver.MustParse("8.5.0"),
semver.MustParse("8.4.3"),
semver.MustParse("8.4.2"),
semver.MustParse("8.4.1"),
semver.MustParse("8.4.0"),
semver.MustParse("8.3.3"),
semver.MustParse("8.3.2"),
semver.MustParse("8.3.1"),
semver.MustParse("8.3.0"),
semver.MustParse("8.2.3"),
semver.MustParse("8.2.2"),
semver.MustParse("8.2.1"),
semver.MustParse("8.2.0"),
semver.MustParse("8.1.3"),
semver.MustParse("8.1.2"),
semver.MustParse("8.1.1"),
semver.MustParse("8.1.0"),
semver.MustParse("8.0.1"),
semver.MustParse("8.0.0"),
semver.MustParse("7.17.24"),
semver.MustParse("7.17.23"),
semver.MustParse("7.17.22"),
semver.MustParse("7.17.21"),
semver.MustParse("7.17.20"),
semver.MustParse("7.17.19"),
semver.MustParse("7.17.18"),
semver.MustParse("7.17.17"),
semver.MustParse("7.17.16"),
semver.MustParse("7.17.15"),
semver.MustParse("7.17.14"),
semver.MustParse("7.17.13"),
semver.MustParse("7.17.12"),
semver.MustParse("7.17.11"),
semver.MustParse("7.17.10"),
semver.MustParse("7.17.9"),
semver.MustParse("7.17.8"),
semver.MustParse("7.17.7"),
semver.MustParse("7.17.6"),
semver.MustParse("7.17.5"),
semver.MustParse("7.17.4"),
semver.MustParse("7.17.3"),
semver.MustParse("7.17.2"),
semver.MustParse("7.17.1"),
semver.MustParse("7.17.0"),
semver.MustParse("7.16.3"),
semver.MustParse("7.16.2"),
semver.MustParse("7.16.1"),
semver.MustParse("7.16.0"),
semver.MustParse("7.15.2"),
semver.MustParse("7.15.1"),
semver.MustParse("7.15.0"),
semver.MustParse("7.14.2"),
semver.MustParse("7.14.1"),
semver.MustParse("7.14.0"), // First version of Fleet in GA; there are no packages older than this version.
}
defaultExternal = "ecs"
)
// Validator is responsible for fields validation.
type Validator struct {
// Schema contains definition records.
Schema []FieldDefinition
// SpecVersion contains the version of the spec used by the package.
specVersion semver.Version
// expectedDatasets contains the value expected for dataset fields.
expectedDatasets []string
defaultNumericConversion bool
// fields that store keywords, but can be received as numeric types.
numericKeywordFields []string
// fields that store numbers, but can be received as strings.
stringNumberFields []string
disabledDependencyManagement bool
enabledAllowedIPCheck bool
allowedCIDRs []*net.IPNet
enabledImportAllECSSchema bool
disabledNormalization bool
injectFieldsOptions InjectFieldsOptions
}
// ValidatorOption represents an optional flag that can be passed to CreateValidatorForDirectory.
type ValidatorOption func(*Validator) error
// WithSpecVersion enables validation dependent on the spec version used by the package.
func WithSpecVersion(version string) ValidatorOption {
return func(v *Validator) error {
sv, err := semver.NewVersion(version)
if err != nil {
return fmt.Errorf("invalid version %q: %v", version, err)
}
v.specVersion = *sv
return nil
}
}
// WithDefaultNumericConversion configures the validator to accept defined keyword (or constant_keyword) fields as numeric-type.
func WithDefaultNumericConversion() ValidatorOption {
return func(v *Validator) error {
v.defaultNumericConversion = true
return nil
}
}
// WithNumericKeywordFields configures the validator to accept numeric values for the given fields
// even though they are defined as keyword or constant_keyword.
func WithNumericKeywordFields(fields []string) ValidatorOption {
return func(v *Validator) error {
v.numericKeywordFields = common.StringSlicesUnion(v.numericKeywordFields, fields)
return nil
}
}
// WithStringNumberFields configures the validator to accept string representations of the values
// of the given fields even though they are defined as numbers.
func WithStringNumberFields(fields []string) ValidatorOption {
return func(v *Validator) error {
v.stringNumberFields = common.StringSlicesUnion(v.stringNumberFields, fields)
return nil
}
}
// WithDisabledDependencyManagement configures the validator to ignore external fields and not follow dependencies.
func WithDisabledDependencyManagement() ValidatorOption {
return func(v *Validator) error {
v.disabledDependencyManagement = true
return nil
}
}
// WithEnabledAllowedIPCheck configures the validator to check IP values against an allowed list.
func WithEnabledAllowedIPCheck() ValidatorOption {
return func(v *Validator) error {
v.enabledAllowedIPCheck = true
return nil
}
}
// WithExpectedDatasets configures the validator to check if the dataset field value matches one of the expected values.
func WithExpectedDatasets(datasets []string) ValidatorOption {
return func(v *Validator) error {
v.expectedDatasets = datasets
return nil
}
}
// WithEnabledImportAllECSSChema configures whether the validator checks the fields against the complete ECS schema.
func WithEnabledImportAllECSSChema(importSchema bool) ValidatorOption {
return func(v *Validator) error {
v.enabledImportAllECSSchema = importSchema
return nil
}
}
// WithDisableNormalization configures the validator to disable normalization.
func WithDisableNormalization(disabledNormalization bool) ValidatorOption {
return func(v *Validator) error {
v.disabledNormalization = disabledNormalization
return nil
}
}
// WithInjectFieldsOptions configures fields injection.
func WithInjectFieldsOptions(options InjectFieldsOptions) ValidatorOption {
return func(v *Validator) error {
v.injectFieldsOptions = options
return nil
}
}
type packageRootFinder interface {
FindPackageRoot() (string, bool, error)
}
type packageRoot struct{}
func (p packageRoot) FindPackageRoot() (string, bool, error) {
return packages.FindPackageRoot()
}
// CreateValidatorForDirectory function creates a validator for the directory.
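//
// A minimal usage sketch from a caller's point of view (illustrative only; the
// directory path, the chosen options and the `body` variable are assumptions,
// not taken from this file):
//
//	v, err := fields.CreateValidatorForDirectory("data_stream/log",
//		fields.WithSpecVersion("3.0.0"),
//		fields.WithExpectedDatasets([]string{"mypackage.log"}),
//	)
//	if err != nil {
//		return err
//	}
//	if errs := v.ValidateDocumentBody(body); len(errs) > 0 {
//		return errs
//	}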
func CreateValidatorForDirectory(fieldsParentDir string, opts ...ValidatorOption) (v *Validator, err error) {
p := packageRoot{}
return createValidatorForDirectoryAndPackageRoot(fieldsParentDir, p, opts...)
}
func createValidatorForDirectoryAndPackageRoot(fieldsParentDir string, finder packageRootFinder, opts ...ValidatorOption) (v *Validator, err error) {
v = new(Validator)
// In validator, inject fields with settings used for validation, such as `allowed_values`.
v.injectFieldsOptions.IncludeValidationSettings = true
for _, opt := range opts {
if err := opt(v); err != nil {
return nil, err
}
}
v.allowedCIDRs = initializeAllowedCIDRsList()
fieldsDir := filepath.Join(fieldsParentDir, "fields")
var fdm *DependencyManager
if !v.disabledDependencyManagement {
packageRoot, found, err := finder.FindPackageRoot()
if err != nil {
return nil, fmt.Errorf("can't find package root: %w", err)
}
if !found {
return nil, errors.New("package root not found and dependency management is enabled")
}
fdm, v.Schema, err = initDependencyManagement(packageRoot, v.specVersion, v.enabledImportAllECSSchema)
if err != nil {
return nil, fmt.Errorf("failed to initialize dependency management: %w", err)
}
}
fields, err := loadFieldsFromDir(fieldsDir, fdm, v.injectFieldsOptions)
if err != nil {
return nil, fmt.Errorf("can't load fields from directory (path: %s): %w", fieldsDir, err)
}
v.Schema = append(fields, v.Schema...)
return v, nil
}
func initDependencyManagement(packageRoot string, specVersion semver.Version, importECSSchema bool) (*DependencyManager, []FieldDefinition, error) {
buildManifest, ok, err := buildmanifest.ReadBuildManifest(packageRoot)
if err != nil {
return nil, nil, fmt.Errorf("can't read build manifest: %w", err)
}
if !ok {
// There is no build manifest, nothing to do.
return nil, nil, nil
}
fdm, err := CreateFieldDependencyManager(buildManifest.Dependencies)
if err != nil {
return nil, nil, fmt.Errorf("can't create field dependency manager: %w", err)
}
// Check if the package embeds ECS mappings
packageEmbedsEcsMappings := buildManifest.ImportMappings() && !specVersion.LessThan(semver2_3_0)
// Check if all stack versions support ECS mappings
stackSupportsEcsMapping, err := supportsECSMappings(packageRoot)
if err != nil {
return nil, nil, fmt.Errorf("can't check if stack version includes ECS mappings: %w", err)
}
// If the package embeds ECS mappings, or the stack version includes ECS mappings, then
// we should import the ECS schema to validate the package fields against it.
var schema []FieldDefinition
if (packageEmbedsEcsMappings || stackSupportsEcsMapping) && importECSSchema {
// Import all fields from external schema (most likely ECS) to
// validate the package fields against it.
ecsSchema, err := fdm.ImportAllFields(defaultExternal)
if err != nil {
return nil, nil, err
}
logger.Debugf("Imported ECS fields definition from external schema for validation (embedded in package: %v, stack uses ecs@mappings template: %v)", packageEmbedsEcsMappings, stackSupportsEcsMapping)
schema = ecsSchema
}
// ecs@mappings adds additional multifields that are not defined anywhere.
// Adding them in all cases so packages can be tested in versions of the stack that
// add the ecs@mappings component template.
schema = appendECSMappingMultifields(schema, "")
return fdm, schema, nil
}
// supportsECSMappings checks whether all the versions of the stack the package can run on support ECS mappings.
func supportsECSMappings(packageRoot string) (bool, error) {
packageManifest, err := packages.ReadPackageManifestFromPackageRoot(packageRoot)
if err != nil {
return false, fmt.Errorf("can't read package manifest: %w", err)
}
if len(packageManifest.Conditions.Kibana.Version) == 0 {
logger.Debugf("No Kibana version constraint found in package manifest; assuming it does not support ECS mappings.")
return false, nil
}
kibanaConstraints, err := semver.NewConstraint(packageManifest.Conditions.Kibana.Version)
if err != nil {
return false, fmt.Errorf("invalid constraint for Kibana: %w", err)
}
return allVersionsIncludeECS(kibanaConstraints), nil
}
// allVersionsIncludeECS checks whether all the stack versions allowed by the constraints include ECS mappings. Only stack
// versions 8.13.0 and above include ECS mappings.
//
// Returns true if all the stack versions in the constraints include ECS mappings, otherwise returns false.
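//
// For example (illustrative constraints, reasoning from the list above):
//
//	c, _ := semver.NewConstraint("^8.13.0")
//	allVersionsIncludeECS(c) // expected: true, no listed version satisfies the constraint
//
//	c, _ = semver.NewConstraint("^8.10.0")
//	allVersionsIncludeECS(c) // expected: false, e.g. 8.12.2 satisfies it and lacks ECS mappings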
func allVersionsIncludeECS(kibanaConstraints *semver.Constraints) bool {
// Looking for a version that satisfies the package constraints.
for _, v := range stackVersionsWithoutECS {
if kibanaConstraints.Check(v) {
// Found a version that satisfies the constraints,
// so at least this version does not include
// ECS mappings.
return false
}
}
// If no version satisfies the constraints, then all versions
// include ECS mappings.
return true
// This check works under the assumption the constraints are not limited
// upwards.
//
// For example, if the constraint is `>= 8.12.0` and the stack version is
// `8.12.999`, the constraint will be satisfied.
//
// However, if the constraint is `>= 8.0.0, < 8.10.0` the check will not
// return the right result.
//
// To support this, the constraint must be checked against a larger set of
// versions (as the loop above does), rather than against a single version
// like in the check commented out below.
//
// lastStackVersionWithoutEcsMappings := semver.MustParse("8.12.999")
// return !kibanaConstraints.Check(lastStackVersionWithoutEcsMappings)
}
func ecsPathWithMultifieldsMatch(name string) bool {
suffixes := []string{
// From https://github.com/elastic/elasticsearch/blob/34a78f3cf3e91cd13f51f1f4f8e378f8ed244a2b/x-pack/plugin/core/template-resources/src/main/resources/ecs%40mappings.json#L87
".body.content",
"url.full",
"url.original",
// From https://github.com/elastic/elasticsearch/blob/34a78f3cf3e91cd13f51f1f4f8e378f8ed244a2b/x-pack/plugin/core/template-resources/src/main/resources/ecs%40mappings.json#L96
"command_line",
"stack_trace",
// From https://github.com/elastic/elasticsearch/blob/34a78f3cf3e91cd13f51f1f4f8e378f8ed244a2b/x-pack/plugin/core/template-resources/src/main/resources/ecs%40mappings.json#L113
".title",
".executable",
".name",
".working_directory",
".full_name",
"file.path",
"file.target_path",
"os.full",
"email.subject",
"vulnerability.description",
"user_agent.original",
}
for _, suffix := range suffixes {
if strings.HasSuffix(name, suffix) {
return true
}
}
return false
}
// appendECSMappingMultifields adds multifields included in ecs@mappings that are not defined anywhere, for fields
// that don't define any multifield.
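//
// For example (illustrative definition, not taken from any concrete package):
// given a schema entry
//
//	{Name: "process.command_line", Type: "wildcard"}
//
// with no multifields, a "text" multifield of type "match_only_text" is
// appended, mirroring what the ecs@mappings component template does at index
// time.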
func appendECSMappingMultifields(schema []FieldDefinition, prefix string) []FieldDefinition {
rules := []struct {
match func(name string) bool
definitions []FieldDefinition
}{
{
match: ecsPathWithMultifieldsMatch,
definitions: []FieldDefinition{
{
Name: "text",
Type: "match_only_text",
},
},
},
}
var result []FieldDefinition
for _, def := range schema {
fullName := def.Name
if prefix != "" {
fullName = prefix + "." + fullName
}
def.Fields = appendECSMappingMultifields(def.Fields, fullName)
for _, rule := range rules {
if !rule.match(fullName) {
continue
}
for _, mf := range rule.definitions {
// Append multifields only if they are not already defined.
f := func(d FieldDefinition) bool {
return d.Name == mf.Name
}
if !slices.ContainsFunc(def.MultiFields, f) {
def.MultiFields = append(def.MultiFields, mf)
}
}
}
result = append(result, def)
}
return result
}
//go:embed _static/allowed_geo_ips.txt
var allowedGeoIPs string
func initializeAllowedCIDRsList() (cidrs []*net.IPNet) {
s := bufio.NewScanner(strings.NewReader(allowedGeoIPs))
for s.Scan() {
_, cidr, err := net.ParseCIDR(s.Text())
if err != nil {
panic("invalid ip in _static/allowed_geo_ips.txt: " + s.Text())
}
cidrs = append(cidrs, cidr)
}
return cidrs
}
func loadFieldsFromDir(fieldsDir string, fdm *DependencyManager, injectOptions InjectFieldsOptions) ([]FieldDefinition, error) {
files, err := filepath.Glob(filepath.Join(fieldsDir, "*.yml"))
if err != nil {
return nil, fmt.Errorf("reading directory with fields failed (path: %s): %w", fieldsDir, err)
}
var fields []FieldDefinition
for _, file := range files {
body, err := os.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("reading fields file failed: %w", err)
}
if fdm != nil {
body, err = injectFields(body, fdm, injectOptions)
if err != nil {
return nil, fmt.Errorf("loading external fields failed: %w", err)
}
}
var u []FieldDefinition
err = yaml.Unmarshal(body, &u)
if err != nil {
return nil, fmt.Errorf("unmarshalling field body failed: %w", err)
}
fields = append(fields, u...)
}
return fields, nil
}
func injectFields(d []byte, dm *DependencyManager, options InjectFieldsOptions) ([]byte, error) {
var fields []common.MapStr
err := yaml.Unmarshal(d, &fields)
if err != nil {
return nil, fmt.Errorf("parsing fields failed: %w", err)
}
fields, _, err = dm.injectFieldsWithOptions(fields, options)
if err != nil {
return nil, fmt.Errorf("injecting fields failed: %w", err)
}
return yaml.Marshal(fields)
}
// ValidateDocumentBody validates the provided document body.
func (v *Validator) ValidateDocumentBody(body json.RawMessage) multierror.Error {
var c common.MapStr
err := json.Unmarshal(body, &c)
if err != nil {
var errs multierror.Error
errs = append(errs, fmt.Errorf("unmarshalling document body failed: %w", err))
return errs
}
return v.ValidateDocumentMap(c)
}
// ValidateDocumentMap validates the provided document as common.MapStr.
func (v *Validator) ValidateDocumentMap(body common.MapStr) multierror.Error {
errs := v.validateDocumentValues(body)
errs = append(errs, v.validateMapElement("", body, body)...)
if len(errs) == 0 {
return nil
}
return errs
}
var datasetFieldNames = []string{
"event.dataset",
"data_stream.dataset",
}
func (v *Validator) validateDocumentValues(body common.MapStr) multierror.Error {
var errs multierror.Error
if !v.specVersion.LessThan(semver2_0_0) && v.expectedDatasets != nil {
for _, datasetField := range datasetFieldNames {
value, err := body.GetValue(datasetField)
if errors.Is(err, common.ErrKeyNotFound) {
continue
}
// Why do we render the expected datasets here?
// Because the expected datasets can contain
// mustache templates, and not just static
// strings.
//
// For example, the expected datasets for the
// Kubernetes container logs dataset can be:
//
// - "{{kubernetes.labels.elastic_co/dataset}}"
//
var renderedExpectedDatasets []string
for _, dataset := range v.expectedDatasets {
renderedDataset, err := mustache.Render(dataset, body)
if err != nil {
err := fmt.Errorf("can't render expected dataset %q: %w", dataset, err)
errs = append(errs, err)
return errs
}
renderedExpectedDatasets = append(renderedExpectedDatasets, renderedDataset)
}
str, ok := valueToString(value, v.disabledNormalization)
exists := stringInArray(str, renderedExpectedDatasets)
if !ok || !exists {
err := fmt.Errorf("field %q should have value in %q, it has \"%v\"",
datasetField, v.expectedDatasets, value)
errs = append(errs, err)
}
}
}
return errs
}
func stringInArray(target string, arr []string) bool {
// Check if target is part of the array
found := false
for _, item := range arr {
if item == target {
found = true
break
}
}
return found
}
func valueToString(value any, disabledNormalization bool) (string, bool) {
if disabledNormalization {
// when synthetics mode is enabled, each field present in the document is an array
// so this check needs to retrieve the first element of the array
vals, err := common.ToStringSlice(value)
if err != nil || len(vals) != 1 {
return "", false
}
return vals[0], true
}
str, ok := value.(string)
return str, ok
}
func (v *Validator) validateMapElement(root string, elem common.MapStr, doc common.MapStr) multierror.Error {
var errs multierror.Error
for name, val := range elem {
key := strings.TrimLeft(root+"."+name, ".")
switch val := val.(type) {
case []map[string]any:
for _, m := range val {
err := v.validateMapElement(key, m, doc)
if err != nil {
errs = append(errs, err...)
}
}
case map[string]any:
if isFieldTypeFlattened(key, v.Schema) {
// Do not traverse into objects with flattened data types
// because the entire object is mapped as a single field.
continue
}
err := v.validateMapElement(key, val, doc)
if err != nil {
errs = append(errs, err...)
}
default:
if skipLeafOfObject(root, name, v.specVersion, v.Schema) {
// Up to certain spec versions we skip some validations on leaves of objects; check if that is the case.
break
}
err := v.validateScalarElement(key, val, doc)
if err != nil {
errs = append(errs, err)
}
}
}
return errs
}
func (v *Validator) validateScalarElement(key string, val any, doc common.MapStr) error {
if key == "" {
return nil // root key is always valid
}
definition := FindElementDefinition(key, v.Schema)
if definition == nil {
switch {
case skipValidationForField(key):
return nil // generic field, let's skip validation for now
case isFlattenedSubfield(key, v.Schema):
return nil // flattened subfield, it will be stored as member of the flattened ancestor.
case isArrayOfObjects(val):
return fmt.Errorf(`field %q is used as array of objects, expected explicit definition with type group or nested`, key)
case couldBeMultifield(key, v.Schema):
return fmt.Errorf(`field %q is undefined, could be a multifield`, key)
default:
return fmt.Errorf(`field %q is undefined`, key)
}
}
if !v.disabledNormalization {
err := v.validateExpectedNormalization(*definition, val)
if err != nil {
return fmt.Errorf("field %q is not normalized as expected: %w", key, err)
}
}
err := v.parseElementValue(key, *definition, val, doc)
if err != nil {
return fmt.Errorf("parsing field value failed: %w", err)
}
return nil
}
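// SanitizeSyntheticSourceDocs reverts transformations introduced by synthetic
// source in the given documents: single-element arrays for fields that are not
// expected to be arrays are unwrapped, dotted keys are expanded into nested
// objects, and keys that would be multifields are dropped (and logged).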
func (v *Validator) SanitizeSyntheticSourceDocs(docs []common.MapStr) ([]common.MapStr, error) {
var newDocs []common.MapStr
var multifields []string
for _, doc := range docs {
for key, contents := range doc {
shouldBeArray := false
definition := FindElementDefinition(key, v.Schema)
if definition != nil {
shouldBeArray = v.shouldValueBeArray(definition)
}
// if it needs to be normalized, the field is kept as it is
if shouldBeArray {
continue
}
// If no normalization is specified and the field is an array of just one element,
// the field is updated to remove the array and keep that element as its value.
vals, ok := contents.([]any)
if !ok {
continue
}
if len(vals) == 1 {
_, err := doc.Put(key, vals[0])
if err != nil {
return nil, fmt.Errorf("key %s could not be updated: %w", key, err)
}
}
}
expandedDoc, newMultifields, err := createDocExpandingObjects(doc, v.Schema)
if err != nil {
return nil, fmt.Errorf("failure while expanding objects from doc: %w", err)
}
newDocs = append(newDocs, expandedDoc)
for _, multifield := range newMultifields {
if slices.Contains(multifields, multifield) {
continue
}
multifields = append(multifields, multifield)
}
}
if len(multifields) > 0 {
sort.Strings(multifields)
logger.Debugf("Some keys were not included in sanitized docs because they are multifields: %s", strings.Join(multifields, ", "))
}
return newDocs, nil
}
func (v *Validator) shouldValueBeArray(definition *FieldDefinition) bool {
// Normalization only needs to be checked when synthetic source is enabled and the
// spec version of this package is >= 2.0.0.
if v.disabledNormalization && !v.specVersion.LessThan(semver2_0_0) {
for _, normalize := range definition.Normalize {
switch normalize {
case "array":
return true
}
}
}
return false
}
func createDocExpandingObjects(doc common.MapStr, schema []FieldDefinition) (common.MapStr, []string, error) {
keys := make([]string, 0)
for k := range doc {
keys = append(keys, k)
}
sort.Strings(keys)
newDoc := make(common.MapStr)
var multifields []string
for _, k := range keys {
value, err := doc.GetValue(k)
if err != nil {
return nil, nil, fmt.Errorf("not found key %s: %w", k, err)
}
_, err = newDoc.Put(k, value)
if err == nil {
continue
}
// Possible errors include, but are not limited to:
// - expected map but type is string
// - expected map but type is []any
if strings.HasPrefix(err.Error(), "expected map but type is") {
if couldBeMultifield(k, schema) {
// Multifields cannot be added and are not present in source documents; ignore them.
multifields = append(multifields, k)
continue
}
logger.Warnf("not able to add key %s: %s", k, err)
continue
}
return nil, nil, fmt.Errorf("not added key %s with value %s: %w", k, value, err)
}
return newDoc, multifields, nil
}
// skipValidationForField skips field validation (field presence) of special fields. The special fields are present
// in most (if not all) documents collected by Elastic Agent, but aren't defined in any integration's `fields.yml` files.
// FIXME https://github.com/elastic/elastic-package/issues/147
func skipValidationForField(key string) bool {
return isFieldFamilyMatching("agent", key) ||
isFieldFamilyMatching("elastic_agent", key) ||
isFieldFamilyMatching("cloud", key) || // too many common fields
isFieldFamilyMatching("event", key) || // too many common fields
isFieldFamilyMatching("host", key) || // too many common fields
isFieldFamilyMatching("metricset", key) || // field is deprecated
isFieldFamilyMatching("event.module", key) // field is deprecated
}
// skipLeafOfObject checks if the element is a child of an object that was skipped in some previous
// version of the spec. This is relevant in documents that store fields without subobjects.
func skipLeafOfObject(root, name string, specVersion semver.Version, schema []FieldDefinition) bool {
// We are only skipping validation of these fields on versions older than 3.0.1.
if !specVersion.LessThan(semver3_0_1) {
return false
}
// If it doesn't contain a dot in the name, we have traversed its parent, if any.
if !strings.Contains(name, ".") {
return false
}
key := name
if root != "" {
key = root + "." + name
}
_, ancestor := findAncestorElementDefinition(key, schema, func(key string, def *FieldDefinition) bool {
// Don't look for ancestors beyond the root; those objects have already been traversed.
if len(key) < len(root) {
return false
}
if !slices.Contains([]string{"group", "object", "nested", "flattened"}, def.Type) {
return false
}
return true
})
return ancestor != nil
}
func isFieldFamilyMatching(family, key string) bool {
return key == family || strings.HasPrefix(key, family+".")
}
func isFieldTypeFlattened(key string, fieldDefinitions []FieldDefinition) bool {
definition := FindElementDefinition(key, fieldDefinitions)
return definition != nil && definition.Type == "flattened"
}
func couldBeMultifield(key string, fieldDefinitions []FieldDefinition) bool {
parent := findParentElementDefinition(key, fieldDefinitions)
if parent == nil {
// Parent is not defined, so not sure what this can be.
return false
}
switch parent.Type {
case "", "group", "nested", "object":
// Objects cannot have multifields.
return false
}
return true
}
func isArrayOfObjects(val any) bool {
switch val := val.(type) {
case []map[string]any:
return true
case []any:
for _, e := range val {
if _, isMap := e.(map[string]any); isMap {
return true
}
}
}
return false
}
func isFlattenedSubfield(key string, schema []FieldDefinition) bool {
_, ancestor := findAncestorElementDefinition(key, schema, func(_ string, def *FieldDefinition) bool {
return def.Type == "flattened"
})
return ancestor != nil
}
func findElementDefinitionForRoot(root, searchedKey string, fieldDefinitions []FieldDefinition) *FieldDefinition {
for _, def := range fieldDefinitions {
key := strings.TrimLeft(root+"."+def.Name, ".")
if compareKeys(key, def, searchedKey) {
return &def
}
fd := findElementDefinitionForRoot(key, searchedKey, def.Fields)
if fd != nil {
return fd
}
fd = findElementDefinitionForRoot(key, searchedKey, def.MultiFields)
if fd != nil {
return fd
}
}
if root == "" {
// No definition found, check if the parent is an object with object type.
parent := findParentElementDefinition(searchedKey, fieldDefinitions)
if parent != nil && parent.Type == "object" && parent.ObjectType != "" {
fd := *parent
fd.Name = searchedKey
fd.Type = parent.ObjectType
fd.ObjectType = ""
return &fd
}
}
return nil
}
// FindElementDefinition is a helper function used to find the fields definition in the schema.
func FindElementDefinition(searchedKey string, fieldDefinitions []FieldDefinition) *FieldDefinition {
return findElementDefinitionForRoot("", searchedKey, fieldDefinitions)
}
func findParentElementDefinition(key string, fieldDefinitions []FieldDefinition) *FieldDefinition {
lastDotIndex := strings.LastIndex(key, ".")
if lastDotIndex < 0 {
// Field at the root level cannot be a multifield.
return nil
}
parentKey := key[:lastDotIndex]
return FindElementDefinition(parentKey, fieldDefinitions)
}
func findAncestorElementDefinition(key string, fieldDefinitions []FieldDefinition, cond func(string, *FieldDefinition) bool) (string, *FieldDefinition) {
for strings.Contains(key, ".") {
i := strings.LastIndex(key, ".")
key = key[:i]
ancestor := FindElementDefinition(key, fieldDefinitions)
if ancestor == nil {
continue
}
if cond(key, ancestor) {
return key, ancestor
}
}
return "", nil
}
// compareKeys checks if `searchedKey` matches with the given `key`. `key` can contain
// wildcards (`*`), which match any sequence of characters in `searchedKey` other than dots.
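//
// For illustration (assuming `def` is a plain keyword definition; both keys are
// made-up examples), a wildcard matches a single path segment:
//
//	compareKeys("labels.*", def, "labels.foo")     // expected: true
//	compareKeys("labels.*", def, "labels.foo.bar") // expected: false, the wildcard does not cross dots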
func compareKeys(key string, def FieldDefinition, searchedKey string) bool {
// Loop over every byte in `key` to find if there is a matching byte in `searchedKey`.
var j int
for _, k := range []byte(key) {
if j >= len(searchedKey) {
// End of searched key reached before matching all characters in the key.
return false
}
switch k {
case searchedKey[j]:
// Match, continue.
j++
case '*':
// Wildcard, match everything till next dot.
switch idx := strings.IndexByte(searchedKey[j:], '.'); idx {
default:
// Jump till next dot.
j += idx
case -1:
// No dots, wildcard matches with the rest of the searched key.
j = len(searchedKey)
case 0:
// Empty name on wildcard, this is not permitted (e.g. `example..foo`).
return false
}
default:
// No match.
return false
}
}
// If everything matched, searched key has been found.