Skip to content

Commit

Permalink
Make network_direction, registered_domain and convert processors comp…
Browse files Browse the repository at this point in the history
…atible with ES older than 7.13.0 (#26676)

Adds three new Filebeat fileset compatibility tweaks to support Elasticsearch versions before 7.13.0:

- Replaces usages of convert processor using type: ip with an equivalent grok expression.
  Convert to ip type is used to make a conditional field copy if the source field is a valid IP address.
- Removes the network_direction processor.
- Removes the registered_domain processor.

(cherry picked from commit 65d2193)
  • Loading branch information
adriansr authored and mergify-bot committed Jul 2, 2021
1 parent 49dae12 commit 97ef2e6
Show file tree
Hide file tree
Showing 3 changed files with 556 additions and 29 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,32 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix bug in aws-s3 input where the end of gzipped log files might have been discarded. {pull}26260[26260]
- o365: Avoid mapping exception for `Parameters` and `ExtendedProperties` fields of string type. {pull}26164[26164]
- Clone value when copy fields in processors to avoid crash. {issue}19206[19206] {pull}20500[20500]
- Fix cisco umbrella module config by adding input variable. {pull}22892[22892]
- Fix network.direction logic in zeek connection fileset. {pull}22967[22967]
- Convert the o365 module's `client.port` and `source.port` to numbers (from strings) in events. {pull}22939[22939]
- Fix Cisco ASA/FTD module's parsing of WebVPN log message 716002. {pull}22966[22966]
- Fix aws s3 overview dashboard. {pull}23045[23045]
- Fix bad `network.direction` values in Fortinet/firewall fileset. {pull}23072[23072]
- Add support for organization and custom prefix in AWS/CloudTrail fileset. {issue}23109[23109] {pull}23126[23126]
- Simplify regex for organization custom prefix in AWS/CloudTrail fileset. {issue}23203[23203] {pull}23204[23204]
- Fix syslog header parsing in infoblox module. {issue}23272[23272] {pull}23273[23273]
- Fix concurrent modification exception in Suricata ingest node pipeline. {pull}23534[23534]
- Fix handling of ModifiedProperties field in Office 365. {pull}23777[23777]
- Fix gcp/vpcflow module error where input type was defaulting to file. {pull}24719[24719]
- Change `checkpoint.source_object` from Long to Keyword. {issue}25124[25124] {pull}25145[25145]
- Fix s3 input when there is a blank line in the log file. {pull}25357[25357]
- Fix Nginx module pipelines. {issue}19088[19088] {pull}24699[24699]
- Remove space from field `sophos.xg.trans_src_ ip`. {issue}25154[25154] {pull}25250[25250]
- Fix `checkpoint.action_reason` when its a string, not a Long. {issue}25575[25575] {pull}25609[25609]
- Fix `fortinet.firewall.addr` when its a string, not an IP address. {issue}25585[25585] {pull}25608[25608]
- Fix incorrect field name appending to `related.hash` in `threatintel.abusechmalware` ingest pipeline. {issue}25151[25151] {pull}25674[25674]
- Add improvements to the azure activitylogs and platformlogs ingest pipelines. {pull}26148[26148]
- Fix `kibana.log` pipeline when `event.duration` calculation becomes a Long. {issue}24556[24556] {pull}25675[25675]
- Removed incorrect `http.request.referrer` field from `aws.elb` module. {issue}26435[26435] {pull}26441[26441]
- Fix `threatintel.indicator.url.full` not being populated. {issue}26351[26351] {pull}26508[26508]
- Fix Elasticsearch compatibility for modules that use `type: ip` with `convert` processors. {issue}26629[26629] {pull}26676[26676]
- Fix Elasticsearch compatibility for modules that use the `network_direction` processor. {issue}26629[26629] {pull}26676[26676]
- Fix Elasticsearch compatibility for modules that use the `registered_domain` processor. {issue}26629[26629] {pull}26676[26676]

*Heartbeat*

Expand Down
132 changes: 103 additions & 29 deletions filebeat/fileset/compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,31 @@ import (
// processorCompatibility defines a processor's minimum version requirements or
// a transformation to make it compatible.
type processorCompatibility struct {
checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies.
procType string // Elasticsearch Ingest Node processor type.
adaptConfig func(processor map[string]interface{}, log *logp.Logger) (drop bool, err error) // Adapt the configuration to make it compatible.
checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies.
procType string // Elasticsearch Ingest Node processor type.
adaptConfig func(processor map[string]interface{}, log *logp.Logger) compatAction // Adapt the configuration to make it compatible.
}

type compatAction func(interface{}) (interface{}, error)

func keepProcessor(original interface{}) (interface{}, error) {
return original, nil
}

func dropProcessor(interface{}) (interface{}, error) {
return nil, nil
}

func replaceProcessor(newProc interface{}) compatAction {
return func(interface{}) (interface{}, error) {
return newProc, nil
}
}

func fail(err error) compatAction {
return func(interface{}) (interface{}, error) {
return nil, err
}
}

var processorCompatibilityChecks = []processorCompatibility{
Expand Down Expand Up @@ -70,26 +92,47 @@ var processorCompatibilityChecks = []processorCompatibility{
return esVersion.LessThan(common.MustNewVersion("7.0.0")) &&
!esVersion.LessThan(common.MustNewVersion("6.7.0"))
},
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) {
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction {
config["ecs"] = true
return false, nil
return keepProcessor
},
},
{
procType: "user_agent",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("6.7.0"))
},
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) (bool, error) {
return false, errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required")
adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction {
return fail(errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required"))
},
},
{
procType: "convert",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: replaceConvertIP,
},
{
procType: "network_direction",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: deleteProcessor,
},
{
procType: "registered_domain",
checkVersion: func(esVersion *common.Version) bool {
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
},
adaptConfig: deleteProcessor,
},
}

