Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check for stream selectors to have atleast one equality matcher #3216

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseLogSelector(test)
expr, err := logql.ParseLogSelector(test, true)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -999,7 +999,7 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseSampleExpr(test)
expr, err := logql.ParseSampleExpr(test, true)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type tailer struct {
}

func newTailer(orgID, query string, conn TailServer) (*tailer, error) {
expr, err := logql.ParseLogSelector(query)
expr, err := logql.ParseLogSelector(query, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func validateMatcherConfig(cfg *MatcherConfig) (logql.LogSelectorExpr, error) {
return nil, errors.New(ErrStagesWithDropLine)
}

selector, err := logql.ParseLogSelector(cfg.Selector)
selector, err := logql.ParseLogSelector(cfg.Selector, false)
if err != nil {
return nil, errors.Wrap(err, ErrSelectorSyntax)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type SelectLogParams struct {
// LogSelector returns the LogSelectorExpr from the SelectParams.
// The `LogSelectorExpr` can then returns all matchers and filters to use for that request.
func (s SelectLogParams) LogSelector() (LogSelectorExpr, error) {
return ParseLogSelector(s.Selector)
return ParseLogSelector(s.Selector, true)
}

type SelectSampleParams struct {
Expand All @@ -54,13 +54,13 @@ type SelectSampleParams struct {
// Expr returns the SampleExpr from the SelectSampleParams.
// The `LogSelectorExpr` can then returns all matchers and filters to use for that request.
func (s SelectSampleParams) Expr() (SampleExpr, error) {
return ParseSampleExpr(s.Selector)
return ParseSampleExpr(s.Selector, true)
}

// LogSelector returns the LogSelectorExpr from the SelectParams.
// The `LogSelectorExpr` can then returns all matchers and filters to use for that request.
func (s SelectSampleParams) LogSelector() (LogSelectorExpr, error) {
expr, err := ParseSampleExpr(s.Selector)
expr, err := ParseSampleExpr(s.Selector, true)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_logSelectorExpr_String(t *testing.T) {
selector string
expectFilter bool
}{
{`{foo!~"bar"}`, false},
{`{foo="bar"}`, false},
{`{foo="bar", bar!="baz"}`, false},
{`{foo="bar", bar!="baz"} != "bip" !~ ".+bop"`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`, true},
Expand All @@ -36,7 +36,7 @@ func Test_logSelectorExpr_String(t *testing.T) {
tt := tt
t.Run(tt.selector, func(t *testing.T) {
t.Parallel()
expr, err := ParseLogSelector(tt.selector)
expr, err := ParseLogSelector(tt.selector, true)
if err != nil {
t.Fatalf("failed to parse log selector: %s", err)
}
Expand Down Expand Up @@ -150,7 +150,7 @@ func Test_NilFilterDoesntPanic(t *testing.T) {
`{namespace="dev", container_name="cart"} |= "bleep" |= "bloop" |= ""`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseLogSelector(tc)
expr, err := ParseLogSelector(tc, true)
require.Nil(t, err)

p, err := expr.Pipeline()
Expand Down Expand Up @@ -239,7 +239,7 @@ func Test_FilterMatcher(t *testing.T) {
tt := tt
t.Run(tt.q, func(t *testing.T) {
t.Parallel()
expr, err := ParseLogSelector(tt.q)
expr, err := ParseLogSelector(tt.q, true)
assert.Nil(t, err)
assert.Equal(t, tt.expectedMatchers, expr.Matchers())
p, err := expr.Pipeline()
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestStringer(t *testing.T) {
}

func BenchmarkContainsFilter(b *testing.B) {
expr, err := ParseLogSelector(`{app="foo"} |= "foo"`)
expr, err := ParseLogSelector(`{app="foo"} |= "foo"`, true)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Test_Extractor(t *testing.T) {
`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseSampleExpr(tc)
expr, err := ParseSampleExpr(tc, true)
require.Nil(t, err)
_, err = expr.Extractor()
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/log/parser_hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func Test_ParserHints(t *testing.T) {
tt := tt
t.Run(tt.expr, func(t *testing.T) {
t.Parallel()
expr, err := logql.ParseSampleExpr(tt.expr)
expr, err := logql.ParseSampleExpr(tt.expr, true)
require.NoError(t, err)

ex, err := expr.Extractor()
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func optimizeSampleExpr(expr SampleExpr) (SampleExpr, error) {
}
// clone the expr.
q := expr.String()
expr, err := ParseSampleExpr(q)
expr, err := ParseSampleExpr(q, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/optimize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Test_optimizeSampleExpr(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.in, func(t *testing.T) {
e, err := ParseSampleExpr(tt.in)
e, err := ParseSampleExpr(tt.in, true)
require.NoError(t, err)
got, err := optimizeSampleExpr(e)
require.NoError(t, err)
Expand Down
49 changes: 47 additions & 2 deletions pkg/logql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"sync"
"text/scanner"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/prometheus/pkg/labels"
promql_parser "github.com/prometheus/prometheus/promql/parser"
)

const errAtleastOneEqualityMatcherRequired = "queries require at least one regexp or equality matcher that does not have an empty-compatible value. For instance, app=~\".*\" does not meet this requirement, but app=~\".+\" will"

var parserPool = sync.Pool{
New: func() interface{} {
p := &parser{
Expand Down Expand Up @@ -72,6 +75,19 @@ func ParseExpr(input string) (expr Expr, err error) {
return p.Parse()
}

// checkEqualityMatchers checks whether a query would touch all the streams in the query range or uses at least one matcher to select specific streams.
func checkEqualityMatchers(matchers []*labels.Matcher) error {
if len(matchers) == 0 {
return nil
}
_, matchers = util.SplitFiltersAndMatchers(matchers)
if len(matchers) == 0 {
return errors.New(errAtleastOneEqualityMatcherRequired)
}

return nil
}

// ParseMatchers parses a string and returns labels matchers, if the expression contains
// anything else it will return an error.
func ParseMatchers(input string) ([]*labels.Matcher, error) {
Expand All @@ -87,7 +103,7 @@ func ParseMatchers(input string) ([]*labels.Matcher, error) {
}

// ParseSampleExpr parses a string and returns the sampleExpr
func ParseSampleExpr(input string) (SampleExpr, error) {
func ParseSampleExpr(input string, equalityMatcherRequired bool) (SampleExpr, error) {
expr, err := ParseExpr(input)
if err != nil {
return nil, err
Expand All @@ -96,11 +112,33 @@ func ParseSampleExpr(input string) (SampleExpr, error) {
if !ok {
return nil, errors.New("only sample expression supported")
}

if equalityMatcherRequired {
err = sampleExprCheckEqualityMatchers(sampleExpr)
if err != nil {
return nil, err
}
}
return sampleExpr, nil
}

func sampleExprCheckEqualityMatchers(expr SampleExpr) error {
switch e := expr.(type) {
case *binOpExpr:
if err := sampleExprCheckEqualityMatchers(e.SampleExpr); err != nil {
return err
}

return sampleExprCheckEqualityMatchers(e.RHS)
case *literalExpr:
return nil
default:
return checkEqualityMatchers(expr.Selector().Matchers())
}
}

// ParseLogSelector parses a log selector expression `{app="foo"} |= "filter"`
func ParseLogSelector(input string) (LogSelectorExpr, error) {
func ParseLogSelector(input string, equalityMatcherRequired bool) (LogSelectorExpr, error) {
expr, err := ParseExpr(input)
if err != nil {
return nil, err
Expand All @@ -109,6 +147,13 @@ func ParseLogSelector(input string) (LogSelectorExpr, error) {
if !ok {
return nil, errors.New("only log selector is supported")
}

if equalityMatcherRequired {
err = checkEqualityMatchers(logSelector.Matchers())
if err != nil {
return nil, err
}
}
return logSelector, nil
}

Expand Down
94 changes: 93 additions & 1 deletion pkg/logql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2320,7 +2320,7 @@ func TestIsParseError(t *testing.T) {
func Test_PipelineCombined(t *testing.T) {
query := `{job="cortex-ops/query-frontend"} |= "logging.go" | logfmt | line_format "{{.msg}}" | regexp "(?P<method>\\w+) (?P<path>[\\w|/]+) \\((?P<status>\\d+?)\\) (?P<duration>.*)" | (duration > 1s or status==200) and method="POST" | line_format "{{.duration}}|{{.method}}|{{.status}}"`

expr, err := ParseLogSelector(query)
expr, err := ParseLogSelector(query, true)
require.Nil(t, err)

p, err := expr.Pipeline()
Expand Down Expand Up @@ -2365,3 +2365,95 @@ func Benchmark_CompareParseLabels(b *testing.B) {
}
})
}

func TestParseSampleExpr_equalityMatcher(t *testing.T) {
for _, tc := range []struct {
in string
err error
}{
{
in: `count_over_time({foo="bar"}[5m])`,
},
{
in: `count_over_time({foo!="bar"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `count_over_time({app="baz", foo!="bar"}[5m])`,
},
{
in: `count_over_time({app=~".+"}[5m])`,
},
{
in: `count_over_time({app=~".*"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `count_over_time({app=~"bar|baz"}[5m])`,
},
{
in: `count_over_time({app!~"bar|baz"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `1 + count_over_time({app=~".*"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `1 + count_over_time({app=~".+"}[5m]) + count_over_time({app=~".*"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `1 + count_over_time({app=~".+"}[5m]) + count_over_time({app=~".+"}[5m])`,
},
{
in: `1 + count_over_time({app=~".+"}[5m]) + count_over_time({app=~".*"}[5m]) + 1`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `1 + count_over_time({app=~".+"}[5m]) + count_over_time({app=~".+"}[5m]) + 1`,
},
} {
t.Run(tc.in, func(t *testing.T) {
_, err := ParseSampleExpr(tc.in, true)
require.Equal(t, tc.err, err)
})
}
}

func TestParseLogSelectorExpr_equalityMatcher(t *testing.T) {
for _, tc := range []struct {
in string
err error
}{
{
in: `{foo="bar"}`,
},
{
in: `{foo!="bar"}`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `{app="baz", foo!="bar"}`,
},
{
in: `{app=~".+"}`,
},
{
in: `{app=~".*"}`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `{foo=~"bar|baz"}`,
},
{
in: `{foo!~"bar|baz"}`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
} {
t.Run(tc.in, func(t *testing.T) {
_, err := ParseLogSelector(tc.in, true)
require.Equal(t, tc.err, err)
})
}
}
24 changes: 12 additions & 12 deletions pkg/logql/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ func TestMappingEquivalence(t *testing.T) {
{`1 + 1`, false},
{`{a="1"}`, false},
{`{a="1"} |= "number: 10"`, false},
{`rate({a=~".*"}[1s])`, false},
{`sum by (a) (rate({a=~".*"}[1s]))`, false},
{`sum(rate({a=~".*"}[1s]))`, false},
{`max without (a) (rate({a=~".*"}[1s]))`, false},
{`count(rate({a=~".*"}[1s]))`, false},
{`avg(rate({a=~".*"}[1s]))`, true},
{`avg(rate({a=~".*"}[1s])) by (a)`, true},
{`1 + sum by (cluster) (rate({a=~".*"}[1s]))`, false},
{`sum(max(rate({a=~".*"}[1s])))`, false},
{`max(count(rate({a=~".*"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".*"}[1s]))) / count(rate({a=~".*"}[1s]))`, false},
{`rate({a=~".+"}[1s])`, false},
{`sum by (a) (rate({a=~".+"}[1s]))`, false},
{`sum(rate({a=~".+"}[1s]))`, false},
{`max without (a) (rate({a=~".+"}[1s]))`, false},
{`count(rate({a=~".+"}[1s]))`, false},
{`avg(rate({a=~".+"}[1s]))`, true},
{`avg(rate({a=~".+"}[1s])) by (a)`, true},
{`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false},
{`sum(max(rate({a=~".+"}[1s])))`, false},
{`max(count(rate({a=~".+"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
// We could sort them as stated, but it doesn't seem worth the performance hit.
// {`topk(3, rate({a=~".*"}[1s]))`, false},
// {`topk(3, rate({a=~".+"}[1s]))`, false},
} {
q := NewMockQuerier(
shards,
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func parseRegexQuery(httpRequest *http.Request) (string, error) {
query := httpRequest.Form.Get("query")
regexp := httpRequest.Form.Get("regexp")
if regexp != "" {
expr, err := logql.ParseLogSelector(query)
expr, err := logql.ParseLogSelector(query, true)
if err != nil {
return "", err
}
Expand Down