Skip to content

Commit

Permalink
feat(logql): parse or in line filters
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Apr 24, 2024
1 parent 2250aa6 commit ef690b6
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 39 deletions.
6 changes: 3 additions & 3 deletions internal/chstorage/querier_logs_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,12 @@ func (q logQueryPredicates) write(

switch m.Op {
case logql.OpEq, logql.OpNotEq:
fmt.Fprintf(query, "positionUTF8(body, %s) > 0", singleQuoted(m.Value))
fmt.Fprintf(query, "positionUTF8(body, %s) > 0", singleQuoted(m.By.Value))
{
// HACK: check for special case of hex-encoded trace_id and span_id.
// Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`.
// TODO(ernado): also handle regex?
encoded := strings.ToLower(m.Value)
encoded := strings.ToLower(m.By.Value)
v, _ := hex.DecodeString(encoded)
switch len(v) {
case len(otelstorage.TraceID{}):
Expand All @@ -463,7 +463,7 @@ func (q logQueryPredicates) write(
}
}
case logql.OpRe, logql.OpNotRe:
fmt.Fprintf(query, "match(body, %s)", singleQuoted(m.Value))
fmt.Fprintf(query, "match(body, %s)", singleQuoted(m.By.Value))
}
query.WriteByte(')')
}
Expand Down
11 changes: 8 additions & 3 deletions internal/logql/logqlengine/line_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@ import (
"net/netip"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlerrors"
"github.com/go-faster/oteldb/internal/otelstorage"
)

func buildLineFilter(stage *logql.LineFilter) (Processor, error) {
if stage.IP {
matcher, err := buildIPMatcher(stage.Op, stage.Value)
if len(stage.Or) > 0 {
return nil, &logqlerrors.UnsupportedError{Msg: "or in line filters is unsupported"}
}

if stage.By.IP {
matcher, err := buildIPMatcher(stage.Op, stage.By.Value)
if err != nil {
return nil, err
}

return &IPLineFilter{matcher: matcher}, nil
}

matcher, err := buildStringMatcher(stage.Op, stage.Value, stage.Re, false)
matcher, err := buildStringMatcher(stage.Op, stage.By.Value, stage.By.Re, false)
if err != nil {
return nil, err
}
Expand Down
16 changes: 10 additions & 6 deletions internal/logql/logqlengine/line_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ func TestIPLineFilter(t *testing.T) {
set := NewLabelSet()

f, err := buildLineFilter(&logql.LineFilter{
Op: logql.OpEq,
Value: tt.pattern,
IP: true,
Op: logql.OpEq,
By: logql.LineFilterValue{
Value: tt.pattern,
IP: true,
},
})
require.NoError(t, err)

Expand All @@ -129,9 +131,11 @@ func FuzzIPLineFilter(f *testing.F) {

f.Fuzz(func(t *testing.T, line, pattern string) {
f, err := buildLineFilter(&logql.LineFilter{
Op: logql.OpEq,
Value: pattern,
IP: true,
Op: logql.OpEq,
By: logql.LineFilterValue{
Value: pattern,
IP: true,
},
})
if err != nil {
t.Skipf("Invalid pattern: %q", pattern)
Expand Down
44 changes: 32 additions & 12 deletions internal/logql/parser_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (p *parser) parsePipeline(allowUnwrap bool) (stages []PipelineStage, err er
for {
switch t := p.peek(); t.Type {
case lexer.PipeExact, lexer.PipeMatch, lexer.NotEq, lexer.NotRe: // ( "|=" | "|~" | "!=" | "!~" )
lf, err := p.parseLineFilter()
lf, err := p.parseLineFilters()
if err != nil {
return stages, err
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (p *parser) parsePipeline(allowUnwrap bool) (stages []PipelineStage, err er
}
}

func (p *parser) parseLineFilter() (f *LineFilter, err error) {
func (p *parser) parseLineFilters() (f *LineFilter, err error) {
t := p.next()

f = new(LineFilter)
Expand All @@ -131,47 +131,67 @@ func (p *parser) parseLineFilter() (f *LineFilter, err error) {
case lexer.NotRe: // "!~"
f.Op = OpNotRe
default:
return nil, p.unexpectedToken(t)
return f, p.unexpectedToken(t)
}

f.By, err = p.parseLineFilterValue(f.Op)
if err != nil {
return nil, err
}

for {
if t := p.peek(); t.Type != lexer.Or {
return f, nil
}
p.next()

sub, err := p.parseLineFilterValue(f.Op)
if err != nil {
return nil, err
}
f.Or = append(f.Or, sub)
}
}

func (p *parser) parseLineFilterValue(op BinOp) (f LineFilterValue, err error) {
switch t := p.peek(); t.Type {
case lexer.String:
f.Value, err = p.parseString()
if err != nil {
return nil, err
return f, err
}

switch f.Op {
switch op {
case OpRe, OpNotRe:
f.Re, err = regexp.Compile(f.Value)
if err != nil {
return nil, errors.Wrapf(err, "invalid regex in line filter %q", f.Value)
return f, errors.Wrapf(err, "invalid regex in line filter %q", f.Value)
}
}
case lexer.IP:
p.next()

switch f.Op {
switch op {
case OpEq, OpNotEq:
default:
return nil, errors.Errorf("invalid IP line filter operation %q", f.Op)
return f, errors.Errorf("invalid IP line filter operation %q", op)
}

if err := p.consume(lexer.OpenParen); err != nil {
return nil, err
return f, err
}

f.Value, err = p.parseString()
if err != nil {
return nil, err
return f, err
}
f.IP = true

if err := p.consume(lexer.CloseParen); err != nil {
return nil, err
return f, err
}
default:
return nil, p.unexpectedToken(t)
return f, p.unexpectedToken(t)
}
return f, nil
}
Expand Down
51 changes: 37 additions & 14 deletions internal/logql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,23 @@ var tests = []TestCase{
"{} |= `foo`",
&LogExpr{
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "foo"},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "foo"}},
},
},
false,
},
{
`{} |= "foo" or "bar" or ip("baz")`,
&LogExpr{
Pipeline: []PipelineStage{
&LineFilter{
Op: OpEq,
By: LineFilterValue{Value: "foo"},
Or: []LineFilterValue{
{Value: "bar"},
{Value: "baz", IP: true},
},
},
},
},
false,
Expand All @@ -148,11 +164,11 @@ var tests = []TestCase{
},
},
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "bad"},
&LineFilter{Op: OpRe, Value: "error", Re: regexp.MustCompile(`error`)},
&LineFilter{Op: OpNotEq, Value: "good"},
&LineFilter{Op: OpNotRe, Value: "exception", Re: regexp.MustCompile(`exception`)},
&LineFilter{Op: OpEq, Value: "127.0.0.1", IP: true},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "bad"}},
&LineFilter{Op: OpRe, By: LineFilterValue{Value: "error", Re: regexp.MustCompile(`error`)}},
&LineFilter{Op: OpNotEq, By: LineFilterValue{Value: "good"}},
&LineFilter{Op: OpNotRe, By: LineFilterValue{Value: "exception", Re: regexp.MustCompile(`exception`)}},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "127.0.0.1", IP: true}},
},
},
false,
Expand All @@ -168,7 +184,7 @@ var tests = []TestCase{
},
},
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "bad"},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "bad"}},
},
},
},
Expand All @@ -191,7 +207,7 @@ var tests = []TestCase{
},
},
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "bad"},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "bad"}},
&LogfmtExpressionParser{},
&JSONExpressionParser{},
&RegexpLabelParser{
Expand Down Expand Up @@ -223,7 +239,7 @@ var tests = []TestCase{
},
},
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "bad"},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "bad"}},
&JSONExpressionParser{},
&JSONExpressionParser{
Labels: []Label{
Expand Down Expand Up @@ -548,7 +564,7 @@ var tests = []TestCase{
},
},
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "error"},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "error"}},
&LogfmtExpressionParser{},
},
Unwrap: &UnwrapExpr{
Expand Down Expand Up @@ -600,7 +616,7 @@ var tests = []TestCase{
},
},
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "error"},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "error"}},
},
Unwrap: &UnwrapExpr{
Label: "duration",
Expand All @@ -621,7 +637,7 @@ var tests = []TestCase{
},
},
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "error"},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "error"}},
},
Unwrap: &UnwrapExpr{
Op: "duration",
Expand Down Expand Up @@ -671,8 +687,8 @@ var tests = []TestCase{
},
},
Pipeline: []PipelineStage{
&LineFilter{Op: OpEq, Value: "error"},
&LineFilter{Op: OpNotEq, Value: "timeout"},
&LineFilter{Op: OpEq, By: LineFilterValue{Value: "error"}},
&LineFilter{Op: OpNotEq, By: LineFilterValue{Value: "timeout"}},
&JSONExpressionParser{},
&LabelFilter{
Pred: &DurationFilter{"duration", OpGt, 10 * time.Second},
Expand Down Expand Up @@ -1073,6 +1089,8 @@ var tests = []TestCase{
{`{foo = "bar"} | unwrap label`, nil, true},
{`{foo = "bar"} |= foo`, nil, true},
{`{foo = "bar"} |= ip("foo"`, nil, true},
{`{foo = "bar"} |= ip("foo") or ip(`, nil, true},
{`{foo = "bar"} |= ip("foo") or ip("foo"`, nil, true},
// Tail expression
{`{foo = "bar"} |= "foo" {}`, nil, true},
// Missing identifier.
Expand All @@ -1086,13 +1104,17 @@ var tests = []TestCase{
{`{foo = "bar"} | drop`, nil, true},
{`{foo = "bar"} | drop foo,`, nil, true},
{`{foo = "bar"} | drop foo=`, nil, true},
// Missing expression.
{`{foo = "bar"} |= "foo" or`, nil, true},
{`{foo = "bar"} | label="foo",`, nil, true},
// Missing string value.
{`{foo = "bar"} | json bar=`, nil, true},
{`{foo = "bar"} | regexp`, nil, true},
{`{foo = "bar"} | pattern`, nil, true},
{`{foo = "bar"} | line_format`, nil, true},
{`{foo = "bar"} | addr == ip()`, nil, true},
{`{foo = "bar"} |= ip()`, nil, true},
{`{foo = "bar"} |= ip("foo") or ip()`, nil, true},
// Invalid comparison operation.
{`{foo = "bar"} | addr >= ip("127.0.0.1")`, nil, true},
{`{foo = "bar"} |~ ip("127.0.0.1")`, nil, true},
Expand Down Expand Up @@ -1143,6 +1165,7 @@ var tests = []TestCase{
// Invalid regexp.
{`{foo=~"\\"}`, nil, true},
{`{} |~ "\\"`, nil, true},
{`{} |~ ".+" or "\\"`, nil, true},
{`{} | regexp "\\"`, nil, true},
{`{} | foo=~"\\"`, nil, true},
{`label_replace(rate({job="mysql"}[1m]), "dst", "replacement", "src", "\\")`, nil, true},
Expand Down
8 changes: 7 additions & 1 deletion internal/logql/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ func (*DistinctFilter) pipelineStage() {}

// LineFilter is a line filter (`|=`, `!=`, `=~`, `!~`).
type LineFilter struct {
Op BinOp // OpEq, OpNotEq, OpRe, OpNotRe
Op BinOp // OpEq, OpNotEq, OpRe, OpNotRe
By LineFilterValue
Or []LineFilterValue
}

// LineFilterValue is a line filter literal to search by.
type LineFilterValue struct {
Value string // Equals to value or to unparsed regexp
Re *regexp.Regexp // Equals to nil, if Op is not OpRe or OpNotRe
IP bool // true, if this line filter is IP filter.
Expand Down

0 comments on commit ef690b6

Please sign in to comment.