Skip to content

Commit

Permalink
check for stream selectors to have atleast one equality matcher (#3216)
Browse files Browse the repository at this point in the history
* check for stream selectors to have atleast one equality matcher

* move equality matchers check to logql
  • Loading branch information
sandeepsukhani authored Mar 30, 2021
1 parent e0f0ee0 commit b152bc2
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseLogSelector(test)
expr, err := logql.ParseLogSelector(test, true)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -999,7 +999,7 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseSampleExpr(test)
expr, err := logql.ParseSampleExpr(test, true)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type tailer struct {
}

func newTailer(orgID, query string, conn TailServer) (*tailer, error) {
expr, err := logql.ParseLogSelector(query)
expr, err := logql.ParseLogSelector(query, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func validateMatcherConfig(cfg *MatcherConfig) (logql.LogSelectorExpr, error) {
return nil, errors.New(ErrStagesWithDropLine)
}

selector, err := logql.ParseLogSelector(cfg.Selector)
selector, err := logql.ParseLogSelector(cfg.Selector, false)
if err != nil {
return nil, errors.Wrap(err, ErrSelectorSyntax)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type SelectLogParams struct {
// LogSelector returns the LogSelectorExpr from the SelectParams.
// The `LogSelectorExpr` can then returns all matchers and filters to use for that request.
func (s SelectLogParams) LogSelector() (LogSelectorExpr, error) {
return ParseLogSelector(s.Selector)
return ParseLogSelector(s.Selector, true)
}

type SelectSampleParams struct {
Expand All @@ -54,13 +54,13 @@ type SelectSampleParams struct {
// Expr returns the SampleExpr from the SelectSampleParams.
// The `LogSelectorExpr` can then returns all matchers and filters to use for that request.
func (s SelectSampleParams) Expr() (SampleExpr, error) {
return ParseSampleExpr(s.Selector)
return ParseSampleExpr(s.Selector, true)
}

// LogSelector returns the LogSelectorExpr from the SelectParams.
// The `LogSelectorExpr` can then returns all matchers and filters to use for that request.
func (s SelectSampleParams) LogSelector() (LogSelectorExpr, error) {
expr, err := ParseSampleExpr(s.Selector)
expr, err := ParseSampleExpr(s.Selector, true)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_logSelectorExpr_String(t *testing.T) {
selector string
expectFilter bool
}{
{`{foo!~"bar"}`, false},
{`{foo="bar"}`, false},
{`{foo="bar", bar!="baz"}`, false},
{`{foo="bar", bar!="baz"} != "bip" !~ ".+bop"`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`, true},
Expand All @@ -37,7 +37,7 @@ func Test_logSelectorExpr_String(t *testing.T) {
tt := tt
t.Run(tt.selector, func(t *testing.T) {
t.Parallel()
expr, err := ParseLogSelector(tt.selector)
expr, err := ParseLogSelector(tt.selector, true)
if err != nil {
t.Fatalf("failed to parse log selector: %s", err)
}
Expand Down Expand Up @@ -176,7 +176,7 @@ func Test_NilFilterDoesntPanic(t *testing.T) {
`{namespace="dev", container_name="cart"} |= "bleep" |= "bloop" |= ""`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseLogSelector(tc)
expr, err := ParseLogSelector(tc, true)
require.Nil(t, err)

p, err := expr.Pipeline()
Expand Down Expand Up @@ -265,7 +265,7 @@ func Test_FilterMatcher(t *testing.T) {
tt := tt
t.Run(tt.q, func(t *testing.T) {
t.Parallel()
expr, err := ParseLogSelector(tt.q)
expr, err := ParseLogSelector(tt.q, true)
assert.Nil(t, err)
assert.Equal(t, tt.expectedMatchers, expr.Matchers())
p, err := expr.Pipeline()
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestStringer(t *testing.T) {
}

func BenchmarkContainsFilter(b *testing.B) {
expr, err := ParseLogSelector(`{app="foo"} |= "foo"`)
expr, err := ParseLogSelector(`{app="foo"} |= "foo"`, true)
if err != nil {
b.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Test_Extractor(t *testing.T) {
`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseSampleExpr(tc)
expr, err := ParseSampleExpr(tc, true)
require.Nil(t, err)
_, err = expr.Extractor()
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/log/parser_hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func Test_ParserHints(t *testing.T) {
tt := tt
t.Run(tt.expr, func(t *testing.T) {
t.Parallel()
expr, err := logql.ParseSampleExpr(tt.expr)
expr, err := logql.ParseSampleExpr(tt.expr, true)
require.NoError(t, err)

ex, err := expr.Extractor()
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func optimizeSampleExpr(expr SampleExpr) (SampleExpr, error) {
}
// clone the expr.
q := expr.String()
expr, err := ParseSampleExpr(q)
expr, err := ParseSampleExpr(q, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/optimize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Test_optimizeSampleExpr(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.in, func(t *testing.T) {
e, err := ParseSampleExpr(tt.in)
e, err := ParseSampleExpr(tt.in, true)
require.NoError(t, err)
got, err := optimizeSampleExpr(e)
require.NoError(t, err)
Expand Down
49 changes: 47 additions & 2 deletions pkg/logql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"sync"
"text/scanner"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/prometheus/pkg/labels"
promql_parser "github.com/prometheus/prometheus/promql/parser"
)

const errAtleastOneEqualityMatcherRequired = "queries require at least one regexp or equality matcher that does not have an empty-compatible value. For instance, app=~\".*\" does not meet this requirement, but app=~\".+\" will"

var parserPool = sync.Pool{
New: func() interface{} {
p := &parser{
Expand Down Expand Up @@ -80,6 +83,19 @@ func ParseExpr(input string) (expr Expr, err error) {
return p.Parse()
}

// checkEqualityMatchers checks whether a query would touch all the streams in the query range or uses at least one matcher to select specific streams.
func checkEqualityMatchers(matchers []*labels.Matcher) error {
if len(matchers) == 0 {
return nil
}
_, matchers = util.SplitFiltersAndMatchers(matchers)
if len(matchers) == 0 {
return errors.New(errAtleastOneEqualityMatcherRequired)
}

return nil
}

// ParseMatchers parses a string and returns labels matchers, if the expression contains
// anything else it will return an error.
func ParseMatchers(input string) ([]*labels.Matcher, error) {
Expand All @@ -95,7 +111,7 @@ func ParseMatchers(input string) ([]*labels.Matcher, error) {
}

// ParseSampleExpr parses a string and returns the sampleExpr
func ParseSampleExpr(input string) (SampleExpr, error) {
func ParseSampleExpr(input string, equalityMatcherRequired bool) (SampleExpr, error) {
expr, err := ParseExpr(input)
if err != nil {
return nil, err
Expand All @@ -104,11 +120,33 @@ func ParseSampleExpr(input string) (SampleExpr, error) {
if !ok {
return nil, errors.New("only sample expression supported")
}

if equalityMatcherRequired {
err = sampleExprCheckEqualityMatchers(sampleExpr)
if err != nil {
return nil, err
}
}
return sampleExpr, nil
}

func sampleExprCheckEqualityMatchers(expr SampleExpr) error {
switch e := expr.(type) {
case *binOpExpr:
if err := sampleExprCheckEqualityMatchers(e.SampleExpr); err != nil {
return err
}

return sampleExprCheckEqualityMatchers(e.RHS)
case *literalExpr:
return nil
default:
return checkEqualityMatchers(expr.Selector().Matchers())
}
}

// ParseLogSelector parses a log selector expression `{app="foo"} |= "filter"`
func ParseLogSelector(input string) (LogSelectorExpr, error) {
func ParseLogSelector(input string, equalityMatcherRequired bool) (LogSelectorExpr, error) {
expr, err := ParseExpr(input)
if err != nil {
return nil, err
Expand All @@ -117,6 +155,13 @@ func ParseLogSelector(input string) (LogSelectorExpr, error) {
if !ok {
return nil, errors.New("only log selector is supported")
}

if equalityMatcherRequired {
err = checkEqualityMatchers(logSelector.Matchers())
if err != nil {
return nil, err
}
}
return logSelector, nil
}

Expand Down
94 changes: 93 additions & 1 deletion pkg/logql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2460,7 +2460,7 @@ func TestIsParseError(t *testing.T) {
func Test_PipelineCombined(t *testing.T) {
query := `{job="cortex-ops/query-frontend"} |= "logging.go" | logfmt | line_format "{{.msg}}" | regexp "(?P<method>\\w+) (?P<path>[\\w|/]+) \\((?P<status>\\d+?)\\) (?P<duration>.*)" | (duration > 1s or status==200) and method="POST" | line_format "{{.duration}}|{{.method}}|{{.status}}"`

expr, err := ParseLogSelector(query)
expr, err := ParseLogSelector(query, true)
require.Nil(t, err)

p, err := expr.Pipeline()
Expand Down Expand Up @@ -2505,3 +2505,95 @@ func Benchmark_CompareParseLabels(b *testing.B) {
}
})
}

func TestParseSampleExpr_equalityMatcher(t *testing.T) {
for _, tc := range []struct {
in string
err error
}{
{
in: `count_over_time({foo="bar"}[5m])`,
},
{
in: `count_over_time({foo!="bar"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `count_over_time({app="baz", foo!="bar"}[5m])`,
},
{
in: `count_over_time({app=~".+"}[5m])`,
},
{
in: `count_over_time({app=~".*"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `count_over_time({app=~"bar|baz"}[5m])`,
},
{
in: `count_over_time({app!~"bar|baz"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `1 + count_over_time({app=~".*"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `1 + count_over_time({app=~".+"}[5m]) + count_over_time({app=~".*"}[5m])`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `1 + count_over_time({app=~".+"}[5m]) + count_over_time({app=~".+"}[5m])`,
},
{
in: `1 + count_over_time({app=~".+"}[5m]) + count_over_time({app=~".*"}[5m]) + 1`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `1 + count_over_time({app=~".+"}[5m]) + count_over_time({app=~".+"}[5m]) + 1`,
},
} {
t.Run(tc.in, func(t *testing.T) {
_, err := ParseSampleExpr(tc.in, true)
require.Equal(t, tc.err, err)
})
}
}

func TestParseLogSelectorExpr_equalityMatcher(t *testing.T) {
for _, tc := range []struct {
in string
err error
}{
{
in: `{foo="bar"}`,
},
{
in: `{foo!="bar"}`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `{app="baz", foo!="bar"}`,
},
{
in: `{app=~".+"}`,
},
{
in: `{app=~".*"}`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
{
in: `{foo=~"bar|baz"}`,
},
{
in: `{foo!~"bar|baz"}`,
err: errors.New(errAtleastOneEqualityMatcherRequired),
},
} {
t.Run(tc.in, func(t *testing.T) {
_, err := ParseLogSelector(tc.in, true)
require.Equal(t, tc.err, err)
})
}
}
24 changes: 12 additions & 12 deletions pkg/logql/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ func TestMappingEquivalence(t *testing.T) {
{`1 + 1`, false},
{`{a="1"}`, false},
{`{a="1"} |= "number: 10"`, false},
{`rate({a=~".*"}[1s])`, false},
{`sum by (a) (rate({a=~".*"}[1s]))`, false},
{`sum(rate({a=~".*"}[1s]))`, false},
{`max without (a) (rate({a=~".*"}[1s]))`, false},
{`count(rate({a=~".*"}[1s]))`, false},
{`avg(rate({a=~".*"}[1s]))`, true},
{`avg(rate({a=~".*"}[1s])) by (a)`, true},
{`1 + sum by (cluster) (rate({a=~".*"}[1s]))`, false},
{`sum(max(rate({a=~".*"}[1s])))`, false},
{`max(count(rate({a=~".*"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".*"}[1s]))) / count(rate({a=~".*"}[1s]))`, false},
{`rate({a=~".+"}[1s])`, false},
{`sum by (a) (rate({a=~".+"}[1s]))`, false},
{`sum(rate({a=~".+"}[1s]))`, false},
{`max without (a) (rate({a=~".+"}[1s]))`, false},
{`count(rate({a=~".+"}[1s]))`, false},
{`avg(rate({a=~".+"}[1s]))`, true},
{`avg(rate({a=~".+"}[1s])) by (a)`, true},
{`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false},
{`sum(max(rate({a=~".+"}[1s])))`, false},
{`max(count(rate({a=~".+"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
// We could sort them as stated, but it doesn't seem worth the performance hit.
// {`topk(3, rate({a=~".*"}[1s]))`, false},
// {`topk(3, rate({a=~".+"}[1s]))`, false},
} {
q := NewMockQuerier(
shards,
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func parseRegexQuery(httpRequest *http.Request) (string, error) {
query := httpRequest.Form.Get("query")
regexp := httpRequest.Form.Get("regexp")
if regexp != "" {
expr, err := logql.ParseLogSelector(query)
expr, err := logql.ParseLogSelector(query, true)
if err != nil {
return "", err
}
Expand Down

0 comments on commit b152bc2

Please sign in to comment.