Skip to content

Commit

Permalink
fix(logqlengine): properly handle negated OR line filter
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jul 3, 2024
1 parent fd980f5 commit 721e905
Showing 1 changed file with 44 additions and 11 deletions.
55 changes: 44 additions & 11 deletions internal/logql/logqlengine/line_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,48 @@ import (
)

func buildLineFilter(stage *logql.LineFilter) (Processor, error) {
matcher, err := buildLineMatcher(stage.Op, stage.By)
if err != nil {
return nil, err
}

if len(stage.Or) > 0 {
var (
op = stage.Op
negate bool
)
switch op {
case logql.OpNotEq:
op = logql.OpEq
negate = true
case logql.OpNotRe:
op = logql.OpRe
negate = true
case logql.OpNotPattern:
op = logql.OpPattern
negate = true
}

matchers := make([]StringMatcher, 0, len(stage.Or)+1)

matcher, err := buildLineMatcher(op, stage.By)
if err != nil {
return nil, err
}
matchers = append(matchers, matcher)

for _, by := range stage.Or {
m, err := buildLineMatcher(stage.Op, by)
m, err := buildLineMatcher(op, by)
if err != nil {
return nil, err
}
matchers = append(matchers, m)
}
return &OrLineFilter{matchers: matchers}, nil
return &OrLineFilter{
matchers: matchers,
negate: negate,
}, nil
}

matcher, err := buildLineMatcher(stage.Op, stage.By)
if err != nil {
return nil, err
}
return &LineFilter{matcher: matcher}, nil
}

Expand All @@ -52,17 +75,27 @@ func buildLineMatcher(op logql.BinOp, by logql.LineFilterValue) (StringMatcher,
// OrLineFilter is a line matching Processor.
type OrLineFilter struct {
matchers []StringMatcher
negate bool
}

// Process implements Processor.
func (lf *OrLineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (_ string, keep bool) {
func (lf *OrLineFilter) match(line string) bool {
// TODO(tdakkota): cache IP captures
for _, m := range lf.matchers {
if m.Match(line) {
return line, true
return true
}
}
return line, false
return false
}

// Process implements Processor.
func (lf *OrLineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (_ string, keep bool) {
// TODO(tdakkota): cache IP captures
keep = lf.match(line)
if lf.negate {
keep = !keep
}
return line, keep
}

// LineFilter is a line matching Processor.
Expand Down

0 comments on commit 721e905

Please sign in to comment.