Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always add conditionals to all processor actions #2068

Merged
merged 4 commits into from
Jul 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 14 additions & 23 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -108,31 +108,22 @@ geoip:
]
{%- endif %}

{%- if drop_fields or drop_event or include_fields %}
{%- if processors %}

#================================ Filters =====================================
processors:

{%- if include_fields %}
- include_fields:
when:
{{include_fields.condition | default()}}
fields: {{include_fields.fields | default([])}}
{%- endif %}

{%- if drop_fields %}
- drop_fields:
when:
{{drop_fields.condition | default()}}
fields: {{drop_fields.fields | default([])}}
{%- endif %}


{%- if drop_event %}
- drop_event:
when:
{{ drop_event.condition | default()}}
{%- endif %}
processors:
{%- for processor in processors %}
{%- for name, settings in processor.iteritems() %}
- {{name}}:
{%- if settings %}
{%- for k, v in settings.iteritems() %}
{{k}}:
{{v | default([])}}
{%- endfor %}
{%- endif %}
{%- endfor %}
{%- endfor %}

{%- endif %}

Expand All @@ -152,4 +143,4 @@ output.file:
#================================ Paths =====================================
path:
data: {{path_data}}
{%endif%}
{% endif %}
16 changes: 10 additions & 6 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,11 @@ def test_with_generic_filtering(self):
overwrite_keys=True,
add_error_key=True,
),
drop_fields={
"fields": ["headers.request-id"],
},
processors=[{
"drop_fields": {
"fields": ["headers.request-id"],
},
}]
)

os.mkdir(self.working_dir + "/log/")
Expand Down Expand Up @@ -303,9 +305,11 @@ def test_with_generic_filtering_remove_headers(self):
overwrite_keys=True,
add_error_key=True,
),
drop_fields={
"fields": ["headers", "res"],
},
processors=[{
"drop_fields": {
"fields": ["headers", "res"],
},
}]
)

