[bugfix] logql: fix Distinct operator returning too many log lines #9624

Open · wants to merge 3 commits into base: main
21 changes: 19 additions & 2 deletions pkg/logql/engine.go
@@ -302,7 +302,7 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
 	}

 	defer util.LogErrorWithContext(ctx, "closing iterator", iter.Close)
-	streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval())
+	streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval(), q.params.Query())
 	return streams, err
 default:
 	return nil, errors.New("Unexpected type (%T): cannot evaluate")
@@ -503,14 +503,31 @@ func PopulateMatrixFromScalar(data promql.Scalar, params Params) promql.Matrix {
 	return promql.Matrix{series}
 }

-func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, interval time.Duration) (logqlmodel.Streams, error) {
+func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, interval time.Duration, logql string) (logqlmodel.Streams, error) {
 	streams := map[string]*logproto.Stream{}
 	respSize := uint32(0)
 	// lastEntry should be a really old time so that the first comparison is always true, we use a negative
 	// value here because many unit tests start at time.Unix(0,0)
 	lastEntry := lastEntryMinTime
+
+	globalPipeline, err := syntax.ParseGlobalPipeline(logql)
+	if err != nil {
+		return nil, err
+	}
 	for respSize < size && i.Next() {
 		labels, entry := i.Labels(), i.Entry()
+
+		if globalPipeline != nil {
+			lbs, err := syntax.ParseLabels(labels)
+			if err != nil {
+				return nil, err
+			}
+			_, _, matches := globalPipeline.ForStream(lbs).ProcessString(entry.Timestamp.UnixNano(), entry.Line)
+			if !matches {
+				continue
+			}
+		}
+
 		forwardShouldOutput := dir == logproto.FORWARD &&
 			(i.Entry().Timestamp.Equal(lastEntry.Add(interval)) || i.Entry().Timestamp.After(lastEntry.Add(interval)))
 		backwardShouldOutput := dir == logproto.BACKWARD &&
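
For context, the change above makes the querier re-run the query's "global" stage when it materializes streams, so a distinct filter is applied to every line that counts toward the limit. A minimal sketch of that call flow, using the helpers this PR adds; the query string, labels, and log lines are made up for illustration, and the dedup-on-repeat behavior is the expected distinct semantics rather than something this diff shows directly:

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/logql/syntax"
)

func main() {
	// Hypothetical query: the distinct stage is what makes the pipeline
	// "global", i.e. it must also run where partial results are merged.
	q := `{job="varlogs"} | logfmt | distinct user`

	pipeline, err := syntax.ParseGlobalPipeline(q)
	if err != nil {
		panic(err)
	}
	if pipeline == nil {
		fmt.Println("no global stage; nothing to re-apply")
		return
	}

	lbs, err := syntax.ParseLabels(`{job="varlogs"}`)
	if err != nil {
		panic(err)
	}

	// Two lines with the same user value: the second should not match,
	// because the distinct stage has already seen user=alice.
	sp := pipeline.ForStream(lbs)
	for _, line := range []string{"user=alice msg=a", "user=alice msg=b"} {
		_, _, matches := sp.ProcessString(time.Now().UnixNano(), line)
		fmt.Printf("%q -> matches=%v\n", line, matches)
	}
}
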
45 changes: 45 additions & 0 deletions pkg/logql/syntax/ast.go
@@ -940,6 +940,51 @@ func IsLogicalBinOp(op string) bool {
 	}
 }
+
+func IsGlobalFilter(expr Expr) bool {
+	result := false
+	expr.Walk(func(e interface{}) {
+		switch e.(type) {
+		case *DistinctFilterExpr:
+			result = true
+			return
+		}
+	})
+	return result
+}
+
+func ParseGlobalPipeline(logql string) (Pipeline, error) {
+	expr, err := ParseExpr(logql)
+	if err != nil {
+		// label and series query
+		return nil, nil
+	}
+	isGlobalFilter := IsGlobalFilter(expr)
+	var globalPipeline Pipeline
+	if isGlobalFilter {
+		switch e := expr.(type) {
+		case LogSelectorExpr:
+			pipeline, err := e.Pipeline()
+			if err != nil {
+				return nil, err
+			}
+			globalPipeline = pipeline
+		case SampleExpr:
+			selector, err := e.Selector()
+			if err != nil {
+				return nil, err
+			}
+			pipeline, err := selector.Pipeline()
+			if err != nil {
+				return nil, err
+			}
+			globalPipeline = pipeline
+		default:
+			globalPipeline = nil
+		}
+	}
+	return globalPipeline, nil
+}

 // SampleExpr is a LogQL expression filtering logs and returning metric samples.
 type SampleExpr interface {
 	// Selector is the LogQL selector to apply when retrieving logs.
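
A design note on ParseGlobalPipeline: a parse failure is deliberately swallowed (return nil, nil) because this code path is also reached by label and series requests, whose "query" is not a full LogQL expression. A small sketch of the three possible outcomes; the queries are made up and the snippet assumes the repo's syntax package:

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logql/syntax"
)

func main() {
	for _, q := range []string{
		`{app="foo"} |= "err" | distinct traceID`, // distinct present: non-nil pipeline
		`{app="foo"} |= "err"`,                    // no global stage: nil pipeline, nil error
		`{app="foo"`,                              // unparsable (e.g. a series matcher): nil, nil by design
	} {
		p, err := syntax.ParseGlobalPipeline(q)
		fmt.Printf("%-45q -> pipeline=%t err=%v\n", q, p != nil, err)
	}
}
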
25 changes: 25 additions & 0 deletions pkg/logql/syntax/ast_test.go
@@ -652,6 +652,31 @@ func TestFilterReodering(t *testing.T) {
 	})
 }

+func TestGlobalFilter(t *testing.T) {
+	for _, tc := range []struct {
+		in  string
+		out bool
+	}{
+		{
+			in:  `{job="varlogs", filename="/var/log/install.log"}|=": "|="softwareupdated["| regexp ".*: (?P<testLabel>.*):.*" |testLabel!=""| line_format "{{.testLabel}}"`,
+			out: false,
+		},
+		{
+			in: `{job="varlogs", filename="/var/log/install.log"}|=": "|="softwareupdated["| regexp ".*: (?P<testLabel>.*):.*" |testLabel="SUOSUServiceDaemon"
+			|="SUOSUServiceDaemon"|distinct testLabel`,
+			out: true,
+		},
+	} {
+		t.Run(tc.in, func(t *testing.T) {
+			l, err := ParseExpr(tc.in)
+			require.NoError(t, err)
+			expr := l.(LogSelectorExpr)
+			require.Equal(t, tc.out, IsGlobalFilter(expr))
+		})
+	}
+
+}

 var result bool

 func BenchmarkReorderedPipeline(b *testing.B) {
63 changes: 53 additions & 10 deletions pkg/querier/queryrange/split_by_interval.go
@@ -5,21 +5,20 @@ import (
"net/http"
"time"

"github.com/grafana/loki/pkg/util/math"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/math"
"github.com/grafana/loki/pkg/util/validation"
)

@@ -101,13 +100,17 @@ func (h *splitByInterval) Process(
 	threshold int64,
 	input []*lokiResult,
 	maxSeries int,
+	logql string,
 ) ([]queryrangebase.Response, error) {
 	var responses []queryrangebase.Response
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

 	ch := h.Feed(ctx, input)
-
+	globalPipeline, err := syntax.ParseGlobalPipeline(logql)
+	if err != nil {
+		return nil, err
+	}
 	// queries with 0 limits should not be exited early
 	var unlimited bool
 	if threshold == 0 {
@@ -135,17 +138,21 @@
 			if data.err != nil {
 				return nil, data.err
 			}
-
-			responses = append(responses, data.resp)
-
 			// see if we can exit early if a limit has been reached
 			if casted, ok := data.resp.(*LokiResponse); !unlimited && ok {
+				if globalPipeline != nil {
+					casted, err = globalFilter(casted, globalPipeline)
+					if err != nil {
+						return nil, err
+					}
+				}
 				threshold -= casted.Count()
-
+				responses = append(responses, casted)
 				if threshold <= 0 {
 					return responses, nil
 				}
-
+			} else {
+				responses = append(responses, data.resp)
 			}

 		}
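
The accounting change in this hunk is the heart of the fix: previously threshold was decremented by each split's raw line count, so a distinct query could exit early while the deduplicated total was still under the limit, and duplicate lines from different splits were all appended to the merged response. Filtering each split response first means both the early-exit bookkeeping and the returned responses see only lines that survive the global stage. A toy illustration of the fixed bookkeeping, with made-up counts (not repo code):

package main

import "fmt"

func main() {
	// Toy numbers: three splits each return 40 raw lines, but only
	// 10, 5, and 2 of them survive the distinct filter.
	limit := int64(15)
	filtered := []int64{10, 5, 2}

	threshold := limit
	for i, c := range filtered {
		threshold -= c // decrement by the post-filter count, as Process() now does
		if threshold <= 0 {
			fmt.Printf("early exit after split %d: filtered total reached the limit\n", i+1)
			return
		}
	}
	fmt.Println("all splits consumed; total stays under the limit")
}
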
@@ -230,7 +237,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
 	maxSeriesCapture := func(id string) int { return h.limits.MaxQuerySeries(ctx, id) }
 	maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
 	maxParallelism := MinWeightedParallelism(ctx, tenantIDs, h.configs, h.limits, model.Time(r.GetStart()), model.Time(r.GetEnd()))
-	resps, err := h.Process(ctx, maxParallelism, limit, input, maxSeries)
+	resps, err := h.Process(ctx, maxParallelism, limit, input, maxSeries, r.GetQuery())
 	if err != nil {
 		return nil, err
 	}
@@ -406,3 +413,39 @@ func nextIntervalBoundary(t time.Time, step int64, interval time.Duration) time.Time {
 	}
 	return time.Unix(0, target)
 }
+
+func globalFilter(r *LokiResponse, globalPipeline syntax.Pipeline) (*LokiResponse, error) {
+	extractedResp := LokiResponse{
+		Status:     r.Status,
+		Direction:  r.Direction,
+		Limit:      r.Limit,
+		Version:    r.Version,
+		ErrorType:  r.ErrorType,
+		Error:      r.Error,
+		Statistics: r.Statistics,
+		Data: LokiData{
+			ResultType: r.Data.ResultType,
+			Result:     []logproto.Stream{},
+		},
+	}
+	for _, stream := range r.Data.Result {
+		lbs, err := syntax.ParseLabels(stream.Labels)
+		if err != nil {
+			return nil, err
+		}
+		extractedStream := logproto.Stream{
+			Labels:  stream.Labels,
+			Entries: []logproto.Entry{},
+			Hash:    stream.Hash,
+		}
+		for _, entry := range stream.Entries {
+			_, _, matches := globalPipeline.ForStream(lbs).ProcessString(entry.Timestamp.UnixNano(), entry.Line)
+			if !matches {
+				continue
+			}
+			extractedStream.Entries = append(extractedStream.Entries, entry)
+		}
+		extractedResp.Data.Result = append(extractedResp.Data.Result, extractedStream)
+	}
+	return &extractedResp, nil
+}
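
To make globalFilter's behavior concrete, here is a hedged test sketch. It assumes it lives in the queryrange package next to the code above, that loghttp, logproto, syntax, require, and time are imported as in the package's other tests, and that distinct keeps the first occurrence of a value and drops repeats; the query, labels, and entries are invented:

func TestGlobalFilterDropsRepeatedValues(t *testing.T) {
	pipeline, err := syntax.ParseGlobalPipeline(`{app="foo"} | logfmt | distinct user`)
	require.NoError(t, err)
	require.NotNil(t, pipeline)

	in := &LokiResponse{
		Data: LokiData{
			ResultType: loghttp.ResultTypeStream,
			Result: []logproto.Stream{{
				Labels: `{app="foo"}`,
				Entries: []logproto.Entry{
					{Timestamp: time.Unix(0, 1), Line: `user=alice msg=a`},
					{Timestamp: time.Unix(0, 2), Line: `user=alice msg=b`}, // repeated user: should be dropped
					{Timestamp: time.Unix(0, 3), Line: `user=bob msg=c`},
				},
			}},
		},
	}

	out, err := globalFilter(in, pipeline)
	require.NoError(t, err)
	require.Len(t, out.Data.Result, 1)
	require.Len(t, out.Data.Result[0].Entries, 2) // alice once, bob once
}

One design note: globalFilter calls globalPipeline.ForStream(lbs) inside the entry loop, which only deduplicates correctly if ForStream returns the same stateful stream pipeline for the same labels on every call; Loki's pipeline caches stream pipelines by label hash, which is what makes this per-entry call pattern work.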