diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 401ef8f1107a2..fdd5e7f51bbc7 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -118,12 +118,17 @@ type EngineOpts struct { // MaxLookBackPeriod is the maximum amount of time to look back for log lines. // only used for instant log queries. MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"` + + // LogExecutingQuery will control if we log the query when Exec is called. + LogExecutingQuery bool `yaml:"-"` } func (opts *EngineOpts) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // TODO: remove this configuration after next release. f.DurationVar(&opts.Timeout, prefix+".engine.timeout", DefaultEngineTimeout, "Use querier.query-timeout instead. Timeout for query execution.") f.DurationVar(&opts.MaxLookBackPeriod, prefix+".engine.max-lookback-period", 30*time.Second, "The maximum amount of time to look back for log lines. Used only for instant log queries.") + // Log executing query by default + opts.LogExecutingQuery = true } func (opts *EngineOpts) applyDefault() { @@ -138,6 +143,7 @@ type Engine struct { logger log.Logger evaluator Evaluator limits Limits + opts EngineOpts } // NewEngine creates a new LogQL Engine. @@ -152,6 +158,7 @@ func NewEngine(opts EngineOpts, q Querier, l Limits, logger log.Logger) *Engine evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod), limits: l, Timeout: queryTimeout, + opts: opts, } } @@ -164,8 +171,9 @@ func (ng *Engine) Query(params Params) Query { parse: func(_ context.Context, query string) (syntax.Expr, error) { return syntax.ParseExpr(query) }, - record: true, - limits: ng.limits, + record: true, + logExecQuery: ng.opts.LogExecutingQuery, + limits: ng.limits, } } @@ -176,12 +184,13 @@ type Query interface { } type query struct { - logger log.Logger - params Params - parse func(context.Context, string) (syntax.Expr, error) - limits Limits - evaluator Evaluator - record bool + logger log.Logger + params Params + parse func(context.Context, string) (syntax.Expr, error) + limits Limits + evaluator Evaluator + record bool + logExecQuery bool } func (q *query) resultLength(res promql_parser.Value) int { @@ -203,11 +212,13 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { log, ctx := spanlogger.New(ctx, "query.Exec") defer log.Finish() - queryHash := hashedQuery(q.params.Query()) - if GetRangeType(q.params) == InstantType { - level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "instant", "query", q.params.Query(), "query_hash", queryHash) - } else { - level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "range", "query", q.params.Query(), "length", q.params.End().Sub(q.params.Start()), "step", q.params.Step(), "query_hash", queryHash) + if q.logExecQuery { + queryHash := HashedQuery(q.params.Query()) + if GetRangeType(q.params) == InstantType { + level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "instant", "query", q.params.Query(), "query_hash", queryHash) + } else { + level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "range", "query", q.params.Query(), "length", q.params.End().Sub(q.params.Start()), "step", q.params.Step(), "query_hash", queryHash) + } } rangeType := GetRangeType(q.params) diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 381f14c83d88f..bb1945909dddf 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -2496,7 +2496,7 @@ func TestHashingStability(t *testing.T) { queryWithEngine := func() string { buf := bytes.NewBufferString("") logger := log.NewLogfmtLogger(buf) - eng := NewEngine(EngineOpts{}, getLocalQuerier(4), NoLimits, logger) + eng := NewEngine(EngineOpts{LogExecutingQuery: true}, getLocalQuerier(4), NoLimits, logger) query := eng.Query(params) _, err := query.Exec(ctx) require.NoError(t, err) @@ -2526,7 +2526,7 @@ func TestHashingStability(t *testing.T) { {`sum (count_over_time({app="myapp",env="myenv"} |= "error" |= "metrics.go" | logfmt [10s])) by(query_hash)`}, } { params.qs = test.qs - expectedQueryHash := hashedQuery(test.qs) + expectedQueryHash := HashedQuery(test.qs) // check that both places will end up having the same query hash, even though they're emitting different log lines. require.Regexp(t, diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index de9068b2c64c9..cd4a3f863e448 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -114,7 +114,7 @@ func RecordRangeAndInstantQueryMetrics( logValues = append(logValues, []interface{}{ "latency", latencyType, // this can be used to filter log lines. "query", p.Query(), - "query_hash", hashedQuery(p.Query()), + "query_hash", HashedQuery(p.Query()), "query_type", queryType, "range_type", rt, "length", p.End().Sub(p.Start()), @@ -164,7 +164,7 @@ func RecordRangeAndInstantQueryMetrics( recordUsageStats(queryType, stats) } -func hashedQuery(query string) uint32 { +func HashedQuery(query string) uint32 { h := fnv.New32() _, _ = h.Write([]byte(query)) return h.Sum32() @@ -213,6 +213,11 @@ func RecordLabelQueryMetrics( ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) } +func PrintMatches(matches []string) string { + // not using comma (,) as separator as matcher may already have comma (e.g: `{a="b", c="d"}`) + return strings.Join(matches, ":") +} + func RecordSeriesQueryMetrics( ctx context.Context, log log.Logger, @@ -240,7 +245,7 @@ func RecordSeriesQueryMetrics( "length", end.Sub(start), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, - "match", strings.Join(match, ":"), // not using comma (,) as separator as matcher may already have comma (e.g: `{a="b", c="d"}`) + "match", PrintMatches(match), "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), "total_entries", stats.Summary.TotalEntriesReturned, diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 48d9e5d6bd0f0..e0c4b64749be7 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -189,11 +189,11 @@ func Test_testToKeyValues(t *testing.T) { } func TestQueryHashing(t *testing.T) { - h1 := hashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) - h2 := hashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`) + h1 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) + h2 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`) // check that it capture differences of order. require.NotEqual(t, h1, h2) - h3 := hashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) + h3 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) // check that it evaluate same queries as same hashes, even if evaluated at different timestamps. require.Equal(t, h1, h3) } diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index b28db0113d8b6..02203c9014288 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -77,7 +77,7 @@ func newASTMapperware( logger: log.With(logger, "middleware", "QueryShard.astMapperware"), limits: limits, next: next, - ng: logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next: next, limits: limits}, limits, logger), + ng: logql.NewDownstreamEngine(logql.EngineOpts{LogExecutingQuery: false}, DownstreamHandler{next: next, limits: limits}, limits, logger), metrics: metrics, } } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 21886fadf9bd5..506e249c50325 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -16,11 +17,13 @@ import ( "github.com/grafana/dskit/tenant" "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/config" + logutil "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/validation" ) @@ -100,19 +103,22 @@ func NewTripperware( seriesRT := seriesTripperware(next) labelsRT := labelsTripperware(next) instantRT := instantMetricTripperware(next) - return newRoundTripper(next, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, limits) + return newRoundTripper(log, next, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, limits) }, c, nil } type roundTripper struct { + logger log.Logger + next, log, metric, series, labels, instantMetric http.RoundTripper limits Limits } // newRoundTripper creates a new queryrange roundtripper -func newRoundTripper(next, log, metric, series, labels, instantMetric http.RoundTripper, limits Limits) roundTripper { +func newRoundTripper(logger log.Logger, next, log, metric, series, labels, instantMetric http.RoundTripper, limits Limits) roundTripper { return roundTripper{ + logger: logger, log: log, limits: limits, metric: metric, @@ -139,6 +145,10 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } + + queryHash := logql.HashedQuery(rangeQuery.Query) + level.Info(logutil.WithContext(req.Context(), r.logger)).Log("msg", "executing query", "type", "range", "query", rangeQuery.Query, "length", rangeQuery.End.Sub(rangeQuery.Start), "step", rangeQuery.Step, "query_hash", queryHash) + switch e := expr.(type) { case syntax.SampleExpr: return r.metric.RoundTrip(req) @@ -157,16 +167,22 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return r.next.RoundTrip(req) } case SeriesOp: - _, err := loghttp.ParseAndValidateSeriesQuery(req) + sr, err := loghttp.ParseAndValidateSeriesQuery(req) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } + + level.Info(logutil.WithContext(req.Context(), r.logger)).Log("msg", "executing query", "type", "series", "match", logql.PrintMatches(sr.Groups), "length", sr.End.Sub(sr.Start)) + return r.series.RoundTrip(req) case LabelNamesOp: - _, err := loghttp.ParseLabelQuery(req) + lr, err := loghttp.ParseLabelQuery(req) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } + + level.Info(logutil.WithContext(req.Context(), r.logger)).Log("msg", "executing query", "type", "labels", "label", lr.Name, "length", lr.End.Sub(*lr.Start)) + return r.labels.RoundTrip(req) case InstantQueryOp: instantQuery, err := loghttp.ParseInstantQuery(req) @@ -177,6 +193,10 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } + + queryHash := logql.HashedQuery(instantQuery.Query) + level.Info(logutil.WithContext(req.Context(), r.logger)).Log("msg", "executing query", "type", "instant", "query", instantQuery.Query, "query_hash", queryHash) + switch expr.(type) { case syntax.SampleExpr: return r.instantMetric.RoundTrip(req) diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 544c4285f735a..bd202cce69084 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -437,6 +437,7 @@ func TestPostQueries(t *testing.T) { req = req.WithContext(user.InjectOrgID(context.Background(), "1")) require.NoError(t, err) _, err = newRoundTripper( + util_log.Logger, queryrangebase.RoundTripFunc(func(*http.Request) (*http.Response, error) { t.Error("unexpected default roundtripper called") return nil, nil diff --git a/pkg/querier/queryrange/split_by_range.go b/pkg/querier/queryrange/split_by_range.go index 1a6a29324c19c..af86e78c502dd 100644 --- a/pkg/querier/queryrange/split_by_range.go +++ b/pkg/querier/queryrange/split_by_range.go @@ -34,7 +34,7 @@ func NewSplitByRangeMiddleware(logger log.Logger, limits Limits, metrics *logql. logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"), next: next, limits: limits, - ng: logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{ + ng: logql.NewDownstreamEngine(logql.EngineOpts{LogExecutingQuery: false}, DownstreamHandler{ limits: limits, next: next, }, limits, logger),