diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index a80640eb0298..2c05a9e23ca6 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -108,31 +108,22 @@ geoip: ] {%- endif %} -{%- if drop_fields or drop_event or include_fields %} +{%- if processors %} #================================ Filters ===================================== -processors: - - {%- if include_fields %} - - include_fields: - when: - {{include_fields.condition | default()}} - fields: {{include_fields.fields | default([])}} - {%- endif %} - - {%- if drop_fields %} - - drop_fields: - when: - {{drop_fields.condition | default()}} - fields: {{drop_fields.fields | default([])}} - {%- endif %} - - {%- if drop_event %} - - drop_event: - when: - {{ drop_event.condition | default()}} - {%- endif %} +processors: +{%- for processor in processors %} +{%- for name, settings in processor.iteritems() %} +- {{name}}: + {%- if settings %} + {%- for k, v in settings.iteritems() %} + {{k}}: + {{v | default([])}} + {%- endfor %} + {%- endif %} +{%- endfor %} +{%- endfor %} {%- endif %} @@ -152,4 +143,4 @@ output.file: #================================ Paths ===================================== path: data: {{path_data}} -{%endif%} +{% endif %} diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index eae0fdd42e25..d7d78f98cd29 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -263,9 +263,11 @@ def test_with_generic_filtering(self): overwrite_keys=True, add_error_key=True, ), - drop_fields={ - "fields": ["headers.request-id"], - }, + processors=[{ + "drop_fields": { + "fields": ["headers.request-id"], + }, + }] ) os.mkdir(self.working_dir + "/log/") @@ -303,9 +305,11 @@ def test_with_generic_filtering_remove_headers(self): overwrite_keys=True, add_error_key=True, ), - drop_fields={ - "fields": ["headers", "res"], - }, + processors=[{ + "drop_fields": { + "fields": ["headers", "res"], + }, + }] ) os.mkdir(self.working_dir + "/log/") diff --git a/filebeat/tests/system/test_processors.py b/filebeat/tests/system/test_processors.py index 5c75b6ca5eed..042896fc2d6f 100644 --- a/filebeat/tests/system/test_processors.py +++ b/filebeat/tests/system/test_processors.py @@ -13,9 +13,11 @@ def test_dropfields(self): """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/test.log", - drop_fields={ - "fields": ["beat"], - }, + processors=[{ + "drop_fields": { + "fields": ["beat"], + }, + }] ) with open(self.working_dir + "/test.log", "w") as f: f.write("test message\n") @@ -36,9 +38,11 @@ def test_include_fields(self): """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/test.log", - include_fields={ - "fields": ["source", "offset", "message"], - }, + processors=[{ + "include_fields": { + "fields": ["source", "offset", "message"], + }, + }] ) with open(self.working_dir + "/test.log", "w") as f: f.write("test message\n") @@ -59,9 +63,11 @@ def test_drop_event(self): """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/test*.log", - drop_event={ - "condition": "contains.source: test1", - }, + processors=[{ + "drop_event": { + "when": "contains.source: test1", + }, + }] ) with open(self.working_dir + "/test1.log", "w") as f: f.write("test1 message\n") @@ -86,9 +92,11 @@ def test_condition(self): """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/test*.log", - drop_event={ - "condition": "not.contains.source: test", - }, + processors=[{ + "drop_event": { + "when": "not.contains.source: test", + }, + }] ) with open(self.working_dir + "/test1.log", "w") as f: f.write("test1 message\n") diff --git a/libbeat/processors/actions/checks.go b/libbeat/processors/actions/checks.go new file mode 100644 index 000000000000..5848a400e947 --- /dev/null +++ b/libbeat/processors/actions/checks.go @@ -0,0 +1,64 @@ +package actions + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" +) + +func configChecked( + constr processors.Constructor, + checks ...func(common.Config) error, +) processors.Constructor { + validator := checkAll(checks...) + return func(c common.Config) (processors.Processor, error) { + err := validator(c) + if err != nil { + return nil, fmt.Errorf("%v in %v", err.Error(), c.Path()) + } + + return constr(c) + } +} + +func checkAll(checks ...func(common.Config) error) func(common.Config) error { + return func(c common.Config) error { + for _, check := range checks { + if err := check(c); err != nil { + return err + } + } + return nil + } +} + +func requireFields(fields ...string) func(common.Config) error { + return func(c common.Config) error { + for _, field := range fields { + if !c.HasField(field) { + return fmt.Errorf("missing %v option", field) + } + } + return nil + } +} + +func allowedFields(fields ...string) func(common.Config) error { + return func(c common.Config) error { + for _, field := range c.GetFields() { + found := false + for _, allowed := range fields { + if field == allowed { + found = true + break + } + } + + if !found { + return fmt.Errorf("unexpected %v option", field) + } + } + return nil + } +} diff --git a/libbeat/processors/actions/drop_event.go b/libbeat/processors/actions/drop_event.go index d0e29da21488..76be30a6cda5 100644 --- a/libbeat/processors/actions/drop_event.go +++ b/libbeat/processors/actions/drop_event.go @@ -1,73 +1,26 @@ package actions import ( - "fmt" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" ) -type DropEvent struct { - Cond *processors.Condition -} - -type DropEventConfig struct { - Cond *processors.ConditionConfig `config:"when"` -} +type dropEvent struct{} func init() { - if err := processors.RegisterPlugin("drop_event", newDropEvent); err != nil { + constructor := configChecked(newDropEvent, allowedFields("when")) + if err := processors.RegisterPlugin("drop_event", constructor); err != nil { panic(err) } } func newDropEvent(c common.Config) (processors.Processor, error) { - - f := DropEvent{} - - if err := f.CheckConfig(c); err != nil { - return nil, err - } - - config := DropEventConfig{} - - err := c.Unpack(&config) - if err != nil { - return nil, fmt.Errorf("fail to unpack the drop_event configuration: %s", err) - } - - cond, err := processors.NewCondition(config.Cond) - if err != nil { - return nil, err - } - f.Cond = cond - - return &f, nil -} - -func (f *DropEvent) CheckConfig(c common.Config) error { - - for _, field := range c.GetFields() { - if field != "when" { - return fmt.Errorf("unexpected %s option in the drop_event configuration", field) - } - } - return nil + return dropEvent{}, nil } -func (f *DropEvent) Run(event common.MapStr) (common.MapStr, error) { - - if f.Cond != nil && !f.Cond.Check(event) { - return event, nil - } - +func (f dropEvent) Run(event common.MapStr) (common.MapStr, error) { // return event=nil to delete the entire event return nil, nil } -func (f DropEvent) String() string { - if f.Cond != nil { - return "drop_event, condition=" + f.Cond.String() - } - return "drop_event" -} +func (f dropEvent) String() string { return "drop_event" } diff --git a/libbeat/processors/actions/drop_fields.go b/libbeat/processors/actions/drop_fields.go index fcf7b838e8d7..955fb437972a 100644 --- a/libbeat/processors/actions/drop_fields.go +++ b/libbeat/processors/actions/drop_fields.go @@ -8,33 +8,22 @@ import ( "github.com/elastic/beats/libbeat/processors" ) -type DropFields struct { +type dropFields struct { Fields []string - // condition - Cond *processors.Condition -} - -type DropFieldsConfig struct { - Fields []string `config:"fields"` - Cond *processors.ConditionConfig `config:"when"` } func init() { - if err := processors.RegisterPlugin("drop_fields", newDropFields); err != nil { + constructor := configChecked(newDropFields, + requireFields("fields"), allowedFields("fields", "when")) + if err := processors.RegisterPlugin("drop_fields", constructor); err != nil { panic(err) } } func newDropFields(c common.Config) (processors.Processor, error) { - - f := DropFields{} - - if err := f.CheckConfig(c); err != nil { - return nil, err - } - - config := DropFieldsConfig{} - + config := struct { + Fields []string `config:"fields"` + }{} err := c.Unpack(&config) if err != nil { return nil, fmt.Errorf("fail to unpack the drop_fields configuration: %s", err) @@ -48,42 +37,12 @@ func newDropFields(c common.Config) (processors.Processor, error) { } } } - f.Fields = config.Fields - cond, err := processors.NewCondition(config.Cond) - if err != nil { - return nil, err - } - f.Cond = cond - - return &f, nil + f := dropFields{Fields: config.Fields} + return f, nil } -func (f *DropFields) CheckConfig(c common.Config) error { - - complete := false - - for _, field := range c.GetFields() { - if field != "fields" && field != "when" { - return fmt.Errorf("unexpected %s option in the drop_fields configuration", field) - } - if field == "fields" { - complete = true - } - } - - if !complete { - return fmt.Errorf("missing fields option in the drop_fields configuration") - } - return nil -} - -func (f *DropFields) Run(event common.MapStr) (common.MapStr, error) { - - if f.Cond != nil && !f.Cond.Check(event) { - return event, nil - } - +func (f dropFields) Run(event common.MapStr) (common.MapStr, error) { for _, field := range f.Fields { err := event.Delete(field) if err != nil { @@ -94,11 +53,6 @@ func (f *DropFields) Run(event common.MapStr) (common.MapStr, error) { return event, nil } -func (f DropFields) String() string { - - if f.Cond != nil { - return "drop_fields=" + strings.Join(f.Fields, ", ") + ", condition=" + f.Cond.String() - } +func (f dropFields) String() string { return "drop_fields=" + strings.Join(f.Fields, ", ") - } diff --git a/libbeat/processors/actions/include_fields.go b/libbeat/processors/actions/include_fields.go index 214e1fddc6ce..c8be043c8d4d 100644 --- a/libbeat/processors/actions/include_fields.go +++ b/libbeat/processors/actions/include_fields.go @@ -5,37 +5,25 @@ import ( "strings" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" ) -type IncludeFields struct { +type includeFields struct { Fields []string - // condition - Cond *processors.Condition -} - -type IncludeFieldsConfig struct { - Fields []string `config:"fields"` - Cond *processors.ConditionConfig `config:"when"` } func init() { - if err := processors.RegisterPlugin("include_fields", newIncludeFields); err != nil { + constructor := configChecked(newIncludeFields, + requireFields("fields"), allowedFields("fields", "when")) + if err := processors.RegisterPlugin("include_fields", constructor); err != nil { panic(err) } } func newIncludeFields(c common.Config) (processors.Processor, error) { - - f := IncludeFields{} - - if err := f.CheckConfig(c); err != nil { - return nil, err - } - - config := IncludeFieldsConfig{} - + config := struct { + Fields []string `config:"fields"` + }{} err := c.Unpack(&config) if err != nil { return nil, fmt.Errorf("fail to unpack the include_fields configuration: %s", err) @@ -53,43 +41,12 @@ func newIncludeFields(c common.Config) (processors.Processor, error) { config.Fields = append(config.Fields, readOnly) } } - f.Fields = config.Fields - - cond, err := processors.NewCondition(config.Cond) - if err != nil { - return nil, err - } - f.Cond = cond + f := includeFields{Fields: config.Fields} return &f, nil } -func (f *IncludeFields) CheckConfig(c common.Config) error { - - complete := false - - logp.Info("include_fields: %v", c) - for _, field := range c.GetFields() { - if field != "fields" && field != "when" { - return fmt.Errorf("unexpected %s option in the include_fields configuration", field) - } - if field == "fields" { - complete = true - } - } - - if !complete { - return fmt.Errorf("missing fields option in the include_fields configuration") - } - return nil -} - -func (f *IncludeFields) Run(event common.MapStr) (common.MapStr, error) { - - if f.Cond != nil && !f.Cond.Check(event) { - return event, nil - } - +func (f includeFields) Run(event common.MapStr) (common.MapStr, error) { filtered := common.MapStr{} for _, field := range f.Fields { @@ -109,10 +66,6 @@ func (f *IncludeFields) Run(event common.MapStr) (common.MapStr, error) { return filtered, nil } -func (f IncludeFields) String() string { - - if f.Cond != nil { - return "include_fields=" + strings.Join(f.Fields, ", ") + ", condition=" + f.Cond.String() - } +func (f includeFields) String() string { return "include_fields=" + strings.Join(f.Fields, ", ") } diff --git a/libbeat/processors/condition.go b/libbeat/processors/condition.go index 5e940a791575..0af73cf30832 100644 --- a/libbeat/processors/condition.go +++ b/libbeat/processors/condition.go @@ -33,6 +33,24 @@ type Condition struct { not *Condition } +type WhenProcessor struct { + condition *Condition + p Processor +} + +func NewConditional( + ruleFactory Constructor, +) Constructor { + return func(cfg common.Config) (Processor, error) { + rule, err := ruleFactory(cfg) + if err != nil { + return nil, err + } + + return addCondition(cfg, rule) + } +} + func NewCondition(config *ConditionConfig) (*Condition, error) { c := Condition{} @@ -460,3 +478,50 @@ func (e EqualsValue) String() string { } return strconv.Itoa(int(e.Int)) } + +func NewConditionRule( + config ConditionConfig, + p Processor, +) (Processor, error) { + cond, err := NewCondition(&config) + if err != nil { + logp.Err("Failed to initialize lookup condition: %v", err) + return nil, err + } + + if cond == nil { + return p, nil + } + return &WhenProcessor{cond, p}, nil +} + +func (r *WhenProcessor) Run(event common.MapStr) (common.MapStr, error) { + if !r.condition.Check(event) { + return event, nil + } + return r.p.Run(event) +} + +func (r *WhenProcessor) String() string { + return fmt.Sprintf("%v, condition=%v", r.p.String(), r.condition.String()) +} + +func addCondition( + cfg common.Config, + p Processor, +) (Processor, error) { + if !cfg.HasField("when") { + return p, nil + } + sub, err := cfg.Child("when", -1) + if err != nil { + return nil, err + } + + condConfig := ConditionConfig{} + if err := sub.Unpack(&condConfig); err != nil { + return nil, err + } + + return NewConditionRule(condConfig, p) +} diff --git a/libbeat/processors/condition_test.go b/libbeat/processors/condition_test.go index ac963cd97668..7b855031828e 100644 --- a/libbeat/processors/condition_test.go +++ b/libbeat/processors/condition_test.go @@ -1,6 +1,7 @@ package processors import ( + "errors" "testing" "github.com/elastic/beats/libbeat/common" @@ -8,6 +9,17 @@ import ( "github.com/stretchr/testify/assert" ) +type countFilter struct { + N int +} + +func (c *countFilter) Run(e common.MapStr) (common.MapStr, error) { + c.N++ + return e, nil +} + +func (c *countFilter) String() string { return "count" } + func TestBadCondition(t *testing.T) { if testing.Verbose() { @@ -534,3 +546,71 @@ func TestCombinedCondition(t *testing.T) { assert.True(t, conds[0].Check(event)) } + +func TestWhenProcessor(t *testing.T) { + type config map[string]interface{} + + tests := []struct { + title string + filter config + events []common.MapStr + expected int + }{ + { + "condition_matches", + config{"when.equals.i": 10}, + []common.MapStr{{"i": 10}}, + 1, + }, + { + "condition_fails", + config{"when.equals.i": 11}, + []common.MapStr{{"i": 10}}, + 0, + }, + { + "no_condition", + config{}, + []common.MapStr{{"i": 10}}, + 1, + }, + } + + for i, test := range tests { + t.Logf("run test (%v): %v", i, test.title) + + config, err := common.NewConfigFrom(test.filter) + if err != nil { + t.Error(err) + continue + } + + cf := &countFilter{} + filter, err := NewConditional(func(_ common.Config) (Processor, error) { + return cf, nil + })(*config) + if err != nil { + t.Error(err) + continue + } + + for _, event := range test.events { + _, err := filter.Run(event) + if err != nil { + t.Error(err) + } + } + + assert.Equal(t, test.expected, cf.N) + } +} + +func TestConditionRuleInitErrorPropagates(t *testing.T) { + testErr := errors.New("test") + filter, err := NewConditional(func(_ common.Config) (Processor, error) { + return nil, testErr + })(common.Config{}) + + assert.Equal(t, testErr, err) + assert.Nil(t, filter) +} diff --git a/libbeat/processors/processor_test.go b/libbeat/processors/processor_test.go index 349a3fcf3c6d..41061a8887fe 100644 --- a/libbeat/processors/processor_test.go +++ b/libbeat/processors/processor_test.go @@ -481,7 +481,9 @@ func TestBadCondition(t *testing.T) { for name, actionYml := range action { actionConfig, err := common.NewConfigFrom(actionYml) - assert.Nil(t, err) + if err != nil { + t.Fatal(err) + } c[name] = *actionConfig } @@ -489,7 +491,7 @@ func TestBadCondition(t *testing.T) { } _, err := processors.New(config) - assert.NotNil(t, err) + assert.Error(t, err) } diff --git a/libbeat/processors/registry.go b/libbeat/processors/registry.go index b24133311470..cc7023d30e02 100644 --- a/libbeat/processors/registry.go +++ b/libbeat/processors/registry.go @@ -23,6 +23,6 @@ func RegisterPlugin(name string, constructor Constructor) error { if _, exists := constructors[name]; exists { return fmt.Errorf("plugin %s already registered", name) } - constructors[name] = constructor + constructors[name] = NewConditional(constructor) return nil } diff --git a/metricbeat/tests/system/config/metricbeat.yml.j2 b/metricbeat/tests/system/config/metricbeat.yml.j2 index 6689c4c6c54a..264f3310f6b8 100644 --- a/metricbeat/tests/system/config/metricbeat.yml.j2 +++ b/metricbeat/tests/system/config/metricbeat.yml.j2 @@ -102,30 +102,22 @@ geoip: ] {%- endif %} -{%- if drop_fields or drop_event or include_fields %} +{%- if processors %} -#================================ Processors ===================================== -processors: - - {%- if include_fields %} - - include_fields: - when: - {{include_fields.condition | default()}} - fields: {{include_fields.fields | default([])}} - {%- endif %} - - {%- if drop_fields %} - - drop_fields: - when: - {{drop_fields.condition | default()}} - fields: {{drop_fields.fields | default([])}} - {%- endif %} +#================================ Filters ===================================== - {%- if drop_event %} - - drop_event: - when: - {{ drop_event.condition | default()}} +processors: +{%- for processor in processors %} +{%- for name, settings in processor.iteritems() %} +- {{name}}: + {%- if settings %} + {%- for k, v in settings.iteritems() %} + {{k}}: + {{v | default([])}} + {%- endfor %} {%- endif %} +{%- endfor %} +{%- endfor %} {%- endif %} diff --git a/metricbeat/tests/system/test_processors.py b/metricbeat/tests/system/test_processors.py index d01fabc588a3..e01b2ff27f7f 100644 --- a/metricbeat/tests/system/test_processors.py +++ b/metricbeat/tests/system/test_processors.py @@ -14,10 +14,12 @@ def test_drop_fields(self): "metricsets": ["cpu"], "period": "1s" }], - drop_fields={ - "condition": "range.system.cpu.system.pct.lt: 0.1", - "fields": ["system.cpu.load"], - }, + processors=[{ + "drop_fields":{ + "when": "range.system.cpu.system.pct.lt: 0.1", + "fields": ["system.cpu.load"], + }, + }] ) proc = self.start_beat() self.wait_until(lambda: self.output_lines() > 0) @@ -52,10 +54,12 @@ def test_dropfields_with_condition(self): "metricsets": ["process"], "period": "1s" }], - drop_fields={ - "fields": ["system.process.memory"], - "condition": "range.system.process.cpu.total.pct.lt: 0.5", - }, + processors=[{ + "drop_fields":{ + "fields": ["system.process.memory"], + "when": "range.system.process.cpu.total.pct.lt: 0.5", + }, + }] ) metricbeat = self.start_beat() self.wait_until( @@ -84,9 +88,11 @@ def test_dropevent_with_condition(self): "metricsets": ["process"], "period": "1s" }], - drop_event={ - "condition": "range.system.process.cpu.total.pct.lt: 0.001", - }, + processors=[{ + "drop_event":{ + "when": "range.system.process.cpu.total.pct.lt: 0.001", + }, + }] ) metricbeat = self.start_beat() self.wait_until( @@ -112,9 +118,11 @@ def test_dropevent_with_complex_condition(self): "metricsets": ["process"], "period": "1s" }], - drop_event={ - "condition": "not.contains.system.process.cmdline: metricbeat.test", - }, + processors=[{ + "drop_event":{ + "when.not": "contains.system.process.cmdline: metricbeat.test", + }, + }] ) metricbeat = self.start_beat() self.wait_until( @@ -139,7 +147,9 @@ def test_include_fields(self): "metricsets": ["process"], "period": "1s" }], - include_fields={"fields": ["system.process.cpu", "system.process.memory"]}, + processors=[{ + "include_fields":{"fields": ["system.process.cpu", "system.process.memory"]}, + }] ) metricbeat = self.start_beat() self.wait_until( @@ -179,8 +189,11 @@ def test_multiple_actions(self): "metricsets": ["process"], "period": "1s" }], - include_fields={"fields": ["system.process"]}, - drop_fields={"fields": ["system.process.memory"]}, + processors=[{ + "include_fields":{"fields": ["system.process"]}, + }, { + "drop_fields": {"fields": ["system.process.memory"]}, + }] ) metricbeat = self.start_beat() self.wait_until( @@ -218,8 +231,15 @@ def test_contradictory_multiple_actions(self): "metricsets": ["process"], "period": "1s" }], - include_fields={"fields": ["system.process.memory.size", "proc.memory.rss.pct"]}, - drop_fields={"fields": ["system.process.memory.size", "proc.memory.rss.pct"]}, + processors=[{ + "include_fields":{ + "fields": ["system.process.memory.size", "proc.memory.rss.pct"], + }, + }, { + "drop_fields": { + "fields": ["system.process.memory.size", "proc.memory.rss.pct"], + }, + }] ) metricbeat = self.start_beat() self.wait_until( diff --git a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index 245898ea28e9..4f284f015da4 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -174,31 +174,22 @@ geoip: ] {%- endif %} -{%- if drop_fields or drop_event or include_fields %} +{%- if processors %} -#================================ Processors ===================================== -processors: +#================================ Filters ===================================== - {%- if include_fields %} - - include_fields: - when: - {{include_fields.condition | default()}} - fields: {{include_fields.fields | default([])}} - {%- endif %} - - {%- if drop_fields %} - - drop_fields: - when: - {{drop_fields.condition | default()}} - fields: {{drop_fields.fields | default([])}} - {%- endif %} - - - {%- if drop_event %} - - drop_event: - when: - {{ drop_event.condition | default()}} - {%- endif %} +processors: +{%- for processor in processors %} +{%- for name, settings in processor.iteritems() %} +- {{name}}: + {%- if settings %} + {%- for k, v in settings.iteritems() %} + {{k}}: + {{v | default([])}} + {%- endfor %} + {%- endif %} +{%- endfor %} +{%- endfor %} {%- endif %} diff --git a/packetbeat/tests/system/test_0060_processors.py b/packetbeat/tests/system/test_0060_processors.py index e926b06cd684..c95800557f29 100644 --- a/packetbeat/tests/system/test_0060_processors.py +++ b/packetbeat/tests/system/test_0060_processors.py @@ -7,9 +7,11 @@ def test_drop_map_fields(self): self.render_config_template( http_send_all_headers=True, - drop_fields={"fields": ["http.request_headers"]}, - # export all fields - include_fields=None, + processors=[{ + "drop_fields": { + "fields": ["http.request_headers"] + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -30,10 +32,12 @@ def test_drop_fields_with_cond(self): self.render_config_template( http_send_all_headers=True, - drop_fields={ - "fields": ["http.request_headers", "http.response_headers"], - "condition": "equals.http.code: 200", - }, + processors=[{ + "drop_fields": { + "fields": ["http.request_headers", "http.response_headers"], + "when": "equals.http.code: 200", + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -60,10 +64,12 @@ def test_include_fields_with_cond(self): self.render_config_template( http_send_request=True, http_send_response=True, - include_fields={ - "fields": ["http"], - "condition": "equals.http.code: 200", - }, + processors=[{ + "include_fields": { + "fields": ["http"], + "when": "equals.http.code: 200", + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -92,10 +98,12 @@ def test_drop_fields_with_cond_range(self): self.render_config_template( http_send_request=True, http_send_response=True, - drop_fields={ - "fields": ["request", "response"], - "condition": "range.http.code.lt: 300", - }, + processors=[{ + "drop_fields": { + "fields": ["request", "response"], + "when": "range.http.code.lt: 300", + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -120,9 +128,11 @@ def test_drop_fields_with_cond_range(self): def test_drop_event_with_cond(self): self.render_config_template( - drop_event={ - "condition": "range.http.code.lt: 300", - }, + processors=[{ + "drop_event": { + "when": "range.http.code.lt: 300", + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -138,11 +148,11 @@ def test_drop_end_fields(self): self.render_config_template( http_send_all_headers=True, - drop_fields={ - "fields": ["http.response_headers.transfer-encoding"] - }, - # export all fields - include_fields=None, + processors=[{ + "drop_fields": { + "fields": ["http.response_headers.transfer-encoding"] + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -167,11 +177,11 @@ def test_drop_unknown_field(self): self.render_config_template( http_send_all_headers=True, - drop_fields={ - "fields": ["http.response_headers.transfer-encoding-test"] - }, - # export all fields - include_fields=None, + processors=[{ + "drop_fields": { + "fields": ["http.response_headers.transfer-encoding-test"] + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -196,9 +206,11 @@ def test_drop_event(self): self.render_config_template( http_send_all_headers=True, - drop_event={ - "condition": "equals.status: OK", - }, + processors=[{ + "drop_event": { + "when": "equals.status: OK", + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -212,9 +224,11 @@ def test_include_empty_list(self): self.render_config_template( http_send_all_headers=True, # export all mandatory fields - include_fields={ - "fields": [], - }, + processors=[{ + "include_fields": { + "fields": [], + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -228,14 +242,14 @@ def test_include_empty_list(self): assert "http.response_headers" not in objs[0] def test_drop_no_fields(self): - self.render_config_template( http_send_all_headers=True, - drop_fields={ - "fields": [], - }, - # export all fields - include_fields=None, + processors=[{ + "drop_fields": { + "fields": [], + }, + # export all fields + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -250,16 +264,18 @@ def test_drop_no_fields(self): assert objs[2]["status"] == "Error" def test_drop_and_include_fields_failed_cond(self): - self.render_config_template( http_send_all_headers=True, - include_fields={ - "fields": ["http"], - }, - drop_fields={ - "fields": ["http.request_headers", "http.response_headers"], - "condition": "equals.status: OK", - }, + processors=[{ + "include_fields": { + "fields": ["http"], + }, + }, { + "drop_fields": { + "fields": ["http.request_headers", "http.response_headers"], + "when": "equals.status: OK", + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -281,13 +297,16 @@ def test_drop_and_include_fields(self): self.render_config_template( http_send_all_headers=True, - include_fields={ - "fields": ["http"], - }, - drop_fields={ - "fields": ["http.request_headers", "http.response_headers"], - "condition": "equals.http.code: 200", - }, + processors=[{ + "include_fields": { + "fields": ["http"], + }, + }, { + "drop_fields": { + "fields": ["http.request_headers", "http.response_headers"], + "when": "equals.http.code: 200", + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -309,14 +328,16 @@ def test_condition_and(self): self.render_config_template( http_send_all_headers=True, - include_fields={ - "fields": ["http"], - "condition": """ - and: - - equals.type: http - - equals.http.code: 200 - """ - }, + processors=[{ + "include_fields": { + "fields": ["http"], + "when": """ + and: + - equals.type: http + - equals.http.code: 200 + """ + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -334,13 +355,15 @@ def test_condition_or(self): self.render_config_template( http_send_all_headers=True, - drop_event={ - "condition": """ - or: - - equals.http.code: 404 - - equals.http.code: 200 - """ - }, + processors=[{ + "drop_event": { + "when": """ + or: + - equals.http.code: 404 + - equals.http.code: 200 + """ + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap", @@ -356,9 +379,11 @@ def test_condition_not(self): self.render_config_template( http_send_all_headers=True, - drop_event={ - "condition": "not.equals.http.code: 200", - }, + processors=[{ + "drop_event": { + "when.not": "equals.http.code: 200", + }, + }] ) self.run_packetbeat(pcap="http_minitwit.pcap",