Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logql filter to match stages and drop capability #1112

Merged
merged 3 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions docs/clients/promtail/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ stages:
2. Change the timestamp of the log line
3. Change the content of the log line
4. Create a metric based on the extracted data
4. **Filtering stages** optionally apply a subset of stages based on some
4. **Filtering stages** optionally apply a subset of stages or drop entries based on some
condition.

Typical pipelines will start with a parsing stage (such as a
Expand All @@ -28,7 +28,7 @@ something with that extract data. The most common action stage will be a
[labels](./stages/labels.md) stage to turn extracted data into a label.

A common stage will also be the [match](./stages/match.md) stage to selectively
apply stages based on the current labels.
apply stages or drop entries based on a [LogQL stream selector and filter expressions](../../logql.md).

Note that pipelines can not currently be used to deduplicate logs; Loki will
receive the same log line multiple times if, for example:
Expand Down Expand Up @@ -76,9 +76,9 @@ scrape_configs:
source: timestamp

# This stage is only going to run if the scraped target has a label of
# "name" with a value of "nginx".
# "name" with a value of "nginx" and if the log line contains the word "GET"
- match:
selector: '{name="nginx"}'
selector: '{name="nginx"} |= "GET"'
stages:
# This regex stage extracts a new output by matching against some
# values and capturing the rest.
Expand Down Expand Up @@ -126,10 +126,10 @@ scrape_configs:
level:
component:

# This stage will only run if the scraped target has a label of "app"
# and a value of "some-app".
# This stage will only run if the scraped target has a label "app"
# with a value of "some-app" and the log line doesn't contains the word "info"
- match:
selector: '{app="some-app"}'
selector: '{app="some-app"} != "info"'
stages:
# The regex stage tries to extract a Go panic by looking for panic:
# in the log message.
Expand Down Expand Up @@ -207,4 +207,3 @@ Action stages:
Filtering stages:

* [match](./stages/match.md): Conditionally run stages based on the label set.

32 changes: 22 additions & 10 deletions docs/clients/promtail/stages/match.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
# `match` stage

The match stage is a filtering stage that conditionally applies a set of stages
when a log entry matches a configurable [LogQL](../../../logql.md) stream
selector.
or drop entries when a log entry matches a configurable [LogQL](../../../logql.md)
stream selector and filter expressions.

## Schema

```yaml
match:
# LogQL stream selector.
# LogQL stream selector and filter expressions.
selector: <string>

# Names the pipeline. When defined, creates an additional label in
# the pipeline_duration_seconds histogram, where the value is
# concatenated with job_name using an underscore.
[pipieline_name: <string>]
[pipeline_name: <string>]

# When set to true (default to false), all entries matching the selector will
# be dropped. Stages must not be defined when dropping entries.
[drop_entries: <bool>]

# Nested set of pipeline stages only if the selector
# matches the labels of the log entries:
Expand Down Expand Up @@ -46,40 +50,48 @@ pipeline_stages:
- labels:
app:
- match:
selector: "{app=\"loki\"}"
selector: '{app="loki"}'
stages:
- json:
expressions:
msg: message
- match:
pipeline_name: "app2"
selector: "{app=\"pokey\"}"
selector: '{app="pokey"}'
stages:
- json:
expressions:
msg: msg
- match:
selector: '{app="promtail"} |~ ".*noisy error.*"'
drop_entries: true
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
- output:
source: msg
```

And the given log line:
And given log lines:

```
```json
{ "time":"2012-11-01T22:08:41+00:00", "app":"loki", "component": ["parser","type"], "level" : "WARN", "message" : "app1 log line" }
{ "time":"2012-11-01T22:08:41+00:00", "app":"promtail", "component": ["parser","type"], "level" : "ERROR", "message" : "foo noisy error" }
```

The first stage will add `app` with a value of `loki` into the extracted map,
The first stage will add `app` with a value of `loki` into the extracted map for the first log line,
while the second stage will add `app` as a label (again with the value of `loki`).
The second line will follow the same flow and will be added the label `app` with a value of `promtail`.

The third stage uses LogQL to only execute the nested stages when there is a
label of `app` whose value is `loki`. This matches in our case; the nested
label of `app` whose value is `loki`. This matches the first line in our case; the nested
`json` stage then adds `msg` into the extracted map with a value of `app1 log
line`.

The fourth stage uses LogQL to only executed the nested stages when there is a
label of `app` whose value is `pokey`. This does **not** match in our case, so
the nested `json` stage is not ran.

The fifth stage will drop any entries from the application `promtail` that matches
the regex `.*noisy error`.

The final `output` stage changes the contents of the log line to be the value of
`msg` from the extracted map. In this case, the log line is changed to `app1 log
line`.
47 changes: 37 additions & 10 deletions pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package stages
import (
"time"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/go-kit/kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/logql"
)
Expand All @@ -19,17 +20,19 @@ const (
ErrSelectorRequired = "selector statement required for match stage"
ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'"
ErrSelectorSyntax = "invalid selector syntax for match stage"
ErrStagesWithDropLine = "match stage configured to drop entries cannot contains stages"
)

// MatcherConfig contains the configuration for a matcherStage
type MatcherConfig struct {
PipelineName *string `mapstructure:"pipeline_name"`
Selector string `mapstructure:"selector"`
Stages PipelineStages `mapstructure:"stages"`
DropEntries bool `mapstructure:"drop_entries"`
}

// validateMatcherConfig validates the MatcherConfig for the matcherStage
func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
func validateMatcherConfig(cfg *MatcherConfig) (logql.LogSelectorExpr, error) {
if cfg == nil {
return nil, errors.New(ErrEmptyMatchStageConfig)
}
Expand All @@ -39,14 +42,18 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
if cfg.Selector == "" {
return nil, errors.New(ErrSelectorRequired)
}
if cfg.Stages == nil || len(cfg.Stages) == 0 {
if !cfg.DropEntries && (cfg.Stages == nil || len(cfg.Stages) == 0) {
return nil, errors.New(ErrMatchRequiresStages)
}
matchers, err := logql.ParseMatchers(cfg.Selector)
if cfg.DropEntries && (cfg.Stages != nil && len(cfg.Stages) != 0) {
return nil, errors.New(ErrStagesWithDropLine)
}

selector, err := logql.ParseLogSelector(cfg.Selector)
if err != nil {
return nil, errors.Wrap(err, ErrSelectorSyntax)
}
return matchers, nil
return selector, nil
}

// newMatcherStage creates a new matcherStage from config
Expand All @@ -56,7 +63,7 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg
if err != nil {
return nil, err
}
matchers, err := validateMatcherConfig(cfg)
selector, err := validateMatcherConfig(cfg)
if err != nil {
return nil, err
}
Expand All @@ -67,21 +74,34 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg
nPtr = &name
}

pl, err := NewPipeline(logger, cfg.Stages, nPtr, registerer)
var pl *Pipeline
if !cfg.DropEntries {
var err error
pl, err = NewPipeline(logger, cfg.Stages, nPtr, registerer)
if err != nil {
return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config)
}
}

filter, err := selector.Filter()
if err != nil {
return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config)
return nil, errors.Wrap(err, "error parsing filter")
}

return &matcherStage{
matchers: matchers,
matchers: selector.Matchers(),
pipeline: pl,
drop: cfg.DropEntries,
filter: filter,
}, nil
}

// matcherStage applies Label matchers to determine if the include stages should be run
type matcherStage struct {
matchers []*labels.Matcher
filter logql.Filter
pipeline Stage
drop bool
}

// Process implements Stage
Expand All @@ -91,7 +111,14 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter
return
}
}
m.pipeline.Process(labels, extracted, t, entry)
if m.filter == nil || m.filter([]byte(*entry)) {
// Adds the drop label to not be sent by the api.EntryHandler
if m.drop {
labels[dropLabel] = ""
return
}
m.pipeline.Process(labels, extracted, t, entry)
}
}

// Name implements Stage
Expand Down
Loading