Merge pull request #449 from go-faster/feat/query-explainer
feat(logqlengine): explain query
tdakkota authored Jul 8, 2024
2 parents 230558c + e40c2d9 commit 0591eaa
Showing 13 changed files with 414 additions and 31 deletions.
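This change adds an `@explain` pragma to the LogQL engine: prefixing a query with it makes the optimizers log their offloading decisions, and those lines come back as the query result. A minimal usage sketch, assuming the generated `lokiapi` client used by the integration test below (the `client` variable and field values are illustrative, not canonical):

```go
resp, err := client.QueryRange(ctx, lokiapi.QueryRangeParams{
    // The pragma goes on its own line, before the actual query.
    Query: "@explain\n" + `{http_method=~".+"} |= "HEAD"`,
    Limit: lokiapi.NewOptInt(1000),
})
```

The explain output arrives as the entries of a single stream, which the test below matches with regular expressions.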
81 changes: 81 additions & 0 deletions integration/lokie2e/common_test.go
@@ -6,6 +6,7 @@ import (
"fmt"
"net/http/httptest"
"os"
"regexp"
"slices"
"testing"
"time"
@@ -522,6 +523,86 @@ func runTest(
})
}
})
t.Run("Explain", func(t *testing.T) {
for i, tt := range []struct {
query string
contains []string
}{
{
`{http_method=~".+"} |= "HEAD"`,
[]string{
`Offloading line filters.+\|=`,
`Pipeline could be fully offloaded to Clickhouse`,
},
},
{
`{http_method=~".+"} | http_method = "GET"`,
[]string{
`Offloading pipeline label filters.+http_method=`,
`Pipeline could be fully offloaded to Clickhouse`,
},
},
{
`{http_method=~".+"} |= "HEAD" | http_method = "GET" | json | status != 200`,
[]string{
`Offloading line filters.+\|=`,
`Offloading pipeline label filters.+http_method=`,
},
},

{
`sum by (http_method) ( count_over_time({http_method=~".+"} [30s]) )`,
[]string{
`Sampling could be offloaded to Clickhouse`,
},
},
{
`sum by (http_method) ( count_over_time({http_method=~".+"} |= "HEAD" [30s]) )`,
[]string{
`Offloading line filters.+\|=`,
`Pipeline could be fully offloaded to Clickhouse`,
`Sampling could be offloaded to Clickhouse`,
},
},
} {
tt := tt
t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
t.Parallel()

resp, err := c.QueryRange(ctx, lokiapi.QueryRangeParams{
// Expected result is empty
Query: "@explain\n" + tt.query,
// Query all data in one step.
Start: lokiapi.NewOptLokiTime(asLokiTime(set.End)),
End: lokiapi.NewOptLokiTime(asLokiTime(set.End + otelstorage.Timestamp(10*time.Second))),
Step: lokiapi.NewOptPrometheusDuration("30s"),
Limit: lokiapi.NewOptInt(1000),
})
require.NoError(t, err)

data, ok := resp.Data.GetStreamsResult()
require.True(t, ok)
streams := data.Result
require.Len(t, streams, 1)
entries := streams[0].Values
require.NotEmpty(t, entries)

for _, pattern := range tt.contains {
re, err := regexp.Compile(pattern)
require.NoError(t, err)

require.True(t,
slices.ContainsFunc(entries, func(s lokiapi.LogEntry) bool {
return re.MatchString(s.V)
}),
"There is should be at least one log entry that matches %q in %#v",
pattern,
entries,
)
}
})
}
})
t.Run("MetricQueries", func(t *testing.T) {
t.Run("EmptySampleQuery", func(t *testing.T) {
resp, err := c.QueryRange(ctx, lokiapi.QueryRangeParams{
24 changes: 21 additions & 3 deletions internal/chstorage/querier.go
@@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"

"github.com/go-faster/oteldb/internal/chstorage/chsql"
"github.com/go-faster/oteldb/internal/tracestorage"
@@ -96,20 +97,37 @@ type selectQuery struct {
}

func (q *Querier) do(ctx context.Context, s selectQuery) error {
lg := zctx.From(ctx)

query, err := s.Query.Prepare(s.OnResult)
if err != nil {
return errors.Wrap(err, "build query")
}
query.ExternalData = s.ExternalData
query.ExternalTable = s.ExternalTable
query.Logger = zctx.From(ctx).Named("ch")
query.Logger = lg.Named("ch")

queryStartTime := time.Now()
if err := q.ch.Do(ctx, query); err != nil {
err = q.ch.Do(ctx, query)
took := time.Since(queryStartTime)
if ce := lg.Check(zap.DebugLevel, "Query Clickhouse"); ce != nil {
errField := zap.Skip()
if err != nil {
errField = zap.Error(err)
}
ce.Write(
zap.String("query_type", s.Type),
zap.String("table", s.Table),
zap.String("signal", s.Signal),
zap.Duration("took", took),
errField,
)
}
if err != nil {
return errors.Wrapf(err, "execute %s (signal: %s)", s.Type, s.Signal)
}

q.clickhouseRequestHistogram.Record(ctx, time.Since(queryStartTime).Seconds(),
q.clickhouseRequestHistogram.Record(ctx, took.Seconds(),
metric.WithAttributes(
attribute.String("chstorage.query_type", s.Type),
attribute.String("chstorage.table", s.Table),
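The `do` method now measures `took` once and routes per-query logging through zap's `Check` API, so log fields are only built when the debug level is actually enabled; the `zap.Skip()` no-op field keeps the `ce.Write` call shape uniform when there is no error to attach. A self-contained sketch of that pattern (logger setup and field values are illustrative):

```go
package main

import "go.uber.org/zap"

func main() {
    lg, _ := zap.NewDevelopment() // debug level enabled
    defer func() { _ = lg.Sync() }()

    // Check returns a non-nil CheckedEntry only when the level is enabled,
    // so expensive field construction is skipped otherwise.
    if ce := lg.Check(zap.DebugLevel, "Query Clickhouse"); ce != nil {
        ce.Write(zap.String("table", "logs"))
    }
}
```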
58 changes: 44 additions & 14 deletions internal/chstorage/querier_logs_optimizer.go
@@ -3,6 +3,9 @@ package chstorage
import (
"context"

"github.com/go-faster/sdk/zctx"
"go.uber.org/zap"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
)
@@ -19,23 +22,30 @@ func (o *ClickhouseOptimizer) Name() string {
}

// Optimize implements [Optimizer].
func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query) (logqlengine.Query, error) {
func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query, opts logqlengine.OptimizeOptions) (logqlengine.Query, error) {
lg := zap.NewNop()
if opts.Explain {
lg = zctx.From(ctx).With(
zap.String("optimizer", o.Name()),
)
}

switch q := q.(type) {
case *logqlengine.LogQuery:
q.Root = o.optimizePipeline(q.Root)
q.Root = o.optimizePipeline(q.Root, lg)
case *logqlengine.MetricQuery:
if err := logqlengine.VisitNode(q.Root, func(n *logqlengine.SamplingNode) error {
n.Input = o.optimizePipeline(n.Input)
n.Input = o.optimizePipeline(n.Input, lg)
return nil
}); err != nil {
return nil, err
}
q.Root = o.optimizeSampling(q.Root)
q.Root = o.optimizeSampling(q.Root, lg)
}
return q, nil
}

func (o *ClickhouseOptimizer) optimizeSampling(n logqlengine.MetricNode) logqlengine.MetricNode {
func (o *ClickhouseOptimizer) optimizeSampling(n logqlengine.MetricNode, lg *zap.Logger) logqlengine.MetricNode {
switch n := n.(type) {
case *logqlengine.VectorAggregation:
switch n.Expr.Op {
@@ -53,25 +63,25 @@ func (o *ClickhouseOptimizer) optimizeSampling(n logqlengine.MetricNode) logqlen
if !ok {
return n
}
n.Input = o.buildRangeAggregationSampling(rn, labels)
n.Input = o.buildRangeAggregationSampling(rn, labels, lg)

return n
case *logqlengine.LabelReplace:
n.Input = o.optimizeSampling(n.Input)
n.Input = o.optimizeSampling(n.Input, lg)
return n
case *logqlengine.LiteralBinOp:
n.Input = o.optimizeSampling(n.Input)
n.Input = o.optimizeSampling(n.Input, lg)
return n
case *logqlengine.BinOp:
n.Left = o.optimizeSampling(n.Left)
n.Right = o.optimizeSampling(n.Right)
n.Left = o.optimizeSampling(n.Left, lg)
n.Right = o.optimizeSampling(n.Right, lg)
return n
default:
return n
}
}

func (o *ClickhouseOptimizer) buildRangeAggregationSampling(n *logqlengine.RangeAggregation, grouping []logql.Label) logqlengine.MetricNode {
func (o *ClickhouseOptimizer) buildRangeAggregationSampling(n *logqlengine.RangeAggregation, grouping []logql.Label, lg *zap.Logger) logqlengine.MetricNode {
if g := n.Expr.Grouping; g != nil {
return n
}
@@ -93,6 +103,12 @@ func (o *ClickhouseOptimizer) buildRangeAggregationSampling(n *logqlengine.Range
return n
}

if ce := lg.Check(zap.DebugLevel, "Sampling could be offloaded to Clickhouse"); ce != nil {
ce.Write(
zap.Stringer("sampling_op", samplingOp),
zap.Stringers("grouping_labels", grouping),
)
}
n.Input = &SamplingNode{
Sel: pipelineNode.Sel,
Sampling: samplingOp,
@@ -123,7 +139,7 @@ func getSamplingOp(e *logql.RangeAggregationExpr) (op SamplingOp, _ bool) {
}
}

func (o *ClickhouseOptimizer) optimizePipeline(n logqlengine.PipelineNode) logqlengine.PipelineNode {
func (o *ClickhouseOptimizer) optimizePipeline(n logqlengine.PipelineNode, lg *zap.Logger) logqlengine.PipelineNode {
pn, ok := n.(*logqlengine.ProcessorNode)
if !ok {
return n
@@ -137,11 +153,26 @@ func (o *ClickhouseOptimizer) optimizePipeline(n logqlengine.PipelineNode) logql
}

sn.Sel.Line = o.offloadLineFilters(pn.Pipeline)
if f := sn.Sel.Line; len(f) > 0 {
if ce := lg.Check(zap.DebugLevel, "Offloading line filters"); ce != nil {
ce.Write(zap.Stringers("line_filters", f))
}
}

sn.Sel.PipelineLabels = o.offloadLabelFilters(pn.Pipeline)
if f := sn.Sel.PipelineLabels; len(f) > 0 {
if ce := lg.Check(zap.DebugLevel, "Offloading pipeline label filters"); ce != nil {
ce.Write(zap.Stringers("pipeline_labels", f))
}
}

offloaded := len(sn.Sel.Line) + len(sn.Sel.PipelineLabels)
// Replace original node with [InputNode], since we can execute filtering entirely in
// Clickhouse.
if len(pn.Pipeline) == offloaded && !pn.EnableOTELAdapter {
lg.Debug("Pipeline could be fully offloaded to Clickhouse",
zap.Stringer("selector", logql.Selector{Matchers: sn.Sel.Labels}),
)
return sn
}
return n
@@ -156,8 +187,7 @@ stageLoop:
continue
}
filters = append(filters, stage.Pred)
case *logql.LineFormat,
*logql.DecolorizeExpr,
case *logql.DecolorizeExpr,
*logql.LineFilter:
// Do nothing on label set, just skip.
default:
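All optimizer passes now receive a logger that is a no-op unless the query carries `@explain`, in which case it is the context logger tagged with the optimizer's name; non-explained queries therefore pay nothing for the new instrumentation. A condensed sketch of the gating, with an assumed helper signature (the function name is illustrative; only the body mirrors the diff above):

```go
import (
    "context"

    "github.com/go-faster/sdk/zctx"
    "go.uber.org/zap"
)

// explainLogger sketches the gating at the top of Optimize.
func explainLogger(ctx context.Context, explain bool, name string) *zap.Logger {
    if !explain {
        return zap.NewNop() // the normal path logs nothing
    }
    return zctx.From(ctx).With(zap.String("optimizer", name))
}
```

Threading a logger through the passes, rather than checking an explain flag at every log site, keeps the pass code uniform.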
15 changes: 11 additions & 4 deletions internal/logql/expr.go
@@ -5,15 +5,14 @@ type Expr interface {
expr()
}

func (*ParenExpr) expr() {}

func (*ParenExpr) metricExpr() {}

// ParenExpr is parenthesized Expr.
type ParenExpr struct {
X Expr
}

func (*ParenExpr) expr() {}
func (*ParenExpr) metricExpr() {}

// UnparenExpr recursively extracts expression from parentheses.
func UnparenExpr(e Expr) Expr {
p, ok := e.(*ParenExpr)
@@ -22,3 +21,11 @@ func UnparenExpr(e Expr) Expr {
}
return UnparenExpr(p.X)
}

// ExplainExpr is a wrapper around Expr to explain.
type ExplainExpr struct {
X Expr
}

func (*ExplainExpr) expr() {}
func (*ExplainExpr) metricExpr() {}
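`ExplainExpr` marks the parsed root so later stages can detect the pragma. A hypothetical unwrap helper (not part of this diff) showing how a consumer could strip the marker before planning:

```go
import "github.com/go-faster/oteldb/internal/logql"

// unwrapExplain is illustrative only; the engine's actual handling of
// ExplainExpr lives elsewhere in this PR.
func unwrapExplain(e logql.Expr) (inner logql.Expr, explain bool) {
    if ex, ok := e.(*logql.ExplainExpr); ok {
        return ex.X, true
    }
    return e, false
}
```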
12 changes: 11 additions & 1 deletion internal/logql/lexer/lexer.go
@@ -70,10 +70,20 @@ func (l *lexer) setError(msg string, pos scanner.Position) {
func (l *lexer) nextToken(r rune, text string) (tok Token, _ bool) {
tok.Pos = l.scanner.Position
tok.Text = text
if r == '-' && l.scanner.Peek() == '-' {
switch c := [2]rune{r, l.scanner.Peek()}; c {
case [2]rune{'-', '-'}:
tok.Type = ParserFlag
tok.Text = scanFlag(&l.scanner, text)
return tok, true
case [2]rune{'@', 'e'}:
l.scanner.Scan()
if l.scanner.TokenText() == "explain" {
tok.Type = Explain
tok.Text = "@explain"
return tok, true
}
l.setError(fmt.Sprintf("unexpected pragma %q", l.scanner.TokenText()), tok.Pos)
return tok, false
}
switch r {
case scanner.Int, scanner.Float:
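The lexer dispatches on the current rune plus one rune of lookahead, which lets it commit to the pragma branch before scanning the identifier. A standalone sketch of the same `text/scanner` technique:

```go
package main

import (
    "fmt"
    "strings"
    "text/scanner"
)

func main() {
    var s scanner.Scanner
    s.Init(strings.NewReader(`@explain {foo="bar"}`))

    r := s.Scan() // '@' is not an identifier rune, so it is returned alone
    if [2]rune{r, s.Peek()} == [2]rune{'@', 'e'} {
        s.Scan()                              // consume the identifier after '@'
        fmt.Println("pragma:", s.TokenText()) // prints "pragma: explain"
    }
}
```

Any other word starting with 'e' after `@` also takes this branch; as the diff shows, the lexer then checks the scanned text and reports `unexpected pragma` if it is not exactly "explain".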
18 changes: 18 additions & 0 deletions internal/logql/lexer/lexer_test.go
@@ -257,6 +257,20 @@ var tests = []TestCase{
},
false,
},

// Explain token.
{
`@explain {foo =~ "bar"}`,
[]Token{
{Type: Explain, Text: "@explain"},
{Type: OpenBrace, Text: "{"},
{Type: Ident, Text: "foo"},
{Type: Re, Text: "=~"},
{Type: String, Text: "bar"},
{Type: CloseBrace, Text: "}"},
},
false,
},
}

func TestTokenize(t *testing.T) {
@@ -300,6 +314,10 @@ func TestTokenizeErrors(t *testing.T) {
`0ee1`,
`at test.ql:1:1: exponent has no digits`,
},
{
`@error`,
`at test.ql:1:1: unexpected pragma "error"`,
},
}
for i, tt := range tests {
tt := tt
5 changes: 5 additions & 0 deletions internal/logql/lexer/token.go
@@ -158,6 +158,9 @@ const (
DurationSecondsConv

ParserFlag

// Extension: for query debugging purposes.
Explain
)

var tokens = map[string]TokenType{
@@ -251,4 +254,6 @@ var tokens = map[string]TokenType{
"bytes": BytesConv,
"duration": DurationConv,
"duration_seconds": DurationSecondsConv,

"@explain": Explain,
}
5 changes: 3 additions & 2 deletions internal/logql/lexer/tokentype_string.go

Some generated files are not rendered by default.

