Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added basic multiline support based on stage-chains idea #1380

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
const RFC3339Nano = "RFC3339Nano"

// NewDocker creates a Docker json log format specific pipeline stage.
func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
func NewDocker(logger log.Logger, registerer prometheus.Registerer) (*Pipeline, error) {
stages := PipelineStages{
PipelineStage{
StageTypeJSON: JSONConfig{
Expand Down Expand Up @@ -36,7 +36,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
}

// NewCRI creates a CRI format specific pipeline stage
func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
func NewCRI(logger log.Logger, registerer prometheus.Registerer) (*Pipeline, error) {
stages := PipelineStages{
PipelineStage{
StageTypeRegex: RegexConfig{
Expand Down
18 changes: 10 additions & 8 deletions pkg/logentry/stages/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ func TestNewDocker(t *testing.T) {
}
lbs := toLabelSet(tt.labels)
extr := map[string]interface{}{}
p.Process(lbs, extr, &tt.t, &tt.entry)
result := &resultChain{}
p.Process(lbs, extr, tt.t, tt.entry, result)

assertLabels(t, tt.expectedLabels, lbs)
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
if tt.t.Unix() != tt.expectedT.Unix() {
assertLabels(t, tt.expectedLabels, result.labels)
assert.Equal(t, tt.expectedEntry, result.entry, "did not receive expected log entry")
if result.time.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
})
Expand Down Expand Up @@ -147,11 +148,12 @@ func TestNewCri(t *testing.T) {
}
lbs := toLabelSet(tt.labels)
extr := map[string]interface{}{}
p.Process(lbs, extr, &tt.t, &tt.entry)
result := &resultChain{}
p.Process(lbs, extr, tt.t, tt.entry, result)

assertLabels(t, tt.expectedLabels, lbs)
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
if tt.t.Unix() != tt.expectedT.Unix() {
assertLabels(t, tt.expectedLabels, result.labels)
assert.Equal(t, tt.expectedEntry, result.entry, "did not receive expected log entry")
if result.time.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
})
Expand Down
15 changes: 5 additions & 10 deletions pkg/logentry/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ func parseJSONConfig(config interface{}) (*JSONConfig, error) {
}

// Process implements Stage
func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) {
defer chain.NextStage(labels, extracted, time, entry) // we can use defer, as we don't update time or entry

// If a source key is provided, the json stage should process it
// from the exctracted map, otherwise should fallback to the entry
input := entry
Expand All @@ -113,19 +115,12 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac
return
}

input = &value
}

if input == nil {
if Debug {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
}
return
input = value
}

var data map[string]interface{}

if err := json.Unmarshal([]byte(*input), &data); err != nil {
if err := json.Unmarshal([]byte(input), &data); err != nil {
if Debug {
level.Debug(j.logger).Log("msg", "failed to unmarshal log line", "err", err)
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/logentry/stages/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ func TestPipeline_JSON(t *testing.T) {
ts := time.Now()
entry := testData.entry
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, testData.expectedExtract, extracted)
result := &resultChain{}
pl.Process(lbls, extracted, ts, entry, result)
assert.Equal(t, testData.expectedExtract, result.extracted)
})
}
}
Expand Down Expand Up @@ -351,10 +352,10 @@ func TestJSONParser_Parse(t *testing.T) {
}
lbs := model.LabelSet{}
extr := tt.extracted
ts := time.Now()
p.Process(lbs, extr, &ts, &tt.entry)
result := &resultChain{}
p.Process(lbs, extr, time.Now(), tt.entry, result)

assert.Equal(t, tt.expectedExtract, extr)
assert.Equal(t, tt.expectedExtract, result.extracted)
})
}
}
4 changes: 3 additions & 1 deletion pkg/logentry/stages/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type labelStage struct {
}

// Process implements Stage
func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) {
for lName, lSrc := range l.cfgs {
if lValue, ok := extracted[*lSrc]; ok {
s, err := getString(lValue)
Expand All @@ -82,6 +82,8 @@ func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interfa
labels[model.LabelName(lName)] = labelValue
}
}

chain.NextStage(labels, extracted, time, entry)
}

// Name implements Stage
Expand Down
14 changes: 8 additions & 6 deletions pkg/logentry/stages/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ func TestLabelsPipeline_Labels(t *testing.T) {
"level": "WARN",
"app": "loki",
}
ts := time.Now()
entry := testLabelsLogLine
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, expectedLbls, lbls)
result := &resultChain{}
pl.Process(lbls, extracted, time.Now(), testLabelsLogLine, result)
assert.Equal(t, expectedLbls, result.labels)
}

var (
Expand Down Expand Up @@ -157,8 +156,11 @@ func TestLabelStage_Process(t *testing.T) {
if err != nil {
t.Fatal(err)
}
st.Process(test.inputLabels, test.extractedData, nil, nil)
assert.Equal(t, test.expectedLabels, test.inputLabels)

result := &resultChain{}

st.Process(test.inputLabels, test.extractedData, time.Now(), "", result)
assert.Equal(t, test.expectedLabels, result.labels)
})
}
}
21 changes: 16 additions & 5 deletions pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func validateMatcherConfig(cfg *MatcherConfig) (logql.LogSelectorExpr, error) {
}

