Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make network_direction, registered_domain and convert processors compatible with ES older than 7.13.0 #26676

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `kibana.log` pipeline when `event.duration` calculation becomes a Long. {issue}24556[24556] {pull}25675[25675]
- Removed incorrect `http.request.referrer` field from `aws.elb` module. {issue}26435[26435] {pull}26441[26441]
- Fix `threatintel.indicator.url.full` not being populated. {issue}26351[26351] {pull}26508[26508]
- Fix Elasticsearch compatibility for modules that use `type: ip` with `convert` processors. {issue}26629[26629] {pull}26676[26676]
- Fix Elasticsearch compatibility for modules that use the `network_direction` processor. {issue}26629[26629] {pull}26676[26676]
- Fix Elasticsearch compatibility for modules that use the `registered_domain` processor. {issue}26629[26629] {pull}26676[26676]

*Heartbeat*

Expand Down
132 changes: 103 additions & 29 deletions filebeat/fileset/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,31 @@ import (
// processorCompatibility defines a processor's minimum version requirements or
// a transformation to make it compatible.
type processorCompatibility struct {
checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies.
procType string // Elasticsearch Ingest Node processor type.
adaptConfig func(processor map[string]interface{}, log *logp.Logger) (drop bool, err error) // Adapt the configuration to make it compatible.
checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies.
procType string // Elasticsearch Ingest Node processor type.
adaptConfig func(processor map[string]interface{}, log *logp.Logger) compatAction // Adapt the configuration to make it compatible.
}

type compatAction func(interface{}) (interface{}, error)

func keepProcessor(original interface{}) (interface{}, error) {
return original, nil
}

func dropProcessor(interface{}) (interface{}, error) {
return nil, nil
}

func replaceProcessor(newProc interface{}) compatAction {
return func(interface{}) (interface{}, error) {
return newProc, nil
}
}

func fail(err error) compatAction {
return func(interface{}) (interface{}, error) {
return nil, err
}
}

var processorCompatibilityChecks = []processorCompatibility{
Expand Down Expand Up @@ -70,26 +92,47 @@ var processorCompatibilityChecks = []processorCompatibility{
return esVersion.LessThan(common.MustNewVersion("7.0.0")) &&
!esVersion.LessThan(common.MustNewVersion("6.7.0"))
},
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) {
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction {
config["ecs"] = true
return false, nil
return keepProcessor
},
},
{
procType: "user_agent",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("6.7.0"))
},
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) {
return false, errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required")
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction {
return fail(errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required"))
},
},
{
procType: "convert",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: replaceConvertIP,
},
{
procType: "network_direction",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: deleteProcessor,
},
{
procType: "registered_domain",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: deleteProcessor,
},
}

// adaptPipelineForCompatibility iterates over all processors in the pipeline
// and adapts them for version of Elasticsearch used. Adapt can mean modifying
// processor options or removing the processor.
func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) error {
func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) (err error) {
p, ok := content["processors"]
if !ok {
return errors.New("'processors' is missing from the pipeline definition")
Expand All @@ -104,12 +147,12 @@ func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string,

nextProcessor:
for i, obj := range processors {
processor, ok := obj.(map[string]interface{})
if !ok {
return fmt.Errorf("processor at index %d is not an object, got %T", i, obj)
}

for _, proc := range processorCompatibilityChecks {
processor, ok := obj.(map[string]interface{})
if !ok {
return fmt.Errorf("processor at index %d is not an object, got %T", i, obj)
}

configIfc, found := processor[proc.procType]
if !found {
continue
Expand All @@ -123,16 +166,17 @@ nextProcessor:
continue
}

drop, err := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i))
act := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i))
obj, err = act(obj)
if err != nil {
return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err)
}
if drop {
if obj == nil {
continue nextProcessor
}
}

filteredProcs = append(filteredProcs, processors[i])
filteredProcs = append(filteredProcs, obj)
}

content["processors"] = filteredProcs
Expand All @@ -141,14 +185,16 @@ nextProcessor:

// deleteProcessor returns true to indicate that the processor should be deleted
// in order to adapt the pipeline for backwards compatibility to Elasticsearch.
func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) (bool, error) { return true, nil }
func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) compatAction {
return dropProcessor
}

// replaceSetIgnoreEmptyValue replaces ignore_empty_value option with an if
// statement so ES less than 7.9 will work.
func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) (bool, error) {
func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) compatAction {
_, ok := config["ignore_empty_value"].(bool)
if !ok {
return false, nil
return keepProcessor
}

log.Debug("Removing unsupported 'ignore_empty_value' from set processor.")
Expand All @@ -157,11 +203,11 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger)
_, ok = config["if"].(string)
if ok {
// assume if check is sufficient
return false, nil
return keepProcessor
}
val, ok := config["value"].(string)
if !ok {
return false, nil
return keepProcessor
}

newIf := strings.TrimLeft(val, "{ ")
Expand All @@ -171,37 +217,37 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger)

log.Debug("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf)
config["if"] = newIf
return false, nil
return keepProcessor
}

// replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement
// so ES less than 7.10 will work.
func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) (bool, error) {
func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) compatAction {
allow, ok := config["allow_duplicates"].(bool)
if !ok {
return false, nil
return keepProcessor
}

log.Debug("Removing unsupported 'allow_duplicates' from append processor.")
delete(config, "allow_duplicates")

if allow {
// It was set to true, nothing else to do after removing the option.
return false, nil
return keepProcessor
}

currIf, _ := config["if"].(string)
if strings.Contains(strings.ToLower(currIf), "contains") {
// If it has a contains statement, we assume it is checking for duplicates already.
return false, nil
return keepProcessor
}
field, ok := config["field"].(string)
if !ok {
return false, nil
return keepProcessor
}
val, ok := config["value"].(string)
if !ok {
return false, nil
return keepProcessor
}

field = strings.ReplaceAll(field, ".", "?.")
Expand All @@ -220,5 +266,33 @@ func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logge
log.Debug("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf)
config["if"] = newIf

return false, nil
return keepProcessor
}

// replaceConvertIP replaces convert processors with type: ip with a grok expression that uses
// the IP pattern.
func replaceConvertIP(config map[string]interface{}, log *logp.Logger) compatAction {
wantedType, found := config["type"]
if !found || wantedType != "ip" {
return keepProcessor
}
log.Debug("processor input=", config)
delete(config, "type")
var srcIf, dstIf interface{}
if srcIf, found = config["field"]; !found {
return fail(errors.New("field option is required for convert processor"))
}
if dstIf, found = config["target_field"]; found {
delete(config, "target_field")
} else {
dstIf = srcIf
}
config["patterns"] = []string{
fmt.Sprintf("^%%{IP:%s}$", dstIf),
}
grok := map[string]interface{}{
"grok": config,
}
log.Debug("processor output=", grok)
return replaceProcessor(grok)
}
Loading