// adaptPipelineForCompatibility iterates over all processors in the pipeline
// and adapts them for version of Elasticsearch used. Adapt can mean modifying
// processor options or removing the processor.
func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) error {
func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) (err error) {
p, ok := content["processors"]
if !ok {
return errors.New("'processors' is missing from the pipeline definition")
Expand All @@ -104,12 +147,12 @@ func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string,

nextProcessor:
for i, obj := range processors {
processor, ok := obj.(map[string]interface{})
if !ok {
return fmt.Errorf("processor at index %d is not an object, got %T", i, obj)
}

for _, proc := range processorCompatibilityChecks {
processor, ok := obj.(map[string]interface{})
if !ok {
return fmt.Errorf("processor at index %d is not an object, got %T", i, obj)
}

configIfc, found := processor[proc.procType]
if !found {
continue
Expand All @@ -123,16 +166,17 @@ nextProcessor:
continue
}

drop, err := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i))
act := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i))
obj, err = act(obj)
if err != nil {
return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err)
}
if drop {
if obj == nil {
continue nextProcessor
}
}

filteredProcs = append(filteredProcs, processors[i])
filteredProcs = append(filteredProcs, obj)
}

content["processors"] = filteredProcs
Expand All @@ -141,14 +185,16 @@ nextProcessor:

// deleteProcessor returns true to indicate that the processor should be deleted
// in order to adapt the pipeline for backwards compatibility to Elasticsearch.
func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) (bool, error) { return true, nil }
func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) compatAction {
return dropProcessor
}

// replaceSetIgnoreEmptyValue replaces ignore_empty_value option with an if
// statement so ES less than 7.9 will work.
func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) (bool, error) {
func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) compatAction {
_, ok := config["ignore_empty_value"].(bool)
if !ok {
return false, nil
return keepProcessor
}

log.Debug("Removing unsupported 'ignore_empty_value' from set processor.")
Expand All @@ -157,11 +203,11 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger)
_, ok = config["if"].(string)
if ok {
// assume if check is sufficient
return false, nil
return keepProcessor
}
val, ok := config["value"].(string)
if !ok {
return false, nil
return keepProcessor
}

newIf := strings.TrimLeft(val, "{ ")
Expand All @@ -171,37 +217,37 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger)

log.Debug("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf)
config["if"] = newIf
return false, nil
return keepProcessor
}

// replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement
// so ES less than 7.10 will work.
func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) (bool, error) {
func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) compatAction {
allow, ok := config["allow_duplicates"].(bool)
if !ok {
return false, nil
return keepProcessor
}

log.Debug("Removing unsupported 'allow_duplicates' from append processor.")
delete(config, "allow_duplicates")

if allow {
// It was set to true, nothing else to do after removing the option.
return false, nil
return keepProcessor
}

currIf, _ := config["if"].(string)
if strings.Contains(strings.ToLower(currIf), "contains") {
// If it has a contains statement, we assume it is checking for duplicates already.
return false, nil
return keepProcessor
}
field, ok := config["field"].(string)
if !ok {
return false, nil
return keepProcessor
}
val, ok := config["value"].(string)
if !ok {
return false, nil
return keepProcessor
}

field = strings.ReplaceAll(field, ".", "?.")
Expand All @@ -220,5 +266,33 @@ func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logge
log.Debug("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf)
config["if"] = newIf

return false, nil
return keepProcessor
}

// replaceConvertIP replaces convert processors with type: ip with a grok expression that uses
// the IP pattern.
func replaceConvertIP(config map[string]interface{}, log *logp.Logger) compatAction {
wantedType, found := config["type"]
if !found || wantedType != "ip" {
return keepProcessor
}
log.Debug("processor input=", config)
delete(config, "type")
var srcIf, dstIf interface{}
if srcIf, found = config["field"]; !found {
return fail(errors.New("field option is required for convert processor"))
}
if dstIf, found = config["target_field"]; found {
delete(config, "target_field")
} else {
dstIf = srcIf
}
config["patterns"] = []string{
fmt.Sprintf("^%%{IP:%s}$", dstIf),
}
grok := map[string]interface{}{
"grok": config,
}
log.Debug("processor output=", grok)
return replaceProcessor(grok)
}
Loading

0 comments on commit 97ef2e6

Please sign in to comment.