Skip to content

Commit

Permalink
Introduce the condition in the processor with the when keyword
Browse files Browse the repository at this point in the history
  • Loading branch information
monicasarbu committed Jul 1, 2016
1 parent 7a88510 commit 5758f3d
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d

*Affecting all Beats*
- Rename the `filters` section to `processors`. {pull}1944[1944]
- Introduce the condition with `when` in the processor configuration. {pull}1949[1949]

*Metricbeat*

Expand Down
9 changes: 6 additions & 3 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,23 @@ processors:

{%- if include_fields %}
- include_fields:
{{include_fields.condition | default()}}
when:
{{include_fields.condition | default()}}
fields: {{include_fields.fields | default([])}}
{%- endif %}

{%- if drop_fields %}
- drop_fields:
{{drop_fields.condition | default()}}
when:
{{drop_fields.condition | default()}}
fields: {{drop_fields.fields | default([])}}
{%- endif %}


{%- if drop_event %}
- drop_event:
{{ drop_event.condition | default()}}
when:
{{ drop_event.condition | default()}}
{%- endif %}

{%- endif %}
Expand Down
17 changes: 11 additions & 6 deletions libbeat/docs/filteringconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,18 @@ event -> processor 1 -> event1 -> processor 2 -> event2 ...
See <<exported-fields>> for the full list of possible fields.

Each processor receives a condition and optionally a set of arguments. The action is executed only if the condition
is fulfilled.
is fulfilled. If not condition is passed then the action is always executed.

[source,yaml]
------
processors:
- action1:
condition1
when:
condition1
[arguments]
- action2:
condition2
when:
condition2
[arguments]
...
Expand Down Expand Up @@ -160,7 +162,8 @@ optional and if it's missing then the defined fields are always exported. The `@
-------
processors:
- include_fields:
[condition]
when:
condition
fields: ["field1", "field2", ...]
-------

Expand All @@ -182,7 +185,8 @@ even if they show up in the `drop_fields` list.
-----------------------------------------------------
processors:
- drop_fields:
[condition]
when:
condition
fields: ["field1", "field2", ...]
-----------------------------------------------------

Expand All @@ -199,6 +203,7 @@ without one all the events are dropped.
------
processors:
- drop_event:
condition
when:
condition
------

6 changes: 3 additions & 3 deletions libbeat/processors/actions/drop_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type DropEvent struct {
}

type DropEventConfig struct {
processors.ConditionConfig `config:",inline"`
Cond *processors.ConditionConfig `config:"when"`
}

