Skip to content

Commit

Permalink
Use dots in field names and make filters pluggable (#1623)
Browse files Browse the repository at this point in the history
Changes:

* Each filtering action is a plugin, so you can easily add a new action to the filtering
*  Add drop_event action
*  Be able to pass fields that contain . in the condition:  equals: process.pid: 34443 
*  Add a sample example in libbeat.yml
*  Add system tests for Topbeat, Packetbeat, Filebeat, Metricbeat
  • Loading branch information
monicasarbu authored and tsg committed May 19, 2016
1 parent 8a70d3b commit 234c177
Show file tree
Hide file tree
Showing 32 changed files with 1,605 additions and 538 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha2...master[Check the HEAD d

*Affecting all Beats*

- Add conditions to generic filtering {pull}1623[1623]

*Packetbeat*

*Topbeat*
Expand Down
12 changes: 12 additions & 0 deletions filebeat/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,15 @@ logging.files:

# Number of rotated log files to keep. Oldest files will be deleted first.
#keepfiles: 7

#================================ Filters =====================================

# This section defines a list of filtering rules that are applied one by one starting with the
# exported event:
# event -> filter1 -> event1 -> filter2 ->event2 ...
# Supported actions: drop_fields, drop_event, include_fields
#filters:
# - drop_fields:
# equals:
# status: OK
# fields: [ ]
24 changes: 17 additions & 7 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,29 @@ output:
rotate_every_kb: 1000
number_of_files: 2

{% if filter_enabled %}

############################# Filters #########################################
{%- if drop_fields or drop_event or include_fields %}
filters:

{%- if include_fields %}
- include_fields:
{{include_fields.condition | default()}}
fields: {{include_fields.fields | default([])}}
{%- endif %}

{%- if drop_fields %}
- drop_fields:
fields: {{drop_fields}}
{{drop_fields.condition | default()}}
fields: {{drop_fields.fields | default([])}}
{%- endif %}
{%- if include_fields is not none %}
- include_fields:
fields: {{include_fields}}


{%- if drop_event %}
- drop_event:
{{ drop_event.condition | default()}}
{%- endif %}
{% endif %}

{%- endif %}

{% if path_data %}
path:
Expand Down
38 changes: 33 additions & 5 deletions filebeat/tests/system/test_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ def test_dropfields(self):
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
filter_enabled=True,
drop_fields=["beat"],
include_fields=None,
drop_fields={
"fields": ["beat"],
},
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")
Expand All @@ -36,8 +36,9 @@ def test_include_fields(self):
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
filter_enabled=True,
include_fields=["source", "offset", "message"]
include_fields={
"fields": ["source", "offset", "message"],
},
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")
Expand All @@ -51,3 +52,30 @@ def test_include_fields(self):
)[0]
assert "beat.name" not in output
assert "message" in output

def test_drop_event(self):
"""
Check drop_event filtering action
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test*.log",
drop_event={
"condition": "contains.source: test1",
},
)
with open(self.working_dir + "/test1.log", "w") as f:
f.write("test1 message\n")

with open(self.working_dir + "/test2.log", "w") as f:
f.write("test2 message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "beat.name" in output
assert "message" in output
assert "test" in output["message"]
12 changes: 6 additions & 6 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ def test_with_generic_filtering(self):
overwrite_keys=True,
add_error_key=True,
),
filter_enabled=True,
drop_fields=["headers.request-id"],
include_fields=None,
drop_fields={
"fields": ["headers.request-id"],
},
)

os.mkdir(self.working_dir + "/log/")
Expand Down Expand Up @@ -297,9 +297,9 @@ def test_with_generic_filtering_remove_headers(self):
overwrite_keys=True,
add_error_key=True,
),
filter_enabled=True,
drop_fields=["headers", "res"],
include_fields=None,
drop_fields={
"fields": ["headers", "res"],
},
)

os.mkdir(self.working_dir + "/log/")
Expand Down
12 changes: 12 additions & 0 deletions libbeat/_beat/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,15 @@ logging.files:

# Number of rotated log files to keep. Oldest files will be deleted first.
#keepfiles: 7

#================================ Filters =====================================

# This section defines a list of filtering rules that are applied one by one starting with the
# exported event:
# event -> filter1 -> event1 -> filter2 ->event2 ...
# Supported actions: drop_fields, drop_event, include_fields
#filters:
# - drop_fields:
# equals:
# status: OK
# fields: [ ]
6 changes: 3 additions & 3 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/filter"
_ "github.com/elastic/beats/libbeat/filter/rules"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/publisher"
Expand Down Expand Up @@ -106,15 +107,15 @@ type Beat struct {
Config BeatConfig // Common Beat configuration data.
Publisher *publisher.Publisher // Publisher

filters *filter.FilterList // Filters
filters *filter.Filters // Filters
}

// BeatConfig struct contains the basic configuration of every beat
type BeatConfig struct {
Shipper publisher.ShipperConfig `config:",inline"`
Output map[string]*common.Config `config:"output"`
Logging logp.Logging `config:"logging"`
Filters []filter.FilterConfig `config:"filters"`
Filters filter.FilterPluginConfig `config:"filters"`
Path paths.Path `config:"path"`
}

Expand Down Expand Up @@ -226,7 +227,6 @@ func (bc *instance) config() error {
if err != nil {
return fmt.Errorf("error initializing filters: %v", err)
}
debugf("Filters: %+v", bc.data.filters)

if bc.data.Config.Shipper.MaxProcs != nil {
maxProcs := *bc.data.Config.Shipper.MaxProcs
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,6 @@ func fromConfig(in *ucfg.Config) *Config {
func (c *Config) access() *ucfg.Config {
return (*ucfg.Config)(c)
}
func (c *Config) GetFields() []string {
return c.access().GetFields()
}
Loading

0 comments on commit 234c177

Please sign in to comment.