From 5758f3dc37370ae250789a371faabf87db5e03b5 Mon Sep 17 00:00:00 2001 From: Monica Sarbu Date: Fri, 1 Jul 2016 17:41:44 +0200 Subject: [PATCH] Introduce the condition in the processor with the when keyword --- CHANGELOG.asciidoc | 1 + filebeat/tests/system/config/filebeat.yml.j2 | 9 +- libbeat/docs/filteringconfig.asciidoc | 17 ++-- libbeat/processors/actions/drop_event.go | 6 +- libbeat/processors/actions/drop_fields.go | 12 ++- libbeat/processors/actions/include_fields.go | 14 +-- libbeat/processors/condition.go | 20 ++--- libbeat/processors/condition_test.go | 4 +- libbeat/processors/processor.go | 22 ++--- libbeat/processors/processor_test.go | 88 +++++++++++++++---- metricbeat/metricbeat.full.yml | 3 +- .../tests/system/config/metricbeat.yml.j2 | 9 +- .../tests/system/config/packetbeat.yml.j2 | 9 +- 13 files changed, 138 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b41690fb77bb..7ea42d714b08 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d *Affecting all Beats* - Rename the `filters` section to `processors`. {pull}1944[1944] +- Introduce the condition with `when` in the processor configuration. {pull}1949[1949] *Metricbeat* diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 608a4585555f..cf42c9e07a49 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -114,20 +114,23 @@ processors: {%- if include_fields %} - include_fields: - {{include_fields.condition | default()}} + when: + {{include_fields.condition | default()}} fields: {{include_fields.fields | default([])}} {%- endif %} {%- if drop_fields %} - drop_fields: - {{drop_fields.condition | default()}} + when: + {{drop_fields.condition | default()}} fields: {{drop_fields.fields | default([])}} {%- endif %} {%- if drop_event %} - drop_event: - {{ drop_event.condition | default()}} + when: + {{ drop_event.condition | default()}} {%- endif %} {%- endif %} diff --git a/libbeat/docs/filteringconfig.asciidoc b/libbeat/docs/filteringconfig.asciidoc index df747dbace53..06e434b46838 100644 --- a/libbeat/docs/filteringconfig.asciidoc +++ b/libbeat/docs/filteringconfig.asciidoc @@ -28,16 +28,18 @@ event -> processor 1 -> event1 -> processor 2 -> event2 ... See <> for the full list of possible fields. Each processor receives a condition and optionally a set of arguments. The action is executed only if the condition -is fulfilled. +is fulfilled. If not condition is passed then the action is always executed. [source,yaml] ------ processors: - action1: - condition1 + when: + condition1 [arguments] - action2: - condition2 + when: + condition2 [arguments] ... @@ -160,7 +162,8 @@ optional and if it's missing then the defined fields are always exported. The `@ ------- processors: - include_fields: - [condition] + when: + condition fields: ["field1", "field2", ...] ------- @@ -182,7 +185,8 @@ even if they show up in the `drop_fields` list. ----------------------------------------------------- processors: - drop_fields: - [condition] + when: + condition fields: ["field1", "field2", ...] ----------------------------------------------------- @@ -199,6 +203,7 @@ without one all the events are dropped. ------ processors: - drop_event: - condition + when: + condition ------ diff --git a/libbeat/processors/actions/drop_event.go b/libbeat/processors/actions/drop_event.go index cc98b2cb6dca..d0e29da21488 100644 --- a/libbeat/processors/actions/drop_event.go +++ b/libbeat/processors/actions/drop_event.go @@ -12,7 +12,7 @@ type DropEvent struct { } type DropEventConfig struct { - processors.ConditionConfig `config:",inline"` + Cond *processors.ConditionConfig `config:"when"` } func init() { @@ -36,7 +36,7 @@ func newDropEvent(c common.Config) (processors.Processor, error) { return nil, fmt.Errorf("fail to unpack the drop_event configuration: %s", err) } - cond, err := processors.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.Cond) if err != nil { return nil, err } @@ -48,7 +48,7 @@ func newDropEvent(c common.Config) (processors.Processor, error) { func (f *DropEvent) CheckConfig(c common.Config) error { for _, field := range c.GetFields() { - if !processors.AvailableCondition(field) { + if field != "when" { return fmt.Errorf("unexpected %s option in the drop_event configuration", field) } } diff --git a/libbeat/processors/actions/drop_fields.go b/libbeat/processors/actions/drop_fields.go index 5db3ca35bbe5..fcf7b838e8d7 100644 --- a/libbeat/processors/actions/drop_fields.go +++ b/libbeat/processors/actions/drop_fields.go @@ -15,8 +15,8 @@ type DropFields struct { } type DropFieldsConfig struct { - Fields []string `config:"fields"` - processors.ConditionConfig `config:",inline"` + Fields []string `config:"fields"` + Cond *processors.ConditionConfig `config:"when"` } func init() { @@ -50,7 +50,7 @@ func newDropFields(c common.Config) (processors.Processor, error) { } f.Fields = config.Fields - cond, err := processors.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.Cond) if err != nil { return nil, err } @@ -64,10 +64,8 @@ func (f *DropFields) CheckConfig(c common.Config) error { complete := false for _, field := range c.GetFields() { - if !processors.AvailableCondition(field) { - if field != "fields" { - return fmt.Errorf("unexpected %s option in the drop_fields configuration", field) - } + if field != "fields" && field != "when" { + return fmt.Errorf("unexpected %s option in the drop_fields configuration", field) } if field == "fields" { complete = true diff --git a/libbeat/processors/actions/include_fields.go b/libbeat/processors/actions/include_fields.go index 741c4c34cf9e..214e1fddc6ce 100644 --- a/libbeat/processors/actions/include_fields.go +++ b/libbeat/processors/actions/include_fields.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" ) @@ -15,8 +16,8 @@ type IncludeFields struct { } type IncludeFieldsConfig struct { - Fields []string `config:"fields"` - processors.ConditionConfig `config:",inline"` + Fields []string `config:"fields"` + Cond *processors.ConditionConfig `config:"when"` } func init() { @@ -54,7 +55,7 @@ func newIncludeFields(c common.Config) (processors.Processor, error) { } f.Fields = config.Fields - cond, err := processors.NewCondition(config.ConditionConfig) + cond, err := processors.NewCondition(config.Cond) if err != nil { return nil, err } @@ -67,11 +68,10 @@ func (f *IncludeFields) CheckConfig(c common.Config) error { complete := false + logp.Info("include_fields: %v", c) for _, field := range c.GetFields() { - if !processors.AvailableCondition(field) { - if field != "fields" { - return fmt.Errorf("unexpected %s option in the include_fields configuration", field) - } + if field != "fields" && field != "when" { + return fmt.Errorf("unexpected %s option in the include_fields configuration", field) } if field == "fields" { complete = true diff --git a/libbeat/processors/condition.go b/libbeat/processors/condition.go index 2efa08634f5d..a182b04f6b45 100644 --- a/libbeat/processors/condition.go +++ b/libbeat/processors/condition.go @@ -30,20 +30,15 @@ type Condition struct { rangexp map[string]RangeValue } -func AvailableCondition(name string) bool { - - switch name { - case "equals", "contains", "range", "regexp": - return true - default: - return false - } -} - -func NewCondition(config ConditionConfig) (*Condition, error) { +func NewCondition(config *ConditionConfig) (*Condition, error) { c := Condition{} + if config == nil { + // empty condition + return nil, nil + } + if config.Equals != nil { if err := c.setEquals(config.Equals); err != nil { return nil, err @@ -61,8 +56,7 @@ func NewCondition(config ConditionConfig) (*Condition, error) { return nil, err } } else { - // empty condition - return nil, nil + return nil, fmt.Errorf("missing condition") } return &c, nil diff --git a/libbeat/processors/condition_test.go b/libbeat/processors/condition_test.go index dbd94b0ed6f8..a4183de3a54e 100644 --- a/libbeat/processors/condition_test.go +++ b/libbeat/processors/condition_test.go @@ -40,7 +40,7 @@ func TestBadCondition(t *testing.T) { } for _, config := range configs { - _, err := NewCondition(config) + _, err := NewCondition(&config) assert.NotNil(t, err) } } @@ -50,7 +50,7 @@ func GetConditions(t *testing.T, configs []ConditionConfig) []Condition { for _, config := range configs { - cond, err := NewCondition(config) + cond, err := NewCondition(&config) assert.Nil(t, err) conds = append(conds, *cond) } diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go index 5d15c4679772..b38cec8fccce 100644 --- a/libbeat/processors/processor.go +++ b/libbeat/processors/processor.go @@ -14,7 +14,7 @@ type Processors struct { func New(config PluginConfig) (*Processors, error) { - processors := Processors{} + procs := Processors{} for _, processor := range config { @@ -35,24 +35,24 @@ func New(config PluginConfig) (*Processors, error) { return nil, err } - processors.addProcessor(plugin) + procs.addProcessor(plugin) } } - logp.Debug("processors", "Processors: %v", processors) - return &processors, nil + logp.Debug("processors", "Processors: %v", procs) + return &procs, nil } -func (processors *Processors) addProcessor(p Processor) { +func (procs *Processors) addProcessor(p Processor) { - processors.list = append(processors.list, p) + procs.list = append(procs.list, p) } // Applies a sequence of processing rules and returns the filtered event -func (processors *Processors) Run(event common.MapStr) common.MapStr { +func (procs *Processors) Run(event common.MapStr) common.MapStr { // Check if processors are set, just return event if not - if len(processors.list) == 0 { + if len(procs.list) == 0 { return event } @@ -60,7 +60,7 @@ func (processors *Processors) Run(event common.MapStr) common.MapStr { filtered := event.Clone() var err error - for _, p := range processors.list { + for _, p := range procs.list { filtered, err = p.Run(filtered) if err != nil { logp.Debug("filter", "fail to apply processor %s: %s", p, err) @@ -74,10 +74,10 @@ func (processors *Processors) Run(event common.MapStr) common.MapStr { return filtered } -func (processors Processors) String() string { +func (procs Processors) String() string { s := []string{} - for _, p := range processors.list { + for _, p := range procs.list { s = append(s, p.String()) } diff --git a/libbeat/processors/processor_test.go b/libbeat/processors/processor_test.go index a25ed78e5f28..349a3fcf3c6d 100644 --- a/libbeat/processors/processor_test.go +++ b/libbeat/processors/processor_test.go @@ -43,8 +43,10 @@ func TestBadConfig(t *testing.T) { yml := []map[string]interface{}{ map[string]interface{}{ "include_fields": map[string]interface{}{ - "contains": map[string]string{ - "proc.name": "test", + "when": map[string]interface{}{ + "contains": map[string]string{ + "proc.name": "test", + }, }, "fields": []string{"proc.cpu.total_p", "proc.mem", "dd"}, }, @@ -82,8 +84,10 @@ func TestIncludeFields(t *testing.T) { yml := []map[string]interface{}{ map[string]interface{}{ "include_fields": map[string]interface{}{ - "contains": map[string]string{ - "proc.name": "test", + "when": map[string]interface{}{ + "contains": map[string]string{ + "proc.name": "test", + }, }, "fields": []string{"proc.cpu.total_p", "proc.mem", "dd"}, }, @@ -148,8 +152,10 @@ func TestIncludeFields1(t *testing.T) { yml := []map[string]interface{}{ map[string]interface{}{ "include_fields": map[string]interface{}{ - "regexp": map[string]string{ - "proc.cmdline": "launchd", + "when": map[string]interface{}{ + "regexp": map[string]string{ + "proc.cmdline": "launchd", + }, }, "fields": []string{"proc.cpu.total_add"}, }, @@ -199,8 +205,10 @@ func TestDropFields(t *testing.T) { yml := []map[string]interface{}{ map[string]interface{}{ "drop_fields": map[string]interface{}{ - "equals": map[string]string{ - "beat.hostname": "mar", + "when": map[string]interface{}{ + "equals": map[string]string{ + "beat.hostname": "mar", + }, }, "fields": []string{"proc.cpu.start_time", "mem", "proc.cmdline", "beat", "dd"}, }, @@ -262,8 +270,10 @@ func TestMultipleIncludeFields(t *testing.T) { yml := []map[string]interface{}{ map[string]interface{}{ "include_fields": map[string]interface{}{ - "contains": map[string]string{ - "beat.name": "my-shipper", + "when": map[string]interface{}{ + "contains": map[string]string{ + "beat.name": "my-shipper", + }, }, "fields": []string{"proc"}, }, @@ -357,9 +367,11 @@ func TestDropEvent(t *testing.T) { yml := []map[string]interface{}{ map[string]interface{}{ "drop_event": map[string]interface{}{ - "range": map[string]interface{}{ - "proc.cpu.total_p": map[string]float64{ - "lt": 0.5, + "when": map[string]interface{}{ + "range": map[string]interface{}{ + "proc.cpu.total_p": map[string]float64{ + "lt": 0.5, + }, }, }, }, @@ -453,8 +465,10 @@ func TestBadCondition(t *testing.T) { yml := []map[string]interface{}{ map[string]interface{}{ "drop_event": map[string]interface{}{ - "equal": map[string]string{ - "type": "process", + "when": map[string]interface{}{ + "equal": map[string]string{ + "type": "process", + }, }, }, }, @@ -488,8 +502,10 @@ func TestMissingFields(t *testing.T) { yml := []map[string]interface{}{ map[string]interface{}{ "include_fields": map[string]interface{}{ - "equals": map[string]string{ - "type": "process", + "when": map[string]interface{}{ + "equals": map[string]string{ + "type": "process", + }, }, }, }, @@ -513,3 +529,41 @@ func TestMissingFields(t *testing.T) { assert.NotNil(t, err) } + +func TestBadConditionConfig(t *testing.T) { + + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + yml := []map[string]interface{}{ + map[string]interface{}{ + "include_fields": map[string]interface{}{ + "when": map[string]interface{}{ + "fake": map[string]string{ + "type": "process", + }, + }, + "fields": []string{"proc.cpu.start_time", "proc.cpu.total_p", "proc.mem.rss_p", "proc.cmdline"}, + }, + }, + } + + config := processors.PluginConfig{} + + for _, action := range yml { + c := map[string]common.Config{} + + for name, actionYml := range action { + actionConfig, err := common.NewConfigFrom(actionYml) + assert.Nil(t, err) + + c[name] = *actionConfig + } + config = append(config, c) + } + + _, err := processors.New(config) + assert.NotNil(t, err) + +} diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index 3df8d1ddf88c..48c409f9cb94 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -213,7 +213,8 @@ metricbeat.modules: # #filters: #- drop_event: -# equals: +# when: +# equals: # http.code: 200 # diff --git a/metricbeat/tests/system/config/metricbeat.yml.j2 b/metricbeat/tests/system/config/metricbeat.yml.j2 index 98acbd966119..6689c4c6c54a 100644 --- a/metricbeat/tests/system/config/metricbeat.yml.j2 +++ b/metricbeat/tests/system/config/metricbeat.yml.j2 @@ -109,19 +109,22 @@ processors: {%- if include_fields %} - include_fields: - {{include_fields.condition | default()}} + when: + {{include_fields.condition | default()}} fields: {{include_fields.fields | default([])}} {%- endif %} {%- if drop_fields %} - drop_fields: - {{drop_fields.condition | default()}} + when: + {{drop_fields.condition | default()}} fields: {{drop_fields.fields | default([])}} {%- endif %} {%- if drop_event %} - drop_event: - {{ drop_event.condition | default()}} + when: + {{ drop_event.condition | default()}} {%- endif %} {%- endif %} diff --git a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index 07e9eaaa7ece..245898ea28e9 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -181,20 +181,23 @@ processors: {%- if include_fields %} - include_fields: - {{include_fields.condition | default()}} + when: + {{include_fields.condition | default()}} fields: {{include_fields.fields | default([])}} {%- endif %} {%- if drop_fields %} - drop_fields: - {{drop_fields.condition | default()}} + when: + {{drop_fields.condition | default()}} fields: {{drop_fields.fields | default([])}} {%- endif %} {%- if drop_event %} - drop_event: - {{ drop_event.condition | default()}} + when: + {{ drop_event.condition | default()}} {%- endif %} {%- endif %}