os.mkdir(self.working_dir + "/log/")
Expand Down
32 changes: 20 additions & 12 deletions filebeat/tests/system/test_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ def test_dropfields(self):
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
drop_fields={
"fields": ["beat"],
},
processors=[{
"drop_fields": {
"fields": ["beat"],
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")
Expand All @@ -36,9 +38,11 @@ def test_include_fields(self):
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
include_fields={
"fields": ["source", "offset", "message"],
},
processors=[{
"include_fields": {
"fields": ["source", "offset", "message"],
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")
Expand All @@ -59,9 +63,11 @@ def test_drop_event(self):
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test*.log",
drop_event={
"condition": "contains.source: test1",
},
processors=[{
"drop_event": {
"when": "contains.source: test1",
},
}]
)
with open(self.working_dir + "/test1.log", "w") as f:
f.write("test1 message\n")
Expand All @@ -86,9 +92,11 @@ def test_condition(self):
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test*.log",
drop_event={
"condition": "not.contains.source: test",
},
processors=[{
"drop_event": {
"when": "not.contains.source: test",
},
}]
)
with open(self.working_dir + "/test1.log", "w") as f:
f.write("test1 message\n")
Expand Down
64 changes: 64 additions & 0 deletions libbeat/processors/actions/checks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package actions

import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

func configChecked(
constr processors.Constructor,
checks ...func(common.Config) error,
) processors.Constructor {
validator := checkAll(checks...)
return func(c common.Config) (processors.Processor, error) {
err := validator(c)
if err != nil {
return nil, fmt.Errorf("%v in %v", err.Error(), c.Path())
}

return constr(c)
}
}

func checkAll(checks ...func(common.Config) error) func(common.Config) error {
return func(c common.Config) error {
for _, check := range checks {
if err := check(c); err != nil {
return err
}
}
return nil
}
}

func requireFields(fields ...string) func(common.Config) error {
return func(c common.Config) error {
for _, field := range fields {
if !c.HasField(field) {
return fmt.Errorf("missing %v option", field)
}
}
return nil
}
}

func allowedFields(fields ...string) func(common.Config) error {
return func(c common.Config) error {
for _, field := range c.GetFields() {
found := false
for _, allowed := range fields {
if field == allowed {
found = true
break
}
}

if !found {
return fmt.Errorf("unexpected %v option", field)
}
}
return nil
}
}
59 changes: 6 additions & 53 deletions libbeat/processors/actions/drop_event.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,26 @@
package actions

import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

type DropEvent struct {
Cond *processors.Condition
}

type DropEventConfig struct {
Cond *processors.ConditionConfig `config:"when"`
}
type dropEvent struct{}

func init() {
if err := processors.RegisterPlugin("drop_event", newDropEvent); err != nil {
constructor := configChecked(newDropEvent, allowedFields("when"))
if err := processors.RegisterPlugin("drop_event", constructor); err != nil {
panic(err)
}
}

func newDropEvent(c common.Config) (processors.Processor, error) {

f := DropEvent{}

if err := f.CheckConfig(c); err != nil {
return nil, err
}

config := DropEventConfig{}

err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the drop_event configuration: %s", err)
}

cond, err := processors.NewCondition(config.Cond)
if err != nil {
return nil, err
}
f.Cond = cond

return &f, nil
}

func (f *DropEvent) CheckConfig(c common.Config) error {

for _, field := range c.GetFields() {
if field != "when" {
return fmt.Errorf("unexpected %s option in the drop_event configuration", field)
}
}
return nil
return dropEvent{}, nil
}

func (f *DropEvent) Run(event common.MapStr) (common.MapStr, error) {

if f.Cond != nil && !f.Cond.Check(event) {
return event, nil
}

func (f dropEvent) Run(event common.MapStr) (common.MapStr, error) {
// return event=nil to delete the entire event
return nil, nil
}

func (f DropEvent) String() string {
if f.Cond != nil {
return "drop_event, condition=" + f.Cond.String()
}
return "drop_event"
}
func (f dropEvent) String() string { return "drop_event" }
68 changes: 11 additions & 57 deletions libbeat/processors/actions/drop_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,22 @@ import (
"github.com/elastic/beats/libbeat/processors"
)

type DropFields struct {
type dropFields struct {
Fields []string
// condition
Cond *processors.Condition
}

type DropFieldsConfig struct {
Fields []string `config:"fields"`
Cond *processors.ConditionConfig `config:"when"`
}

func init() {
if err := processors.RegisterPlugin("drop_fields", newDropFields); err != nil {
constructor := configChecked(newDropFields,
requireFields("fields"), allowedFields("fields", "when"))
if err := processors.RegisterPlugin("drop_fields", constructor); err != nil {
panic(err)
}
}

func newDropFields(c common.Config) (processors.Processor, error) {

f := DropFields{}

if err := f.CheckConfig(c); err != nil {
return nil, err
}

config := DropFieldsConfig{}

config := struct {
Fields []string `config:"fields"`
}{}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the drop_fields configuration: %s", err)
Expand All @@ -48,42 +37,12 @@ func newDropFields(c common.Config) (processors.Processor, error) {
}
}
}
f.Fields = config.Fields

cond, err := processors.NewCondition(config.Cond)
if err != nil {
return nil, err
}
f.Cond = cond

return &f, nil
f := dropFields{Fields: config.Fields}
return f, nil
}

func (f *DropFields) CheckConfig(c common.Config) error {

complete := false

for _, field := range c.GetFields() {
if field != "fields" && field != "when" {
return fmt.Errorf("unexpected %s option in the drop_fields configuration", field)
}
if field == "fields" {
complete = true
}
}

if !complete {
return fmt.Errorf("missing fields option in the drop_fields configuration")
}
return nil
}

func (f *DropFields) Run(event common.MapStr) (common.MapStr, error) {

if f.Cond != nil && !f.Cond.Check(event) {
return event, nil
}

func (f dropFields) Run(event common.MapStr) (common.MapStr, error) {
for _, field := range f.Fields {
err := event.Delete(field)
if err != nil {
Expand All @@ -94,11 +53,6 @@ func (f *DropFields) Run(event common.MapStr) (common.MapStr, error) {
return event, nil
}

func (f DropFields) String() string {

if f.Cond != nil {
return "drop_fields=" + strings.Join(f.Fields, ", ") + ", condition=" + f.Cond.String()
}
func (f dropFields) String() string {
return "drop_fields=" + strings.Join(f.Fields, ", ")

}
Loading