From 1a67c555b1a387be88a17058e1f664253ff99484 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 5 Jul 2024 18:24:45 +0300 Subject: [PATCH] feat(logqlengine): explain query explain query --- internal/chstorage/querier_logs_optimizer.go | 11 +- internal/logql/logqlengine/engine.go | 21 ++- .../logql/logqlengine/engine_explain_query.go | 147 ++++++++++++++++++ .../logql/logqlengine/engine_optimizer.go | 11 +- 4 files changed, 183 insertions(+), 7 deletions(-) create mode 100644 internal/logql/logqlengine/engine_explain_query.go diff --git a/internal/chstorage/querier_logs_optimizer.go b/internal/chstorage/querier_logs_optimizer.go index 2bd97a54..37acb15f 100644 --- a/internal/chstorage/querier_logs_optimizer.go +++ b/internal/chstorage/querier_logs_optimizer.go @@ -5,6 +5,8 @@ import ( "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine" + "github.com/go-faster/sdk/zctx" + "go.uber.org/zap" ) // ClickhouseOptimizer replaces LogQL engine execution @@ -19,7 +21,14 @@ func (o *ClickhouseOptimizer) Name() string { } // Optimize implements [Optimizer]. -func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query) (logqlengine.Query, error) { +func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query, opts logqlengine.OptimizeOptions) (logqlengine.Query, error) { + lg := zap.NewNop() + if opts.Explain { + lg = zctx.From(ctx).With( + zap.String("optimizer", o.Name()), + ) + } + switch q := q.(type) { case *logqlengine.LogQuery: q.Root = o.optimizePipeline(q.Root) diff --git a/internal/logql/logqlengine/engine.go b/internal/logql/logqlengine/engine.go index ef9f5e5f..e54157cb 100644 --- a/internal/logql/logqlengine/engine.go +++ b/internal/logql/logqlengine/engine.go @@ -100,7 +100,7 @@ func (e *Engine) ParseOptions() logql.ParseOptions { } // NewQuery creates new [Query]. -func (e *Engine) NewQuery(ctx context.Context, query string) (_ Query, rerr error) { +func (e *Engine) NewQuery(ctx context.Context, query string) (q Query, rerr error) { ctx, span := e.tracer.Start(ctx, "logql.Engine.NewQuery", trace.WithAttributes( attribute.String("logql.query", query), )) @@ -116,12 +116,25 @@ func (e *Engine) NewQuery(ctx context.Context, query string) (_ Query, rerr erro return nil, errors.Wrap(err, "parse") } - q, err := e.buildQuery(ctx, expr) + _, explain := expr.(*logql.ExplainExpr) + if explain { + logs := new(explainLogs) + ctx = logs.InjectLogger(ctx) + + defer func() { + q = &ExplainQuery{ + Explain: q, + logs: logs, + } + }() + } + + q, err = e.buildQuery(ctx, expr) if err != nil { return nil, err } - q, err = e.applyOptimizers(ctx, q) + q, err = e.applyOptimizers(ctx, q, OptimizeOptions{Explain: explain}) if err != nil { return nil, errors.Wrap(err, "optimize") } @@ -131,6 +144,8 @@ func (e *Engine) NewQuery(ctx context.Context, query string) (_ Query, rerr erro func (e *Engine) buildQuery(ctx context.Context, expr logql.Expr) (_ Query, rerr error) { switch expr := logql.UnparenExpr(expr).(type) { + case *logql.ExplainExpr: + return e.buildQuery(ctx, expr.X) case *logql.LogExpr: return e.buildLogQuery(ctx, expr) case *logql.LiteralExpr: diff --git a/internal/logql/logqlengine/engine_explain_query.go b/internal/logql/logqlengine/engine_explain_query.go new file mode 100644 index 00000000..4e61f3de --- /dev/null +++ b/internal/logql/logqlengine/engine_explain_query.go @@ -0,0 +1,147 @@ +package logqlengine + +import ( + "context" + "fmt" + "slices" + "sync" + "time" + + "github.com/go-faster/sdk/zctx" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/go-faster/oteldb/internal/lokiapi" +) + +type explainLogs struct { + entries []lokiapi.LogEntry + mux sync.Mutex +} + +func (e *explainLogs) InjectLogger(ctx context.Context) context.Context { + lg := zctx.From(ctx).WithOptions( + zap.WrapCore(func(c zapcore.Core) zapcore.Core { + return zapcore.NewTee(c, e.Core()) + }), + ) + return zctx.Base(ctx, lg.Named("explain")) +} + +func (e *explainLogs) Core() zapcore.Core { + return &explainCore{ + enc: zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), + with: nil, + logs: e, + } +} + +func (e *explainLogs) Data() lokiapi.QueryResponseData { + e.mux.Lock() + defer e.mux.Unlock() + + stream := lokiapi.Stream{ + Stream: lokiapi.NewOptLabelSet(lokiapi.LabelSet{"log": "explain"}), + Values: e.entries, + } + + return lokiapi.QueryResponseData{ + Type: lokiapi.StreamsResultQueryResponseData, + StreamsResult: lokiapi.StreamsResult{ + Result: lokiapi.Streams{stream}, + }, + } +} + +func (e *explainLogs) Push(entry lokiapi.LogEntry) { + e.mux.Lock() + defer e.mux.Unlock() + + e.entries = append(e.entries, entry) +} + +type explainCore struct { + enc zapcore.Encoder + with []zapcore.Field + logs *explainLogs +} + +var _ zapcore.Core = (*explainCore)(nil) + +func (t *explainCore) Enabled(zapcore.Level) bool { + return true +} + +// With adds structured context to the Core. +func (t *explainCore) With(fields []zapcore.Field) zapcore.Core { + return &explainCore{ + enc: t.enc.Clone(), + with: append(slices.Clone(t.with), fields...), + logs: t.logs, + } +} + +// Check determines whether the supplied Entry should be logged (using the +// embedded LevelEnabler and possibly some extra logic). If the entry +// should be logged, the Core adds itself to the CheckedEntry and returns +// the result. +// +// Callers must use Check before calling Write. +func (t *explainCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + return ce.AddCore(ent, t) +} + +// Write serializes the Entry and any Fields supplied at the log site and +// writes them to their destination. +// +// If called, Write should always log the Entry and Fields; it should not +// replicate the logic of Check. +func (t *explainCore) Write(e zapcore.Entry, fields []zapcore.Field) error { + buf, err := t.enc.EncodeEntry(e, fields) + if err != nil { + return err + } + t.logs.Push(lokiapi.LogEntry{ + T: uint64(e.Time.UnixNano()), + V: buf.String(), + }) + return nil +} + +// Sync flushes buffered logs (if any). +func (t *explainCore) Sync() error { + return nil +} + +// ExplainQuery is simple Explain expression query. +type ExplainQuery struct { + Explain Query + + logs *explainLogs +} + +var _ Query = (*ExplainQuery)(nil) + +// Eval implements [Query]. +func (q *ExplainQuery) Eval(ctx context.Context, params EvalParams) (lokiapi.QueryResponseData, error) { + ctx = q.logs.InjectLogger(ctx) + lg := zctx.From(ctx) + + var ( + start = time.Now() + resultField zap.Field + ) + data, err := q.Explain.Eval(ctx, params) + if err != nil { + resultField = zap.Error(err) + } else { + resultField = zap.String("data_type", string(data.Type)) + } + lg.Debug("Evaluated query", + zap.String("query_type", fmt.Sprintf("%T", q.Explain)), + resultField, + zap.Duration("took", time.Since(start)), + ) + + return q.logs.Data(), nil +} diff --git a/internal/logql/logqlengine/engine_optimizer.go b/internal/logql/logqlengine/engine_optimizer.go index 719d1ce9..4e0ea442 100644 --- a/internal/logql/logqlengine/engine_optimizer.go +++ b/internal/logql/logqlengine/engine_optimizer.go @@ -10,7 +10,12 @@ import ( type Optimizer interface { // Name returns optimizer name. Name() string - Optimize(ctx context.Context, q Query) (Query, error) + Optimize(ctx context.Context, q Query, opts OptimizeOptions) (Query, error) +} + +// OptimizeOptions defines options for [Optimizer.Optimize]. +type OptimizeOptions struct { + Explain bool } // DefaultOptimizers returns slice of default [Optimizer]s. @@ -18,7 +23,7 @@ func DefaultOptimizers() []Optimizer { return []Optimizer{} } -func (e *Engine) applyOptimizers(ctx context.Context, q Query) (_ Query, rerr error) { +func (e *Engine) applyOptimizers(ctx context.Context, q Query, opts OptimizeOptions) (_ Query, rerr error) { ctx, span := e.tracer.Start(ctx, "logql.Engine.applyOptimizers") defer func() { if rerr != nil { @@ -29,7 +34,7 @@ func (e *Engine) applyOptimizers(ctx context.Context, q Query) (_ Query, rerr er var err error for _, o := range e.optimizers { - q, err = o.Optimize(ctx, q) + q, err = o.Optimize(ctx, q, opts) if err != nil { return nil, errors.Wrapf(err, "optimizer %q failed", o.Name()) }