Skip to content

Commit

Permalink
feat(logqlengine): implement or in line filters
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jul 2, 2024
1 parent 308551c commit f59f962
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 17 deletions.
70 changes: 53 additions & 17 deletions internal/logql/logqlengine/line_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,59 @@ func buildLineFilter(stage *logql.LineFilter) (Processor, error) {
case logql.OpPattern, logql.OpNotPattern:
return nil, &logqlerrors.UnsupportedError{Msg: fmt.Sprintf("%s line filter is unsupported", op)}
}

matcher, err := buildLineMatcher(stage.Op, stage.By)
if err != nil {
return nil, err
}

if len(stage.Or) > 0 {
return nil, &logqlerrors.UnsupportedError{Msg: "or in line filters is unsupported"}
matchers := make([]StringMatcher, 0, len(stage.Or)+1)
matchers = append(matchers, matcher)

for _, by := range stage.Or {
m, err := buildLineMatcher(stage.Op, by)
if err != nil {
return nil, err
}
matchers = append(matchers, m)
}
return &OrLineFilter{matchers: matchers}, nil
}

return &LineFilter{matcher: matcher}, nil
}

func buildLineMatcher(op logql.BinOp, by logql.LineFilterValue) (StringMatcher, error) {
switch op {
case logql.OpPattern, logql.OpNotPattern:
return nil, &logqlerrors.UnsupportedError{Msg: fmt.Sprintf("%s line filter is unsupported", op)}
}

if stage.By.IP {
matcher, err := buildIPMatcher(stage.Op, stage.By.Value)
if by.IP {
matcher, err := buildIPMatcher(op, by.Value)
if err != nil {
return nil, err
}

return &IPLineFilter{matcher: matcher}, nil
return &IPLineMatcher{matcher: matcher}, nil
}
return buildStringMatcher(op, by.Value, by.Re, false)
}

matcher, err := buildStringMatcher(stage.Op, stage.By.Value, stage.By.Re, false)
if err != nil {
return nil, err
}
// OrLineFilter is a line matching Processor.
type OrLineFilter struct {
matchers []StringMatcher
}

return &LineFilter{matcher: matcher}, nil
// Process implements Processor.
func (lf *OrLineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (_ string, keep bool) {
// TODO(tdakkota): cache IP captures
for _, m := range lf.matchers {
if m.Match(line) {
return line, true
}
}
return line, false
}

// LineFilter is a line matching Processor.
Expand All @@ -47,13 +81,15 @@ func (lf *LineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabels
return line, keep
}

// IPLineFilter looks for IP address in a line and applies matcher to it.
type IPLineFilter struct {
// IPLineMatcher looks for IP address in a line and applies matcher to it.
type IPLineMatcher struct {
matcher IPMatcher
}

// Process implements Processor.
func (lf *IPLineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabels.LabelSet) (_ string, keep bool) {
var _ StringMatcher = (*IPLineMatcher)(nil)

// Match implements StringMatcher.
func (lf *IPLineMatcher) Match(line string) bool {
for i := 0; i < len(line); {
c := line[i]
if !isHexDigit(c) && c != ':' {
Expand All @@ -66,7 +102,7 @@ func (lf *IPLineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabe

ip, err := netip.ParseAddr(capture)
if err == nil && lf.matcher.Match(ip) {
return line, true
return true
}
continue
}
Expand All @@ -75,14 +111,14 @@ func (lf *IPLineFilter) Process(_ otelstorage.Timestamp, line string, _ logqlabe

ip, err := netip.ParseAddr(capture)
if err == nil && lf.matcher.Match(ip) {
return line, true
return true
}
continue
}
i++
}

return line, false
return false
}

func tryCaptureIPv4(s string) (string, bool) {
Expand Down
105 changes: 105 additions & 0 deletions internal/logql/logqlengine/line_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logqlengine

import (
"fmt"
"regexp"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -10,6 +11,110 @@ import (
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlabels"
)

func TestLineFilter(t *testing.T) {
tests := []struct {
input string
filter logql.LineFilter
wantOk bool
}{
{
"",
logql.LineFilter{
Op: logql.OpEq,
By: logql.LineFilterValue{
Value: "",
},
},
true,
},
{
"foo",
logql.LineFilter{
Op: logql.OpEq,
By: logql.LineFilterValue{
Value: "foo",
},
},
true,
},
{
"",
logql.LineFilter{
Op: logql.OpRe,
By: logql.LineFilterValue{
Value: ".+",
Re: regexp.MustCompile(".+"),
},
},
false,
},
{
"foo",
logql.LineFilter{
Op: logql.OpRe,
By: logql.LineFilterValue{
Value: "(foo|bar)",
Re: regexp.MustCompile("(foo|bar)"),
},
},
true,
},
{
"192.168.1.1",
logql.LineFilter{
Op: logql.OpEq,
By: logql.LineFilterValue{
Value: "192.168.1.0/24",
IP: true,
},
},
true,
},
{
"foo",
logql.LineFilter{
Op: logql.OpEq,
By: logql.LineFilterValue{
Value: "bar",
},
Or: []logql.LineFilterValue{
{Value: "baz"},
{Value: "foo"},
},
},
true,
},
{
"foo 192.168.1.1",
logql.LineFilter{
Op: logql.OpEq,
By: logql.LineFilterValue{
Value: "bar",
},
Or: []logql.LineFilterValue{
{Value: "baz"},
{Value: "192.168.1.0/24", IP: true},
},
},
true,
},
}
for i, tt := range tests {
tt := tt
t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
set := logqlabels.NewLabelSet()

f, err := buildLineFilter(&tt.filter)
require.NoError(t, err)

newLine, gotOk := f.Process(0, tt.input, set)
// Ensure that extractor does not change the line.
require.Equal(t, tt.input, newLine)
require.Equal(t, tt.wantOk, gotOk)
})
}
}

var ipLineFilterTests = []struct {
input string
pattern string
Expand Down

0 comments on commit f59f962

Please sign in to comment.