Skip to content

Commit

Permalink
feat(logqlengine): explain query
Browse files Browse the repository at this point in the history
explain query
  • Loading branch information
tdakkota committed Jul 8, 2024
1 parent b255f2b commit 1a67c55
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 7 deletions.
11 changes: 10 additions & 1 deletion internal/chstorage/querier_logs_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/sdk/zctx"
"go.uber.org/zap"
)

// ClickhouseOptimizer replaces LogQL engine execution
Expand All @@ -19,7 +21,14 @@ func (o *ClickhouseOptimizer) Name() string {
}

// Optimize implements [Optimizer].
func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query) (logqlengine.Query, error) {
func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query, opts logqlengine.OptimizeOptions) (logqlengine.Query, error) {
lg := zap.NewNop()
if opts.Explain {
lg = zctx.From(ctx).With(
zap.String("optimizer", o.Name()),
)
}

switch q := q.(type) {
case *logqlengine.LogQuery:
q.Root = o.optimizePipeline(q.Root)
Expand Down
21 changes: 18 additions & 3 deletions internal/logql/logqlengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (e *Engine) ParseOptions() logql.ParseOptions {
}

// NewQuery creates new [Query].
func (e *Engine) NewQuery(ctx context.Context, query string) (_ Query, rerr error) {
func (e *Engine) NewQuery(ctx context.Context, query string) (q Query, rerr error) {
ctx, span := e.tracer.Start(ctx, "logql.Engine.NewQuery", trace.WithAttributes(
attribute.String("logql.query", query),
))
Expand All @@ -116,12 +116,25 @@ func (e *Engine) NewQuery(ctx context.Context, query string) (_ Query, rerr erro
return nil, errors.Wrap(err, "parse")
}

q, err := e.buildQuery(ctx, expr)
_, explain := expr.(*logql.ExplainExpr)
if explain {
logs := new(explainLogs)
ctx = logs.InjectLogger(ctx)

defer func() {
q = &ExplainQuery{
Explain: q,
logs: logs,
}
}()
}

q, err = e.buildQuery(ctx, expr)
if err != nil {
return nil, err
}

q, err = e.applyOptimizers(ctx, q)
q, err = e.applyOptimizers(ctx, q, OptimizeOptions{Explain: explain})
if err != nil {
return nil, errors.Wrap(err, "optimize")
}
Expand All @@ -131,6 +144,8 @@ func (e *Engine) NewQuery(ctx context.Context, query string) (_ Query, rerr erro

func (e *Engine) buildQuery(ctx context.Context, expr logql.Expr) (_ Query, rerr error) {
switch expr := logql.UnparenExpr(expr).(type) {
case *logql.ExplainExpr:
return e.buildQuery(ctx, expr.X)
case *logql.LogExpr:
return e.buildLogQuery(ctx, expr)
case *logql.LiteralExpr:
Expand Down
147 changes: 147 additions & 0 deletions internal/logql/logqlengine/engine_explain_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package logqlengine

import (
"context"
"fmt"
"slices"
"sync"
"time"

"github.com/go-faster/sdk/zctx"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/go-faster/oteldb/internal/lokiapi"
)

type explainLogs struct {
entries []lokiapi.LogEntry
mux sync.Mutex
}

func (e *explainLogs) InjectLogger(ctx context.Context) context.Context {
lg := zctx.From(ctx).WithOptions(
zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return zapcore.NewTee(c, e.Core())
}),
)
return zctx.Base(ctx, lg.Named("explain"))
}

func (e *explainLogs) Core() zapcore.Core {
return &explainCore{
enc: zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
with: nil,
logs: e,
}
}

func (e *explainLogs) Data() lokiapi.QueryResponseData {
e.mux.Lock()
defer e.mux.Unlock()

stream := lokiapi.Stream{
Stream: lokiapi.NewOptLabelSet(lokiapi.LabelSet{"log": "explain"}),
Values: e.entries,
}

return lokiapi.QueryResponseData{
Type: lokiapi.StreamsResultQueryResponseData,
StreamsResult: lokiapi.StreamsResult{
Result: lokiapi.Streams{stream},
},
}
}

func (e *explainLogs) Push(entry lokiapi.LogEntry) {
e.mux.Lock()
defer e.mux.Unlock()

e.entries = append(e.entries, entry)
}

type explainCore struct {
enc zapcore.Encoder
with []zapcore.Field
logs *explainLogs
}

var _ zapcore.Core = (*explainCore)(nil)

func (t *explainCore) Enabled(zapcore.Level) bool {
return true
}

// With adds structured context to the Core.
func (t *explainCore) With(fields []zapcore.Field) zapcore.Core {
return &explainCore{
enc: t.enc.Clone(),
with: append(slices.Clone(t.with), fields...),
logs: t.logs,
}
}

// Check determines whether the supplied Entry should be logged (using the
// embedded LevelEnabler and possibly some extra logic). If the entry
// should be logged, the Core adds itself to the CheckedEntry and returns
// the result.
//
// Callers must use Check before calling Write.
func (t *explainCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
return ce.AddCore(ent, t)
}

// Write serializes the Entry and any Fields supplied at the log site and
// writes them to their destination.
//
// If called, Write should always log the Entry and Fields; it should not
// replicate the logic of Check.
func (t *explainCore) Write(e zapcore.Entry, fields []zapcore.Field) error {
buf, err := t.enc.EncodeEntry(e, fields)
if err != nil {
return err
}
t.logs.Push(lokiapi.LogEntry{
T: uint64(e.Time.UnixNano()),
V: buf.String(),
})
return nil
}

// Sync flushes buffered logs (if any).
func (t *explainCore) Sync() error {
return nil
}

// ExplainQuery is simple Explain expression query.
type ExplainQuery struct {
Explain Query

logs *explainLogs
}

var _ Query = (*ExplainQuery)(nil)

// Eval implements [Query].
func (q *ExplainQuery) Eval(ctx context.Context, params EvalParams) (lokiapi.QueryResponseData, error) {
ctx = q.logs.InjectLogger(ctx)
lg := zctx.From(ctx)

var (
start = time.Now()
resultField zap.Field
)
data, err := q.Explain.Eval(ctx, params)
if err != nil {
resultField = zap.Error(err)
} else {
resultField = zap.String("data_type", string(data.Type))
}
lg.Debug("Evaluated query",
zap.String("query_type", fmt.Sprintf("%T", q.Explain)),
resultField,
zap.Duration("took", time.Since(start)),
)

return q.logs.Data(), nil
}
11 changes: 8 additions & 3 deletions internal/logql/logqlengine/engine_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ import (
type Optimizer interface {
// Name returns optimizer name.
Name() string
Optimize(ctx context.Context, q Query) (Query, error)
Optimize(ctx context.Context, q Query, opts OptimizeOptions) (Query, error)
}

// OptimizeOptions defines options for [Optimizer.Optimize].
type OptimizeOptions struct {
Explain bool
}

// DefaultOptimizers returns slice of default [Optimizer]s.
func DefaultOptimizers() []Optimizer {
return []Optimizer{}
}

func (e *Engine) applyOptimizers(ctx context.Context, q Query) (_ Query, rerr error) {
func (e *Engine) applyOptimizers(ctx context.Context, q Query, opts OptimizeOptions) (_ Query, rerr error) {
ctx, span := e.tracer.Start(ctx, "logql.Engine.applyOptimizers")
defer func() {
if rerr != nil {
Expand All @@ -29,7 +34,7 @@ func (e *Engine) applyOptimizers(ctx context.Context, q Query) (_ Query, rerr er

var err error
for _, o := range e.optimizers {
q, err = o.Optimize(ctx, q)
q, err = o.Optimize(ctx, q, opts)
if err != nil {
return nil, errors.Wrapf(err, "optimizer %q failed", o.Name())
}
Expand Down

0 comments on commit 1a67c55

Please sign in to comment.