Skip to content

Commit

Permalink
Collapse multiline logs based on a start line. (#3024)
Browse files Browse the repository at this point in the history
* Collapse multiline logs based on a start line.

Summary:
This is a very simple approach based on #1380 to provide multiline
or block log entries in promtail.

A `multiline` stage is added to pipelines. This stages matches a start
line. Once a start line is matched all following lines are appended
to an entry and not passed on to downstream stages. Once a new start
line is matched the former block of multilines is sent.

If now new line arrives withing `max_wait_time` the block is flushed to
the next stage and a new block is started.

* Test multiline stage process.

* Format code.

* Flush multiline block after `max_lines`.

* Capture internal state of the stage.

* Process different multiline streams in parallel.

* Start documenting multiline stage.

* Give an example configuration for `multiline` stage.

* Make linter happy.

* Pass through entries until first start line.

* Update pkg/logentry/stages/multiline.go
  • Loading branch information
jeschkies authored Dec 3, 2020
1 parent c98dfa0 commit afc1e53
Show file tree
Hide file tree
Showing 5 changed files with 457 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Parsing stages:

Transform stages:

- [multiline](../stages/multiline/): Merges multiple lines, e.g. stack traces, into multiline blocks.
- [template](../stages/template/): Use Go templates to modify extracted data.

Action stages:
Expand Down
67 changes: 67 additions & 0 deletions docs/sources/clients/promtail/stages/multiline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
title: multiline
---

# `multiline` stage

The `multiline` stage multiple lines into a multiline block before passing it on to the next stage in the pipeline.

A new block is identified by the `firstline` regular expression. Any line that does *not* match the expression is considered to be part of the block of the previous match.

## Schema

```yaml
multiline:
# RE2 regular expression, if matched will start a new multiline block.
# This expresion must be provided.
firstline: <string>

# The maximum wait time will be parsed as a Go duration: https://golang.org/pkg/time/#ParseDuration.
# If now new logs arrive withing this maximum wait time the current block will be sent on.
# This is useful if the opserved application dies with e.g. an exception. No new logs will arrive and the exception
# block is sent *after* the maximum wait time expired.
# It defaults to 3s.
max_wait_time: <duration>

# Maximum number of lines a block can have. If block has more lines a new block is started.
# The default is 128 lines.
max_lines: <integer>
```
## Examples
Let's say we have the following logs from a very simple [flask](https://flask.palletsprojects.com) service.
```
[2020-12-03 11:36:20] "GET /hello HTTP/1.1" 200 -
[2020-12-03 11:36:23] ERROR in app: Exception on /error [GET]
Traceback (most recent call last):
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/pallets/src/deployment_tools/hello.py", line 10, in error
raise Exception("Sorry, this route always breaks")
Exception: Sorry, this route always breaks
[2020-12-03 11:36:23] "GET /error HTTP/1.1" 500 -
[2020-12-03 11:36:26] "GET /hello HTTP/1.1" 200 -
[2020-12-03 11:36:27] "GET /hello HTTP/1.1" 200 -
```

We would like to collapse all lines of the traceback into one multiline block. All blocks start with a timestamp in brackets. Thus we configure a `multiline` stage with the `firstline` regular expression `^\[\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2}\]`. This will match the start of the traceback but not the following lines until `Exception: Sorry, this route always breaks`. These will be part of a multiline block and one log entry in Loki.

```yaml
multiline:
# Identify timestamps as first line of a multiline block.
firstline: "^\[\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2}\]"

max_wait_time: 3s
```
226 changes: 226 additions & 0 deletions pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package stages

import (
"bytes"
"fmt"
"regexp"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
)

const (
ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression"
ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v"
ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v"
)

const (
maxLineDefault uint64 = 128
maxWaitDefault = 3 * time.Second
)

// MultilineConfig contains the configuration for a multilineStage
type MultilineConfig struct {
Expression *string `mapstructure:"firstline"`
regex *regexp.Regexp
MaxLines *uint64 `mapstructure:"max_lines"`
MaxWaitTime *string `mapstructure:"max_wait_time"`
maxWait time.Duration
}

func validateMultilineConfig(cfg *MultilineConfig) error {
if cfg == nil || cfg.Expression == nil {
return errors.New(ErrMultilineStageEmptyConfig)
}

expr, err := regexp.Compile(*cfg.Expression)
if err != nil {
return errors.Errorf(ErrMultilineStageInvalidRegex, err)
}
cfg.regex = expr

if cfg.MaxWaitTime != nil {
maxWait, err := time.ParseDuration(*cfg.MaxWaitTime)
if err != nil {
return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err)
}
cfg.maxWait = maxWait
} else {
cfg.maxWait = maxWaitDefault
}

if cfg.MaxLines == nil {
cfg.MaxLines = new(uint64)
*cfg.MaxLines = maxLineDefault
}

return nil
}

// dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed
type multilineStage struct {
logger log.Logger
cfg *MultilineConfig
}

// multilineState captures the internal state of a running multiline stage.
type multilineState struct {
buffer *bytes.Buffer // The lines of the current multiline block.
startLineEntry Entry // The entry of the start line of a multiline block.
currentLines uint64 // The number of lines of the current multiline block.
}

// newMulitlineStage creates a MulitlineStage from config
func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) {
cfg := &MultilineConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateMultilineConfig(cfg)
if err != nil {
return nil, err
}

return &multilineStage{
logger: log.With(logger, "component", "stage", "type", "multiline"),
cfg: cfg,
}, nil
}

func (m *multilineStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

streams := make(map[model.Fingerprint](chan Entry))
wg := new(sync.WaitGroup)

for e := range in {
key := e.Labels.FastFingerprint()
s, ok := streams[key]
if !ok {
// Pass through entries until we hit first start line.
if !m.cfg.regex.MatchString(e.Line) {
if Debug {
level.Debug(m.logger).Log("msg", "pass through entry", "stream", key)
}
out <- e
continue
}

if Debug {
level.Debug(m.logger).Log("msg", "creating new stream", "stream", key)
}
s = make(chan Entry)
streams[key] = s

wg.Add(1)
go m.runMultiline(s, out, wg)
}
if Debug {
level.Debug(m.logger).Log("msg", "pass entry", "stream", key, "line", e.Line)
}
s <- e
}

// Close all streams and wait for them to finish being processed.
for _, s := range streams {
close(s)
}
wg.Wait()
}()
return out
}

func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.WaitGroup) {
defer wg.Done()

state := &multilineState{
buffer: new(bytes.Buffer),
currentLines: 0,
}

for {
select {
case <-time.After(m.cfg.maxWait):
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String())
}
m.flush(out, state)
case e, ok := <-in:
if Debug {
level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", e.Labels.FastFingerprint())
}

if !ok {
if Debug {
level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint())
}
m.flush(out, state)
return
}

isFirstLine := m.cfg.regex.MatchString(e.Line)
if isFirstLine {
if Debug {
level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint())
}
m.flush(out, state)

// The start line entry is used to set timestamp and labels in the flush method.
// The timestamps for following lines are ignored for now.
state.startLineEntry = e
}

// Append block line
if state.buffer.Len() > 0 {
state.buffer.WriteRune('\n')
}
state.buffer.WriteString(e.Line)
state.currentLines++

if state.currentLines == *m.cfg.MaxLines {
m.flush(out, state)
}
}
}
}

func (m *multilineStage) flush(out chan Entry, s *multilineState) {
if s.buffer.Len() == 0 {
if Debug {
level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len())
}
return
}

collapsed := Entry{
Extracted: s.startLineEntry.Extracted,
Entry: api.Entry{
Labels: s.startLineEntry.Entry.Labels,
Entry: logproto.Entry{
Timestamp: s.startLineEntry.Entry.Entry.Timestamp,
Line: s.buffer.String(),
},
},
}
s.buffer.Reset()
s.currentLines = 0

out <- collapsed
}

// Name implements Stage
func (m *multilineStage) Name() string {
return StageTypeMultiline
}
Loading

0 comments on commit afc1e53

Please sign in to comment.