func init() {
Expand All @@ -36,7 +36,7 @@ func newDropEvent(c common.Config) (processors.Processor, error) {
return nil, fmt.Errorf("fail to unpack the drop_event configuration: %s", err)
}

cond, err := processors.NewCondition(config.ConditionConfig)
cond, err := processors.NewCondition(config.Cond)
if err != nil {
return nil, err
}
Expand All @@ -48,7 +48,7 @@ func newDropEvent(c common.Config) (processors.Processor, error) {
func (f *DropEvent) CheckConfig(c common.Config) error {

for _, field := range c.GetFields() {
if !processors.AvailableCondition(field) {
if field != "when" {
return fmt.Errorf("unexpected %s option in the drop_event configuration", field)
}
}
Expand Down
12 changes: 5 additions & 7 deletions libbeat/processors/actions/drop_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type DropFields struct {
}

type DropFieldsConfig struct {
Fields []string `config:"fields"`
processors.ConditionConfig `config:",inline"`
Fields []string `config:"fields"`
Cond *processors.ConditionConfig `config:"when"`
}

func init() {
Expand Down Expand Up @@ -50,7 +50,7 @@ func newDropFields(c common.Config) (processors.Processor, error) {
}
f.Fields = config.Fields

cond, err := processors.NewCondition(config.ConditionConfig)
cond, err := processors.NewCondition(config.Cond)
if err != nil {
return nil, err
}
Expand All @@ -64,10 +64,8 @@ func (f *DropFields) CheckConfig(c common.Config) error {
complete := false

for _, field := range c.GetFields() {
if !processors.AvailableCondition(field) {
if field != "fields" {
return fmt.Errorf("unexpected %s option in the drop_fields configuration", field)
}
if field != "fields" && field != "when" {
return fmt.Errorf("unexpected %s option in the drop_fields configuration", field)
}
if field == "fields" {
complete = true
Expand Down
14 changes: 7 additions & 7 deletions libbeat/processors/actions/include_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
)

Expand All @@ -15,8 +16,8 @@ type IncludeFields struct {
}

type IncludeFieldsConfig struct {
Fields []string `config:"fields"`
processors.ConditionConfig `config:",inline"`
Fields []string `config:"fields"`
Cond *processors.ConditionConfig `config:"when"`
}

func init() {
Expand Down Expand Up @@ -54,7 +55,7 @@ func newIncludeFields(c common.Config) (processors.Processor, error) {
}
f.Fields = config.Fields

cond, err := processors.NewCondition(config.ConditionConfig)
cond, err := processors.NewCondition(config.Cond)
if err != nil {
return nil, err
}
Expand All @@ -67,11 +68,10 @@ func (f *IncludeFields) CheckConfig(c common.Config) error {

complete := false

logp.Info("include_fields: %v", c)
for _, field := range c.GetFields() {
if !processors.AvailableCondition(field) {
if field != "fields" {
return fmt.Errorf("unexpected %s option in the include_fields configuration", field)
}
if field != "fields" && field != "when" {
return fmt.Errorf("unexpected %s option in the include_fields configuration", field)
}
if field == "fields" {
complete = true
Expand Down
20 changes: 7 additions & 13 deletions libbeat/processors/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,15 @@ type Condition struct {
rangexp map[string]RangeValue
}

func AvailableCondition(name string) bool {

switch name {
case "equals", "contains", "range", "regexp":
return true
default:
return false
}
}

func NewCondition(config ConditionConfig) (*Condition, error) {
func NewCondition(config *ConditionConfig) (*Condition, error) {

c := Condition{}

if config == nil {
// empty condition
return nil, nil
}

if config.Equals != nil {
if err := c.setEquals(config.Equals); err != nil {
return nil, err
Expand All @@ -61,8 +56,7 @@ func NewCondition(config ConditionConfig) (*Condition, error) {
return nil, err
}
} else {
// empty condition
return nil, nil
return nil, fmt.Errorf("missing condition")
}

return &c, nil
Expand Down
4 changes: 2 additions & 2 deletions libbeat/processors/condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestBadCondition(t *testing.T) {
}

for _, config := range configs {
_, err := NewCondition(config)
_, err := NewCondition(&config)
assert.NotNil(t, err)
}
}
Expand All @@ -50,7 +50,7 @@ func GetConditions(t *testing.T, configs []ConditionConfig) []Condition {

for _, config := range configs {

cond, err := NewCondition(config)
cond, err := NewCondition(&config)
assert.Nil(t, err)
conds = append(conds, *cond)
}
Expand Down
22 changes: 11 additions & 11 deletions libbeat/processors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Processors struct {

func New(config PluginConfig) (*Processors, error) {

processors := Processors{}
procs := Processors{}

for _, processor := range config {

Expand All @@ -35,32 +35,32 @@ func New(config PluginConfig) (*Processors, error) {
return nil, err
}

processors.addProcessor(plugin)
procs.addProcessor(plugin)
}
}

logp.Debug("processors", "Processors: %v", processors)
return &processors, nil
logp.Debug("processors", "Processors: %v", procs)
return &procs, nil
}

func (processors *Processors) addProcessor(p Processor) {
func (procs *Processors) addProcessor(p Processor) {

processors.list = append(processors.list, p)
procs.list = append(procs.list, p)
}

// Applies a sequence of processing rules and returns the filtered event
func (processors *Processors) Run(event common.MapStr) common.MapStr {
func (procs *Processors) Run(event common.MapStr) common.MapStr {

// Check if processors are set, just return event if not
if len(processors.list) == 0 {
if len(procs.list) == 0 {
return event
}

// clone the event at first, before starting filtering
filtered := event.Clone()
var err error

for _, p := range processors.list {
for _, p := range procs.list {
filtered, err = p.Run(filtered)
if err != nil {
logp.Debug("filter", "fail to apply processor %s: %s", p, err)
Expand All @@ -74,10 +74,10 @@ func (processors *Processors) Run(event common.MapStr) common.MapStr {
return filtered
}

func (processors Processors) String() string {
func (procs Processors) String() string {
s := []string{}

for _, p := range processors.list {
for _, p := range procs.list {

s = append(s, p.String())
}
Expand Down
Loading

0 comments on commit 5758f3d

Please sign in to comment.