diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go index 9e71e96280980..0f5b248c8f857 100644 --- a/pkg/logentry/stages/extensions.go +++ b/pkg/logentry/stages/extensions.go @@ -8,7 +8,7 @@ import ( const RFC3339Nano = "RFC3339Nano" // NewDocker creates a Docker json log format specific pipeline stage. -func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { +func NewDocker(logger log.Logger, registerer prometheus.Registerer) (*Pipeline, error) { stages := PipelineStages{ PipelineStage{ StageTypeJSON: JSONConfig{ @@ -36,7 +36,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro } // NewCRI creates a CRI format specific pipeline stage -func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { +func NewCRI(logger log.Logger, registerer prometheus.Registerer) (*Pipeline, error) { stages := PipelineStages{ PipelineStage{ StageTypeRegex: RegexConfig{ diff --git a/pkg/logentry/stages/extensions_test.go b/pkg/logentry/stages/extensions_test.go index f544614cf9010..19cc3d6e23b5f 100644 --- a/pkg/logentry/stages/extensions_test.go +++ b/pkg/logentry/stages/extensions_test.go @@ -71,11 +71,12 @@ func TestNewDocker(t *testing.T) { } lbs := toLabelSet(tt.labels) extr := map[string]interface{}{} - p.Process(lbs, extr, &tt.t, &tt.entry) + result := &resultChain{} + p.Process(lbs, extr, tt.t, tt.entry, result) - assertLabels(t, tt.expectedLabels, lbs) - assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry") - if tt.t.Unix() != tt.expectedT.Unix() { + assertLabels(t, tt.expectedLabels, result.labels) + assert.Equal(t, tt.expectedEntry, result.entry, "did not receive expected log entry") + if result.time.Unix() != tt.expectedT.Unix() { t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t) } }) @@ -147,11 +148,12 @@ func TestNewCri(t *testing.T) { } lbs := toLabelSet(tt.labels) extr := map[string]interface{}{} - p.Process(lbs, extr, 
&tt.t, &tt.entry) + result := &resultChain{} + p.Process(lbs, extr, tt.t, tt.entry, result) - assertLabels(t, tt.expectedLabels, lbs) - assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry") - if tt.t.Unix() != tt.expectedT.Unix() { + assertLabels(t, tt.expectedLabels, result.labels) + assert.Equal(t, tt.expectedEntry, result.entry, "did not receive expected log entry") + if result.time.Unix() != tt.expectedT.Unix() { t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t) } }) diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go index 888e80f2de185..bd4f1be33bc46 100644 --- a/pkg/logentry/stages/json.go +++ b/pkg/logentry/stages/json.go @@ -92,7 +92,9 @@ func parseJSONConfig(config interface{}) (*JSONConfig, error) { } // Process implements Stage -func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { + defer chain.NextStage(labels, extracted, time, entry) // we can use defer, as we don't update time or entry + // If a source key is provided, the json stage should process it // from the exctracted map, otherwise should fallback to the entry input := entry @@ -113,19 +115,12 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac return } - input = &value - } - - if input == nil { - if Debug { - level.Debug(j.logger).Log("msg", "cannot parse a nil entry") - } - return + input = value } var data map[string]interface{} - if err := json.Unmarshal([]byte(*input), &data); err != nil { + if err := json.Unmarshal([]byte(input), &data); err != nil { if Debug { level.Debug(j.logger).Log("msg", "failed to unmarshal log line", "err", err) } diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go index 64a41b1ffd016..0d4a44ff129b8 100644 --- a/pkg/logentry/stages/json_test.go +++ 
b/pkg/logentry/stages/json_test.go @@ -91,8 +91,9 @@ func TestPipeline_JSON(t *testing.T) { ts := time.Now() entry := testData.entry extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, testData.expectedExtract, extracted) + result := &resultChain{} + pl.Process(lbls, extracted, ts, entry, result) + assert.Equal(t, testData.expectedExtract, result.extracted) }) } } @@ -351,10 +352,10 @@ func TestJSONParser_Parse(t *testing.T) { } lbs := model.LabelSet{} extr := tt.extracted - ts := time.Now() - p.Process(lbs, extr, &ts, &tt.entry) + result := &resultChain{} + p.Process(lbs, extr, time.Now(), tt.entry, result) - assert.Equal(t, tt.expectedExtract, extr) + assert.Equal(t, tt.expectedExtract, result.extracted) }) } } diff --git a/pkg/logentry/stages/labels.go b/pkg/logentry/stages/labels.go index f7dd828fbc55a..b97cc3a3931ca 100644 --- a/pkg/logentry/stages/labels.go +++ b/pkg/logentry/stages/labels.go @@ -62,7 +62,7 @@ type labelStage struct { } // Process implements Stage -func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { for lName, lSrc := range l.cfgs { if lValue, ok := extracted[*lSrc]; ok { s, err := getString(lValue) @@ -82,6 +82,8 @@ func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interfa labels[model.LabelName(lName)] = labelValue } } + + chain.NextStage(labels, extracted, time, entry) } // Name implements Stage diff --git a/pkg/logentry/stages/labels_test.go b/pkg/logentry/stages/labels_test.go index 88781331c9354..a6fd4c1d2723c 100644 --- a/pkg/logentry/stages/labels_test.go +++ b/pkg/logentry/stages/labels_test.go @@ -42,11 +42,10 @@ func TestLabelsPipeline_Labels(t *testing.T) { "level": "WARN", "app": "loki", } - ts := time.Now() - entry := testLabelsLogLine extracted := 
map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, expectedLbls, lbls) + result := &resultChain{} + pl.Process(lbls, extracted, time.Now(), testLabelsLogLine, result) + assert.Equal(t, expectedLbls, result.labels) } var ( @@ -157,8 +156,11 @@ func TestLabelStage_Process(t *testing.T) { if err != nil { t.Fatal(err) } - st.Process(test.inputLabels, test.extractedData, nil, nil) - assert.Equal(t, test.expectedLabels, test.inputLabels) + + result := &resultChain{} + + st.Process(test.inputLabels, test.extractedData, time.Now(), "", result) + assert.Equal(t, test.expectedLabels, result.labels) }) } } diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index 0c0838afd0b76..aa3890167af0b 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -68,7 +68,7 @@ func validateMatcherConfig(cfg *MatcherConfig) (logql.LogSelectorExpr, error) { } // newMatcherStage creates a new matcherStage from config -func newMatcherStage(logger log.Logger, jobName *string, config interface{}, registerer prometheus.Registerer) (Stage, error) { +func newMatcherStage(logger log.Logger, jobName *string, config interface{}, registerer prometheus.Registerer) (*matcherStage, error) { cfg := &MatcherConfig{} err := mapstructure.Decode(config, cfg) if err != nil { @@ -111,24 +111,27 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg type matcherStage struct { matchers []*labels.Matcher filter logql.Filter - pipeline Stage + pipeline *Pipeline action string } // Process implements Stage -func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { for _, filter := range m.matchers { if !filter.Matches(string(labels[model.LabelName(filter.Name)])) { + chain.NextStage(labels, extracted, time, entry) 
return } } - if m.filter == nil || m.filter([]byte(*entry)) { + if m.filter == nil || m.filter([]byte(entry)) { switch m.action { case MatchActionDrop: // Adds the drop label to not be sent by the api.EntryHandler labels[dropLabel] = "" + chain.NextStage(labels, extracted, time, entry) case MatchActionKeep: - m.pipeline.Process(labels, extracted, t, entry) + // run all stages in the pipeline, and then continue with our original chain afterwards + m.pipeline.Process(labels, extracted, time, entry, matcherChain{chain: chain}) } } } @@ -137,3 +140,11 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter func (m *matcherStage) Name() string { return StageTypeMatch } + +type matcherChain struct { + chain StageChain +} + +func (mc matcherChain) NextStage(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string) { + mc.chain.NextStage(labels, extracted, time, entry) +} diff --git a/pkg/logentry/stages/match_test.go b/pkg/logentry/stages/match_test.go index 843e2b8dd24a3..188bd41867a9f 100644 --- a/pkg/logentry/stages/match_test.go +++ b/pkg/logentry/stages/match_test.go @@ -65,18 +65,18 @@ func TestMatchPipeline(t *testing.T) { t.Fatal(err) } lbls := model.LabelSet{} - ts := time.Now() + // Process the first log line which should extract the output from the `message` field - entry := testMatchLogLineApp1 extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, "app1 log line", entry) + result := &resultChain{} + pl.Process(lbls, extracted, time.Now(), testMatchLogLineApp1, result) + assert.Equal(t, "app1 log line", result.entry) // Process the second log line which should extract the output from the `msg` field - entry = testMatchLogLineApp2 extracted = map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, "app2 log line", entry) + result = &resultChain{} + pl.Process(lbls, extracted, time.Now(), testMatchLogLineApp2, result) + 
assert.Equal(t, "app2 log line", result.entry) got, err := registry.Gather() if err != nil { @@ -170,21 +170,21 @@ func TestMatcher(t *testing.T) { return } if s != nil { - ts, entry := time.Now(), "foo" extracted := map[string]interface{}{ "test_label": "unimportant value", } labels := toLabelSet(tt.labels) - s.Process(labels, extracted, &ts, &entry) + result := &resultChain{} + s.Process(labels, extracted, time.Now(), "foo", result) // test_label should only be in the label set if the stage ran - if _, ok := labels["test_label"]; ok { + if _, ok := result.labels["test_label"]; ok { if !tt.shouldRun { t.Error("stage ran but should have not") } } if tt.shouldDrop { - if _, ok := labels[dropLabel]; !ok { + if _, ok := result.labels[dropLabel]; !ok { t.Error("stage should have been dropped") } } diff --git a/pkg/logentry/stages/metrics.go b/pkg/logentry/stages/metrics.go index d831083c07264..57120709834b5 100644 --- a/pkg/logentry/stages/metrics.go +++ b/pkg/logentry/stages/metrics.go @@ -115,7 +115,7 @@ type metricStage struct { } // Process implements Stage -func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { for name, collector := range m.metrics { if v, ok := extracted[*m.cfg[name].Source]; ok { switch vec := collector.(type) { @@ -128,6 +128,8 @@ func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interf } } } + + chain.NextStage(labels, extracted, time, entry) } // Name implements Stage diff --git a/pkg/logentry/stages/metrics_test.go b/pkg/logentry/stages/metrics_test.go index 4c4ad4e8adde7..457e879bf94d9 100644 --- a/pkg/logentry/stages/metrics_test.go +++ b/pkg/logentry/stages/metrics_test.go @@ -84,12 +84,9 @@ func TestMetricsPipeline(t *testing.T) { t.Fatal(err) } lbls := model.LabelSet{} - ts := time.Now() extracted := 
map[string]interface{}{} - entry := testMetricLogLine1 - pl.Process(lbls, extracted, &ts, &entry) - entry = testMetricLogLine2 - pl.Process(lbls, extracted, &ts, &entry) + pl.Process(lbls, extracted, time.Now(), testMetricLogLine1, &resultChain{}) + pl.Process(lbls, extracted, time.Now(), testMetricLogLine2, &resultChain{}) if err := testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics)); err != nil { @@ -244,14 +241,14 @@ func TestMetricStage_Process(t *testing.T) { if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - var ts = time.Now() - var entry = logFixture + extr := map[string]interface{}{} - jsonStage.Process(labelFoo, extr, &ts, &entry) - regexStage.Process(labelFoo, extr, &ts, &regexLogFixture) - metricStage.Process(labelFoo, extr, &ts, &entry) + + jsonStage.Process(labelFoo, extr, time.Now(), logFixture, &resultChain{}) + regexStage.Process(labelFoo, extr, time.Now(), regexLogFixture, &resultChain{}) + metricStage.Process(labelFoo, extr, time.Now(), logFixture, &resultChain{}) // Process the same extracted values again with different labels so we can verify proper metric/label assignments - metricStage.Process(labelFu, extr, &ts, &entry) + metricStage.Process(labelFu, extr, time.Now(), logFixture, &resultChain{}) names := metricNames(metricsConfig) if err := testutil.GatherAndCompare(registry, diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go new file mode 100644 index 0000000000000..f01dab30a33e7 --- /dev/null +++ b/pkg/logentry/stages/multiline.go @@ -0,0 +1,167 @@ +package stages + +import ( + "fmt" + "regexp" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +const ( + ErrEmptyMultilineStageConfig = "multiline stage config cannot be empty" + ErrMultilineNameRequired = "multiline stage pipeline name can be omitted but cannot be an empty string" + 
ErrFirstLineRequired = "firstLine regular expression required" +) + +// MultilineConfig contains the configuration for a multilineStage +type MultilineConfig struct { + PipelineName *string `mapstructure:"pipeline_name"` + FirstLineRegexp string `mapstructure:"firstline"` + MaxWait string `mapstructure:"max_wait_time"` +} + +func validateMultilineConfig(cfg *MultilineConfig) (*regexp.Regexp, error) { + if cfg == nil { + return nil, errors.New(ErrEmptyMultilineStageConfig) + } + if cfg.PipelineName != nil && *cfg.PipelineName == "" { + return nil, errors.New(ErrMultilineNameRequired) + } + if cfg.FirstLineRegexp == "" { + return nil, errors.New(ErrFirstLineRequired) + } + + re, err := regexp.Compile(cfg.FirstLineRegexp) + if err != nil { + return nil, errors.Wrap(err, "failed to parse firstline regex") + } + + return re, nil +} + +// newMultilineStage creates a new multilineStage from config +func newMultilineStage(logger log.Logger, config interface{}) (*multilineStage, error) { + cfg := &MultilineConfig{} + err := mapstructure.Decode(config, cfg) + if err != nil { + return nil, err + } + + re, err := validateMultilineConfig(cfg) + if err != nil { + return nil, err + } + + maxWait := time.Duration(0) + if cfg.MaxWait != "" { + maxWait, err = time.ParseDuration(cfg.MaxWait) + if err != nil { + return nil, errors.Wrap(err, "multiline: invalid max_wait_time duration") + } + } + + return &multilineStage{ + firstLine: re, + maxWait: maxWait, + multilines: map[string]*multilineEntry{}, + }, nil +} + +type multilineStage struct { + firstLine *regexp.Regexp + maxWait time.Duration + + mapMu sync.Mutex + multilines map[string]*multilineEntry +} + +type multilineEntry struct { + lines []string + labels model.LabelSet + lastWrite time.Time +} + +func (m *multilineStage) Name() string { + return "multiline" +} + +// Process implements Stage +func (m *multilineStage) Process(labels model.LabelSet, extracted map[string]interface{}, timestamp time.Time, entry string, chain 
StageChain) { + isFirstLine := m.firstLine.MatchString(entry) + key := labels.String() + + m.mapMu.Lock() + + ent := m.multilines[key] + hasPrevious := ent != nil && len(ent.lines) > 0 + + switch { + case isFirstLine && hasPrevious: + delete(m.multilines, key) + + // put new multiline entry to the map before unlocking + m.multilines[key] = &multilineEntry{ + lines: []string{entry}, + labels: labels.Clone(), + lastWrite: time.Now(), + } + + m.mapMu.Unlock() + + // extracted + timestamp are just passed forward + chain.NextStage(labels, extracted, timestamp, strings.Join(ent.lines, "\n")) + + case isFirstLine && !hasPrevious: + m.multilines[key] = &multilineEntry{ + lines: []string{entry}, // start new multiline entry, + labels: labels.Clone(), + lastWrite: time.Now(), + } + + m.mapMu.Unlock() + + case !isFirstLine && hasPrevious: + // add it to existing line, and wait for more lines. + ent.lines = append(ent.lines, entry) + ent.lastWrite = time.Now() + + m.mapMu.Unlock() + + case !isFirstLine && !hasPrevious: + m.mapMu.Unlock() + + // no started multiline? just pass it forward then. 
+ chain.NextStage(labels, extracted, timestamp, entry) + } +} + +func (m *multilineStage) Flush(chain RepeatableStageChain) { + if m.maxWait <= 0 { + return + } + + now := time.Now() + + m.mapMu.Lock() + for key, e := range m.multilines { + if now.Sub(e.lastWrite) > m.maxWait { + delete(m.multilines, key) // remove from map before unlocking + m.mapMu.Unlock() + + entry := strings.Join(e.lines, "\n") + + fmt.Println("flushing", key) + // only use chain clones, so that original chain can be reused + chain.Clone().NextStage(e.labels, map[string]interface{}{}, e.lastWrite, entry) + + m.mapMu.Lock() + } + } + m.mapMu.Unlock() +} diff --git a/pkg/logentry/stages/output.go b/pkg/logentry/stages/output.go index d7bb9259106f9..5a67ca46b55d8 100644 --- a/pkg/logentry/stages/output.go +++ b/pkg/logentry/stages/output.go @@ -57,8 +57,9 @@ type outputStage struct { } // Process implements Stage -func (o *outputStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (o *outputStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { if o.cfgs == nil { + chain.NextStage(labels, extracted, time, entry) return } if v, ok := extracted[o.cfgs.Source]; ok { @@ -69,12 +70,14 @@ func (o *outputStage) Process(labels model.LabelSet, extracted map[string]interf } return } - *entry = s + entry = s } else { if Debug { level.Debug(o.logger).Log("msg", "extracted data did not contain output source") } } + + chain.NextStage(labels, extracted, time, entry) } // Name implements Stage diff --git a/pkg/logentry/stages/output_test.go b/pkg/logentry/stages/output_test.go index 8429dc7e18d71..3157105b19b43 100644 --- a/pkg/logentry/stages/output_test.go +++ b/pkg/logentry/stages/output_test.go @@ -37,11 +37,10 @@ func TestPipeline_Output(t *testing.T) { t.Fatal(err) } lbls := model.LabelSet{} - ts := time.Now() - entry := testOutputLogLine extracted := map[string]interface{}{} - 
pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, "this is a log line", entry) + result := &resultChain{} + pl.Process(lbls, extracted, time.Now(), testOutputLogLine, result) + assert.Equal(t, "this is a log line", result.entry) } func TestOutputValidation(t *testing.T) { @@ -103,9 +102,9 @@ func TestOutputStage_Process(t *testing.T) { t.Fatal(err) } lbls := model.LabelSet{} - entry := "replaceme" - st.Process(lbls, test.extracted, nil, &entry) - assert.Equal(t, test.expectedOutput, entry) + result := &resultChain{} + st.Process(lbls, test.extracted, time.Now(), "replaceme", result) + assert.Equal(t, test.expectedOutput, result.entry) }) } } diff --git a/pkg/logentry/stages/pipeline.go b/pkg/logentry/stages/pipeline.go index 2974850bc5cee..ead18ee50c912 100644 --- a/pkg/logentry/stages/pipeline.go +++ b/pkg/logentry/stages/pipeline.go @@ -77,28 +77,34 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, regist } // Process implements Stage allowing a pipeline stage to also be an entire pipeline -func (p *Pipeline) Process(labels model.LabelSet, extracted map[string]interface{}, ts *time.Time, entry *string) { - start := time.Now() - +// All pipeline stages are processed first before running chain.NextStage. +func (p *Pipeline) Process(labels model.LabelSet, extracted map[string]interface{}, ts time.Time, entry string, chain StageChain) { // Initialize the extracted map with the initial labels (ie. 
"filename"), // so that stages can operate on initial labels too for labelName, labelValue := range labels { extracted[string(labelName)] = string(labelValue) } - for i, stage := range p.stages { - if Debug { - level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "name", stage.Name(), "labels", labels, "time", ts, "entry", entry) - } - stage.Process(labels, extracted, ts, entry) + pc := &pipelineChain{ + pipeline: p, + nextStage: 0, + chain: chain, + startTime: time.Now(), } - dur := time.Since(start).Seconds() - if Debug { - level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_s", dur) - } - if p.jobName != nil { - p.plDuration.WithLabelValues(*p.jobName).Observe(dur) + + pc.NextStage(labels, extracted, ts, entry) +} + +// Flush flushes all flushable stages in this pipeline +func (p *Pipeline) Flush(chain RepeatableStageChain) { + pc := &pipelineChain{ + pipeline: p, + nextStage: 0, + chain: chain, + startTime: time.Time{}, } + + pc.Flush() } // Name implements Stage @@ -108,14 +114,22 @@ func (p *Pipeline) Name() string { // Wrap implements EntryMiddleware func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { + // start flushing every 100ms + go func() { + for range time.Tick(100 * time.Millisecond) { + fhc := &handlerChain{next: next} + p.Flush(fhc) + if fhc.error != nil { + level.Error(p.logger).Log("msg", "failed to flush", "err", fhc.error) + } + } + }() + return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error { extracted := map[string]interface{}{} - p.Process(labels, extracted, &timestamp, &line) - // if the labels set contains the __drop__ label we don't send this entry to the next EntryHandler - if _, ok := labels[dropLabel]; ok { - return nil - } - return next.Handle(labels, timestamp, line) + hc := &handlerChain{next: next} + p.Process(labels, extracted, timestamp, line, hc) + return hc.error }) } @@ -128,3 +142,82 @@ func (p 
*Pipeline) AddStage(stage Stage) { func (p *Pipeline) Size() int { return len(p.stages) } + +type pipelineChain struct { + pipeline *Pipeline + nextStage int + chain StageChain + startTime time.Time +} + +func (pc *pipelineChain) NextStage(labels model.LabelSet, extracted map[string]interface{}, ts time.Time, entry string) { + if pc.nextStage < len(pc.pipeline.stages) { + stage := pc.pipeline.stages[pc.nextStage] + if Debug { + level.Debug(pc.pipeline.logger).Log("msg", "processing pipeline", "stage", pc.nextStage, "name", stage.Name(), "labels", labels, "time", ts, "entry", entry) + } + + pc.nextStage++ + + stage.Process(labels, extracted, ts, entry, pc) + } else { + if !pc.startTime.IsZero() && (Debug || pc.pipeline.jobName != nil) { + dur := time.Since(pc.startTime).Seconds() + if Debug { + level.Debug(pc.pipeline.logger).Log("msg", "pipeline finished", "labels", labels, "time", ts, "entry", entry, "duration_s", dur) + } + + if pc.pipeline.jobName != nil { + pc.pipeline.plDuration.WithLabelValues(*pc.pipeline.jobName).Observe(dur) + } + } + + pc.chain.NextStage(labels, extracted, ts, entry) + } +} + +// Flush all remaining flushable stages. +func (pc *pipelineChain) Flush() { + for pc.nextStage < len(pc.pipeline.stages) { + s := pc.pipeline.stages[pc.nextStage] + + // increase nextStage before making a clone, so that NextStage actually calls NEXT stage + pc.nextStage++ + + if f, ok := s.(FlushableStage); ok { + // send a copy of this chain, so that we can continue with flushing later stages + f.Flush(pc.Clone()) + } + } +} + +// Clone returns new copy of this chain, so that it can be called again. 
+func (pc *pipelineChain) Clone() RepeatableStageChain { + rsc := pc.chain.(RepeatableStageChain) + + return &pipelineChain{ + pipeline: pc.pipeline, + nextStage: pc.nextStage, + chain: rsc.Clone(), + startTime: time.Time{}, + } +} + +type handlerChain struct { + next api.EntryHandler + error error // written by NextStage through the pointer receiver so callers can read it back +} + +func (hc *handlerChain) Clone() RepeatableStageChain { + // no need to clone + return hc +} + +func (hc *handlerChain) NextStage(labels model.LabelSet, extracted map[string]interface{}, ts time.Time, entry string) { + // if the labels set contains the __drop__ label we don't send this entry to the next EntryHandler + if _, ok := labels[dropLabel]; ok { + hc.error = nil + } else { + hc.error = hc.next.Handle(labels, ts, entry) + } +} diff --git a/pkg/logentry/stages/pipeline_test.go b/pkg/logentry/stages/pipeline_test.go index 6a01fe5fadaed..63a94b5053793 100644 --- a/pkg/logentry/stages/pipeline_test.go +++ b/pkg/logentry/stages/pipeline_test.go @@ -179,11 +179,12 @@ func TestPipeline_Process(t *testing.T) { require.NoError(t, err) extracted := map[string]interface{}{} - p.Process(tt.initialLabels, extracted, &tt.t, &tt.entry) + result := &resultChain{} + p.Process(tt.initialLabels, extracted, tt.t, tt.entry, result) - assert.Equal(t, tt.expectedLabels, tt.initialLabels, "did not get expected labels") - assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry") - if tt.t.Unix() != tt.expectedT.Unix() { + assert.Equal(t, tt.expectedLabels, result.labels, "did not get expected labels") + assert.Equal(t, tt.expectedEntry, result.entry, "did not receive expected log entry") + if result.time.Unix() != tt.expectedT.Unix() { t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t) } }) @@ -224,10 +225,11 @@ func BenchmarkPipeline(b *testing.B) { } lb := model.LabelSet{} ts := time.Now() + r := &resultChain{} for i := 0; i < b.N; i++ { - entry := bm.entry extracted := 
map[string]interface{}{} - pl.Process(lb, extracted, &ts, &entry) + + 
pl.Process(lb, extracted, ts, bm.entry, r) } }) } @@ -243,7 +245,6 @@ func (s *stubHandler) Handle(labels model.LabelSet, time time.Time, entry string } func TestPipeline_Wrap(t *testing.T) { - now := time.Now() var config map[string]interface{} err := yaml.Unmarshal([]byte(testMultiStageYaml), &config) if err != nil { @@ -282,10 +283,10 @@ func TestPipeline_Wrap(t *testing.T) { t.Run(tName, func(t *testing.T) { t.Parallel() extracted := map[string]interface{}{} - p.Process(tt.labels, extracted, &now, &rawTestLine) + p.Process(tt.labels, extracted, time.Now(), rawTestLine, &resultChain{}) stub := &stubHandler{} handler := p.Wrap(stub) - if err := handler.Handle(tt.labels, now, rawTestLine); err != nil { + if err := handler.Handle(tt.labels, time.Now(), rawTestLine); err != nil { t.Fatalf("failed to handle entry: %v", err) } assert.Equal(t, stub.bool, tt.shouldSend) diff --git a/pkg/logentry/stages/regex.go b/pkg/logentry/stages/regex.go index 05aa5f4406481..59e4e628bbd73 100644 --- a/pkg/logentry/stages/regex.go +++ b/pkg/logentry/stages/regex.go @@ -83,7 +83,9 @@ func parseRegexConfig(config interface{}) (*RegexConfig, error) { } // Process implements Stage -func (r *regexStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (r *regexStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { + defer chain.NextStage(labels, extracted, time, entry) // we can use defer, because time or entry are not modified + // If a source key is provided, the regex stage should process it // from the extracted map, otherwise should fallback to the entry input := entry @@ -104,20 +106,13 @@ func (r *regexStage) Process(labels model.LabelSet, extracted map[string]interfa return } - input = &value - } - - if input == nil { - if Debug { - level.Debug(r.logger).Log("msg", "cannot parse a nil entry") - } - return + input = value } - match := 
r.expression.FindStringSubmatch(*input) + match := r.expression.FindStringSubmatch(input) if match == nil { if Debug { - level.Debug(r.logger).Log("msg", "regex did not match", "input", *input, "regex", r.expression) + level.Debug(r.logger).Log("msg", "regex did not match", "input", input, "regex", r.expression) } return } diff --git a/pkg/logentry/stages/regex_test.go b/pkg/logentry/stages/regex_test.go index 49b65fdb1a406..2ae0d8baf56c6 100644 --- a/pkg/logentry/stages/regex_test.go +++ b/pkg/logentry/stages/regex_test.go @@ -87,11 +87,10 @@ func TestPipeline_Regex(t *testing.T) { } lbls := model.LabelSet{} - ts := time.Now() - entry := testData.entry extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, testData.expectedExtract, extracted) + result := &resultChain{} + pl.Process(lbls, extracted, time.Now(), testData.entry, result) + assert.Equal(t, testData.expectedExtract, result.extracted) }) } } @@ -294,10 +293,9 @@ func TestRegexParser_Parse(t *testing.T) { t.Fatalf("failed to create regex parser: %s", err) } lbs := model.LabelSet{} - extr := tt.extracted - ts := time.Now() - p.Process(lbs, extr, &ts, &tt.entry) - assert.Equal(t, tt.expectedExtract, extr) + result := &resultChain{} + p.Process(lbs, tt.extracted, time.Now(), tt.entry, result) + assert.Equal(t, tt.expectedExtract, result.extracted) }) } @@ -338,9 +336,9 @@ func BenchmarkRegexStage(b *testing.B) { labels := model.LabelSet{} ts := time.Now() extr := map[string]interface{}{} + r := &resultChain{} for i := 0; i < b.N; i++ { - entry := bm.entry - stage.Process(labels, extr, &ts, &entry) + stage.Process(labels, extr, ts, bm.entry, r) } }) } diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index 46ac43c55c350..6de205bae779c 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -22,26 +22,44 @@ const ( StageTypeTemplate = "template" StageTypePipeline = "pipeline" StageTypeTenant = "tenant" + 
StageTypeMultiline = "multiline" ) -// Stage takes an existing set of labels, timestamp and log entry and returns either a possibly mutated -// timestamp and log entry +// StageChain is supplied to the Stage, and gives stage an option to continue with the next stage (if it so decides) +type StageChain interface { + NextStage(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string) +} + +// Stage takes an existing set of labels, timestamp and log entry and a chain. It can modify these and pass +// to the next stage in the chain by calling chain.NextStage method. +// If stage doesn't call next stage, no further processing is done. type Stage interface { - Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string) + Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) Name() string } +// FlushableStage is a stage that can be flushed. +type FlushableStage interface { + Stage + Flush(chain RepeatableStageChain) +} + +// RepeatableStageChain is a chain, that can be cloned and run again later. +type RepeatableStageChain interface { + StageChain + Clone() RepeatableStageChain +} + // StageFunc is modelled on http.HandlerFunc. -type StageFunc func(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string) +type StageFunc func(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) // Process implements EntryHandler. -func (s StageFunc) Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string) { - s(labels, extracted, time, entry) +func (s StageFunc) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { + s(labels, extracted, time, entry, chain) } // New creates a new stage for the given type and configuration. 
-func New(logger log.Logger, jobName *string, stageType string, - cfg interface{}, registerer prometheus.Registerer) (Stage, error) { +func New(logger log.Logger, jobName *string, stageType string, cfg interface{}, registerer prometheus.Registerer) (Stage, error) { var s Stage var err error switch stageType { @@ -100,6 +118,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeMultiline: + s, err = newMultilineStage(logger, cfg) + if err != nil { + return nil, err + } default: return nil, errors.Errorf("Unknown stage type: %s", stageType) } diff --git a/pkg/logentry/stages/template.go b/pkg/logentry/stages/template.go index 315d49f2ee543..31dbb7e7e12b5 100644 --- a/pkg/logentry/stages/template.go +++ b/pkg/logentry/stages/template.go @@ -83,7 +83,9 @@ type templateStage struct { } // Process implements Stage -func (o *templateStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (o *templateStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { + defer chain.NextStage(labels, extracted, time, entry) // we can use defer, because this stage doesn't modify time or entry + if o.cfgs == nil { return } diff --git a/pkg/logentry/stages/template_test.go b/pkg/logentry/stages/template_test.go index f893a1d3e528c..328c0b9ff9af2 100644 --- a/pkg/logentry/stages/template_test.go +++ b/pkg/logentry/stages/template_test.go @@ -48,17 +48,15 @@ func TestPipeline_Template(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} expectedLbls := model.LabelSet{ "app": "LOKI doki", "level": "OK", "type": "TEST", } - ts := time.Now() - entry := testTemplateLogLine - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, expectedLbls, lbls) + + result := &resultChain{} + pl.Process(model.LabelSet{}, map[string]interface{}{}, time.Now(), 
testTemplateLogLine, result) + assert.Equal(t, expectedLbls, result.labels) } func TestTemplateValidation(t *testing.T) { @@ -197,9 +195,9 @@ func TestTemplateStage_Process(t *testing.T) { t.Fatal(err) } lbls := model.LabelSet{} - entry := "not important for this test" - st.Process(lbls, test.extracted, nil, &entry) - assert.Equal(t, test.expectedExtracted, test.extracted) + result := &resultChain{} + st.Process(lbls, test.extracted, time.Now(), "not important for this test", result) + assert.Equal(t, test.expectedExtracted, result.extracted) }) } } diff --git a/pkg/logentry/stages/tenant.go b/pkg/logentry/stages/tenant.go index 24c05807c4ac8..253effda05723 100644 --- a/pkg/logentry/stages/tenant.go +++ b/pkg/logentry/stages/tenant.go @@ -60,7 +60,7 @@ func newTenantStage(logger log.Logger, configs interface{}) (*tenantStage, error } // Process implements Stage -func (s *tenantStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (s *tenantStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { var tenantID string // Get tenant ID from source or configured value @@ -71,11 +71,11 @@ func (s *tenantStage) Process(labels model.LabelSet, extracted map[string]interf } // Skip an empty tenant ID (ie. 
failed to get the tenant from the source) - if tenantID == "" { - return + if tenantID != "" { + labels[client.ReservedLabelTenantID] = model.LabelValue(tenantID) } - labels[client.ReservedLabelTenantID] = model.LabelValue(tenantID) + chain.NextStage(labels, extracted, time, entry) } // Name implements Stage diff --git a/pkg/logentry/stages/tenant_test.go b/pkg/logentry/stages/tenant_test.go index 60feaece70df6..680354b09cb52 100644 --- a/pkg/logentry/stages/tenant_test.go +++ b/pkg/logentry/stages/tenant_test.go @@ -135,17 +135,16 @@ func TestTenantStage_Process(t *testing.T) { // Process and dummy line and ensure nothing has changed except // the tenant reserved label - timestamp := time.Unix(1, 1) - entry := "hello world" labels := testData.inputLabels.Clone() extracted := testData.inputExtracted - stage.Process(labels, extracted, &timestamp, &entry) + result := &resultChain{} + stage.Process(labels, extracted, time.Unix(1, 1), "hello world", result) - assert.Equal(t, time.Unix(1, 1), timestamp) - assert.Equal(t, "hello world", entry) + assert.Equal(t, time.Unix(1, 1), result.time) + assert.Equal(t, "hello world", result.entry) - actualTenant, ok := labels[client.ReservedLabelTenantID] + actualTenant, ok := result.labels[client.ReservedLabelTenantID] if testData.expectedTenant == nil { assert.False(t, ok) } else { diff --git a/pkg/logentry/stages/timestamp.go b/pkg/logentry/stages/timestamp.go index b6427e0d79aa7..e15a2ffac0ae9 100644 --- a/pkg/logentry/stages/timestamp.go +++ b/pkg/logentry/stages/timestamp.go @@ -128,28 +128,32 @@ func (ts *timestampStage) Name() string { } // Process implements Stage -func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) { if ts.cfg == nil { + chain.NextStage(labels, extracted, time, entry) return } parsedTs, err := 
ts.parseTimestampFromSource(extracted) if err != nil { - ts.processActionOnFailure(labels, t) + time = ts.processActionOnFailure(labels, time) + chain.NextStage(labels, extracted, time, entry) return } // Update the log entry timestamp with the parsed one - *t = *parsedTs + time = parsedTs // The timestamp has been correctly parsed, so we should store it in the map // containing the last known timestamp used by the "fudge" action on failure. if *ts.cfg.ActionOnFailure == TimestampActionOnFailureFudge { - ts.lastKnownTimestamps.Add(labels.String(), *t) + ts.lastKnownTimestamps.Add(labels.String(), time) } + + chain.NextStage(labels, extracted, time, entry) } -func (ts *timestampStage) parseTimestampFromSource(extracted map[string]interface{}) (*time.Time, error) { +func (ts *timestampStage) parseTimestampFromSource(extracted map[string]interface{}) (time.Time, error) { // Ensure the extracted data contains the timestamp source v, ok := extracted[ts.cfg.Source] if !ok { @@ -157,7 +161,7 @@ func (ts *timestampStage) parseTimestampFromSource(extracted map[string]interfac level.Debug(ts.logger).Log("msg", ErrTimestampSourceMissing) } - return nil, errors.New(ErrTimestampSourceMissing) + return time.Time{}, errors.New(ErrTimestampSourceMissing) } // Convert the timestamp source to string (if it's not a string yet) @@ -167,7 +171,7 @@ func (ts *timestampStage) parseTimestampFromSource(extracted map[string]interfac level.Debug(ts.logger).Log("msg", ErrTimestampConversionFailed, "err", err, "type", reflect.TypeOf(v).String()) } - return nil, errors.New(ErrTimestampConversionFailed) + return time.Time{}, errors.New(ErrTimestampConversionFailed) } // Parse the timestamp source according to the configured format @@ -177,34 +181,36 @@ func (ts *timestampStage) parseTimestampFromSource(extracted map[string]interfac level.Debug(ts.logger).Log("msg", ErrTimestampParsingFailed, "err", err, "format", ts.cfg.Format, "value", s) } - return nil, errors.New(ErrTimestampParsingFailed) + 
return time.Time{}, errors.New(ErrTimestampParsingFailed) } - return &parsedTs, nil + return parsedTs, nil } -func (ts *timestampStage) processActionOnFailure(labels model.LabelSet, t *time.Time) { +func (ts *timestampStage) processActionOnFailure(labels model.LabelSet, t time.Time) time.Time { switch *ts.cfg.ActionOnFailure { case TimestampActionOnFailureFudge: - ts.processActionOnFailureFudge(labels, t) + return ts.processActionOnFailureFudge(labels, t) case TimestampActionOnFailureSkip: // Nothing to do } + return t } -func (ts *timestampStage) processActionOnFailureFudge(labels model.LabelSet, t *time.Time) { +func (ts *timestampStage) processActionOnFailureFudge(labels model.LabelSet, t time.Time) time.Time { labelsStr := labels.String() lastTimestamp, ok := ts.lastKnownTimestamps.Get(labelsStr) // If the last known timestamp is unknown (ie. has not been successfully parsed yet) // there's nothing we can do, so we're going to keep the current timestamp if !ok { - return + return t } // Fudge the timestamp - *t = lastTimestamp.(time.Time).Add(1 * time.Nanosecond) + t = lastTimestamp.(time.Time).Add(1 * time.Nanosecond) // Store the fudged timestamp, so that a subsequent fudged timestamp will be 1ns after it - ts.lastKnownTimestamps.Add(labelsStr, *t) + ts.lastKnownTimestamps.Add(labelsStr, t) + return t } diff --git a/pkg/logentry/stages/timestamp_test.go b/pkg/logentry/stages/timestamp_test.go index 9750995ae86da..e231c0f46d577 100644 --- a/pkg/logentry/stages/timestamp_test.go +++ b/pkg/logentry/stages/timestamp_test.go @@ -40,11 +40,10 @@ func TestTimestampPipeline(t *testing.T) { t.Fatal(err) } lbls := model.LabelSet{} - ts := time.Now() - entry := testTimestampLogLine extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)).Unix(), ts.Unix()) + result := &resultChain{} + pl.Process(lbls, extracted, time.Now(), testTimestampLogLine, result) + 
assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)).Unix(), result.time.Unix()) } var ( @@ -235,10 +234,10 @@ func TestTimestampStage_Process(t *testing.T) { if err != nil { t.Fatal(err) } - ts := time.Now() lbls := model.LabelSet{} - st.Process(lbls, test.extracted, &ts, nil) - assert.Equal(t, test.expected.UnixNano(), ts.UnixNano()) + result := &resultChain{} + st.Process(lbls, test.extracted, time.Now(), "", result) + assert.Equal(t, test.expected.UnixNano(), result.time.UnixNano()) }) } } @@ -378,12 +377,10 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) { require.NoError(t, err) for i, inputEntry := range testData.inputEntries { - extracted := inputEntry.extracted - timestamp := inputEntry.timestamp - entry := "" + result := &resultChain{} - s.Process(inputEntry.labels, extracted, &timestamp, &entry) - assert.Equal(t, testData.expectedTimestamps[i], timestamp, "entry: %d", i) + s.Process(inputEntry.labels, inputEntry.extracted, inputEntry.timestamp, "", result) + assert.Equal(t, testData.expectedTimestamps[i], result.time, "entry: %d", i) } }) } diff --git a/pkg/logentry/stages/util_test.go b/pkg/logentry/stages/util_test.go index 5cf5e6c9cac86..469515a50cb25 100644 --- a/pkg/logentry/stages/util_test.go +++ b/pkg/logentry/stages/util_test.go @@ -39,6 +39,20 @@ func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) { } } +type resultChain struct { + labels model.LabelSet + extracted map[string]interface{} + time time.Time + entry string +} + +func (rc *resultChain) NextStage(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string) { + rc.labels = labels + rc.extracted = extracted + rc.time = time + rc.entry = entry +} + // Verify the formatting of float conversion to make sure there are not any trailing zeros, // and also make sure unix timestamps are converted properly func TestGetString(t *testing.T) {