// newMatcherStage creates a new matcherStage from config
func newMatcherStage(logger log.Logger, jobName *string, config interface{}, registerer prometheus.Registerer) (Stage, error) {
func newMatcherStage(logger log.Logger, jobName *string, config interface{}, registerer prometheus.Registerer) (*matcherStage, error) {
cfg := &MatcherConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
Expand Down Expand Up @@ -111,24 +111,27 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg
type matcherStage struct {
matchers []*labels.Matcher
filter logql.Filter
pipeline Stage
pipeline *Pipeline
action string
}

// Process implements Stage
func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) {
for _, filter := range m.matchers {
if !filter.Matches(string(labels[model.LabelName(filter.Name)])) {
chain.NextStage(labels, extracted, time, entry)
return
}
}
if m.filter == nil || m.filter([]byte(*entry)) {
if m.filter == nil || m.filter([]byte(entry)) {
switch m.action {
case MatchActionDrop:
// Adds the drop label to not be sent by the api.EntryHandler
labels[dropLabel] = ""
chain.NextStage(labels, extracted, time, entry)
case MatchActionKeep:
m.pipeline.Process(labels, extracted, t, entry)
// run all stages in the pipeline, and then continue with our original chain afterwards
m.pipeline.Process(labels, extracted, time, entry, matcherChain{chain: chain})
}
}
}
Expand All @@ -137,3 +140,11 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter
func (m *matcherStage) Name() string {
return StageTypeMatch
}

type matcherChain struct {
chain StageChain
}

func (mc matcherChain) NextStage(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string) {
mc.chain.NextStage(labels, extracted, time, entry)
}
22 changes: 11 additions & 11 deletions pkg/logentry/stages/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ func TestMatchPipeline(t *testing.T) {
t.Fatal(err)
}
lbls := model.LabelSet{}
ts := time.Now()

// Process the first log line which should extract the output from the `message` field
entry := testMatchLogLineApp1
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, "app1 log line", entry)
result := &resultChain{}
pl.Process(lbls, extracted, time.Now(), testMatchLogLineApp1, result)
assert.Equal(t, "app1 log line", result.entry)

// Process the second log line which should extract the output from the `msg` field
entry = testMatchLogLineApp2
extracted = map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, "app2 log line", entry)
result = &resultChain{}
pl.Process(lbls, extracted, time.Now(), testMatchLogLineApp2, result)
assert.Equal(t, "app2 log line", result.entry)

got, err := registry.Gather()
if err != nil {
Expand Down Expand Up @@ -170,21 +170,21 @@ func TestMatcher(t *testing.T) {
return
}
if s != nil {
ts, entry := time.Now(), "foo"
extracted := map[string]interface{}{
"test_label": "unimportant value",
}
labels := toLabelSet(tt.labels)
s.Process(labels, extracted, &ts, &entry)
result := &resultChain{}
s.Process(labels, extracted, time.Now(), "foo", result)

// test_label should only be in the label set if the stage ran
if _, ok := labels["test_label"]; ok {
if _, ok := result.labels["test_label"]; ok {
if !tt.shouldRun {
t.Error("stage ran but should have not")
}
}
if tt.shouldDrop {
if _, ok := labels[dropLabel]; !ok {
if _, ok := result.labels[dropLabel]; !ok {
t.Error("stage should have been dropped")
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/logentry/stages/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type metricStage struct {
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, time time.Time, entry string, chain StageChain) {
for name, collector := range m.metrics {
if v, ok := extracted[*m.cfg[name].Source]; ok {
switch vec := collector.(type) {
Expand All @@ -128,6 +128,8 @@ func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interf
}
}
}

chain.NextStage(labels, extracted, time, entry)
}

// Name implements Stage
Expand Down
19 changes: 8 additions & 11 deletions pkg/logentry/stages/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,9 @@ func TestMetricsPipeline(t *testing.T) {
t.Fatal(err)
}
lbls := model.LabelSet{}
ts := time.Now()
extracted := map[string]interface{}{}
entry := testMetricLogLine1
pl.Process(lbls, extracted, &ts, &entry)
entry = testMetricLogLine2
pl.Process(lbls, extracted, &ts, &entry)
pl.Process(lbls, extracted, time.Now(), testMetricLogLine1, &resultChain{})
pl.Process(lbls, extracted, time.Now(), testMetricLogLine2, &resultChain{})

if err := testutil.GatherAndCompare(registry,
strings.NewReader(expectedMetrics)); err != nil {
Expand Down Expand Up @@ -244,14 +241,14 @@ func TestMetricStage_Process(t *testing.T) {
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
var ts = time.Now()
var entry = logFixture

extr := map[string]interface{}{}
jsonStage.Process(labelFoo, extr, &ts, &entry)
regexStage.Process(labelFoo, extr, &ts, &regexLogFixture)
metricStage.Process(labelFoo, extr, &ts, &entry)

jsonStage.Process(labelFoo, extr, time.Now(), logFixture, &resultChain{})
regexStage.Process(labelFoo, extr, time.Now(), regexLogFixture, &resultChain{})
metricStage.Process(labelFoo, extr, time.Now(), logFixture, &resultChain{})
// Process the same extracted values again with different labels so we can verify proper metric/label assignments
metricStage.Process(labelFu, extr, &ts, &entry)
metricStage.Process(labelFu, extr, time.Now(), logFixture, &resultChain{})

names := metricNames(metricsConfig)
if err := testutil.GatherAndCompare(registry,
Expand Down
Loading