Skip to content

Commit

Permalink
chore(chstorage): capture Clickhouse logs when explaining LogQL query
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jul 9, 2024
1 parent 0cc18ab commit b389543
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 15 deletions.
12 changes: 12 additions & 0 deletions internal/chstorage/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/zctx"
Expand All @@ -15,6 +16,7 @@ import (
"go.uber.org/zap"

"github.com/go-faster/oteldb/internal/chstorage/chsql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/tracestorage"
)

Expand Down Expand Up @@ -91,6 +93,8 @@ type selectQuery struct {
ExternalData []proto.InputColumn
ExternalTable string

TraceLogs bool

Type string
Signal string
Table string
Expand All @@ -107,6 +111,14 @@ func (q *Querier) do(ctx context.Context, s selectQuery) error {
query.ExternalTable = s.ExternalTable
query.Logger = lg.Named("ch")

if logqlengine.IsExplainQuery(ctx) {
query.Settings = append(query.Settings, ch.Setting{
Key: "send_logs_level",
Value: "trace",
Important: true,
})
}

queryStartTime := time.Now()
err = q.ch.Do(ctx, query)
took := time.Since(queryStartTime)
Expand Down
4 changes: 2 additions & 2 deletions internal/chstorage/querier_logs_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func (o *ClickhouseOptimizer) Name() string {
}

// Optimize implements [Optimizer].
func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query, opts logqlengine.OptimizeOptions) (logqlengine.Query, error) {
func (o *ClickhouseOptimizer) Optimize(ctx context.Context, q logqlengine.Query) (logqlengine.Query, error) {
lg := zap.NewNop()
if opts.Explain {
if logqlengine.IsExplainQuery(ctx) {
lg = zctx.From(ctx).With(
zap.String("optimizer", o.Name()),
)
Expand Down
7 changes: 3 additions & 4 deletions internal/logql/logqlengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ func (e *Engine) NewQuery(ctx context.Context, query string) (q Query, rerr erro
return nil, errors.Wrap(err, "parse")
}

_, explain := expr.(*logql.ExplainExpr)
if explain {
if _, explain := expr.(*logql.ExplainExpr); explain {
logs := new(explainLogs)
ctx = logs.InjectLogger(ctx)
ctx = buildExplainQuery(ctx, logs)

defer func() {
q = &ExplainQuery{
Expand All @@ -134,7 +133,7 @@ func (e *Engine) NewQuery(ctx context.Context, query string) (q Query, rerr erro
return nil, err
}

q, err = e.applyOptimizers(ctx, q, OptimizeOptions{Explain: explain})
q, err = e.applyOptimizers(ctx, q)
if err != nil {
return nil, errors.Wrap(err, "optimize")
}
Expand Down
22 changes: 21 additions & 1 deletion internal/logql/logqlengine/engine_explain_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,26 @@ import (
"github.com/go-faster/oteldb/internal/lokiapi"
)

type (
explainKey struct{}
explainTracer struct{}
)

// IsExplainQuery whether if this LogQL query is explained.
func IsExplainQuery(ctx context.Context) bool {
_, ok := ctx.Value(explainKey{}).(*explainTracer)
return ok
}

func buildExplainQuery(ctx context.Context, logs *explainLogs) context.Context {
return startExplainQuery(ctx, logs)
}

func startExplainQuery(ctx context.Context, logs *explainLogs) context.Context {
ctx = context.WithValue(ctx, explainKey{}, &explainTracer{})
return logs.InjectLogger(ctx)
}

type explainLogs struct {
entries []lokiapi.LogEntry
mux sync.Mutex
Expand Down Expand Up @@ -124,7 +144,7 @@ var _ Query = (*ExplainQuery)(nil)

// Eval implements [Query].
func (q *ExplainQuery) Eval(ctx context.Context, params EvalParams) (lokiapi.QueryResponseData, error) {
ctx = q.logs.InjectLogger(ctx)
ctx = startExplainQuery(ctx, q.logs)
lg := zctx.From(ctx)

var (
Expand Down
11 changes: 3 additions & 8 deletions internal/logql/logqlengine/engine_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,15 @@ import (
type Optimizer interface {
// Name returns optimizer name.
Name() string
Optimize(ctx context.Context, q Query, opts OptimizeOptions) (Query, error)
}

// OptimizeOptions defines options for [Optimizer.Optimize].
type OptimizeOptions struct {
Explain bool
Optimize(ctx context.Context, q Query) (Query, error)
}

// DefaultOptimizers returns slice of default [Optimizer]s.
func DefaultOptimizers() []Optimizer {
return []Optimizer{}
}

func (e *Engine) applyOptimizers(ctx context.Context, q Query, opts OptimizeOptions) (_ Query, rerr error) {
func (e *Engine) applyOptimizers(ctx context.Context, q Query) (_ Query, rerr error) {
ctx, span := e.tracer.Start(ctx, "logql.Engine.applyOptimizers")
defer func() {
if rerr != nil {
Expand All @@ -34,7 +29,7 @@ func (e *Engine) applyOptimizers(ctx context.Context, q Query, opts OptimizeOpti

var err error
for _, o := range e.optimizers {
q, err = o.Optimize(ctx, q, opts)
q, err = o.Optimize(ctx, q)
if err != nil {
return nil, errors.Wrapf(err, "optimizer %q failed", o.Name())
}
Expand Down

0 comments on commit b389543

Please sign in to comment.