Max bytes read limit #8670

Merged: 55 commits, Mar 23, 2023
Changes from 10 commits.

Commits
91e7206
Working get index stats
salvacorts Feb 27, 2023
1f81aea
Refactor where matchers are extracted
salvacorts Feb 27, 2023
77895e8
Working check for limit
salvacorts Feb 28, 2023
1e2fc1b
Check for unsupported req type so we don't need a stats roundtripper
salvacorts Feb 28, 2023
f149835
Refactor ExtractMatcher func
salvacorts Mar 1, 2023
2043438
Add querier-level limit
salvacorts Mar 1, 2023
94f941b
Error message can be configured
salvacorts Mar 1, 2023
0aea9e1
Limit other query types
salvacorts Mar 1, 2023
4780298
Rename querier-level limit
salvacorts Mar 1, 2023
798313a
Add comments to `skipRequestType` func
salvacorts Mar 1, 2023
4aa7a34
Rename Subquery to Querier in new limits
salvacorts Mar 2, 2023
2a1df3e
Support queries with multiple matchers
salvacorts Mar 2, 2023
1674e67
Check if index type is TSDB
salvacorts Mar 2, 2023
648589e
Skip middlewares for index stats
salvacorts Mar 7, 2023
aaf90e5
Extract func to get index stats for matcher groups
salvacorts Mar 7, 2023
c9c8610
Reuse ShardingConfigs methods
salvacorts Mar 7, 2023
75b1d54
Refactor roundTripper to reuse roundTripperHandler
salvacorts Mar 7, 2023
af1c0d2
Fix errors
salvacorts Mar 7, 2023
a024f95
Re-use sharding index stats to check querier size limit
salvacorts Mar 7, 2023
db3fc7c
Human-readable bytes in errors
salvacorts Mar 7, 2023
e3eb162
Split index stats req in 24h
salvacorts Mar 8, 2023
d57b79b
Use MaxLookBackPeriod
salvacorts Mar 8, 2023
d5665f2
Minor changes
salvacorts Mar 8, 2023
7bc5eaf
Use skipmiddleware on NewLimitedTripperware
salvacorts Mar 8, 2023
8c26b1e
Check unshardable queries and check bytesPerShard
salvacorts Mar 9, 2023
1818e8d
Remove debug code for unsplit index request
salvacorts Mar 9, 2023
877e39e
Fix test compile
salvacorts Mar 9, 2023
5d87192
Fix test compilation
salvacorts Mar 9, 2023
deeca4a
Merge branch 'main' into salvacorts/max_data_query_limit
salvacorts Mar 9, 2023
43d96bc
Update docs
salvacorts Mar 20, 2023
ddce6aa
Apply formatting suggestions
salvacorts Mar 20, 2023
350e15c
Enforce parallelism of at least 1 in split by interval
salvacorts Mar 20, 2023
226a076
Test query size middleware
salvacorts Mar 20, 2023
28c27fa
Test multi-matchers and offset
salvacorts Mar 20, 2023
954407e
Rename function to create size limiter
salvacorts Mar 20, 2023
3d2fff3
Docs for NewRoundTripperHandler
salvacorts Mar 20, 2023
1704c5c
Test limit on non shardable queries
salvacorts Mar 21, 2023
9b8aa08
Test checkQuerySizeLimit on shardable queries
salvacorts Mar 21, 2023
77b046f
Merge branch 'main' into salvacorts/max_data_query_limit
salvacorts Mar 21, 2023
ff84730
Support per-request limits
salvacorts Mar 21, 2023
c350297
Improve docs.
salvacorts Mar 21, 2023
f1ba7eb
Delete unused function
salvacorts Mar 21, 2023
8ec053c
Add Changelog
salvacorts Mar 21, 2023
f422e0a
Enforce limits on instant/range log and metric queries
salvacorts Mar 21, 2023
54ec3ba
Test limits in roundtrip tests
salvacorts Mar 21, 2023
4f11f25
Configure retry mechanism on skipMiddleware
salvacorts Mar 22, 2023
c482fe9
Create splitByMetrics if metrics is null
salvacorts Mar 22, 2023
db98d7c
Remove MaxQuerierBytesRead from per query limits
salvacorts Mar 23, 2023
8515b43
Add advice to error message
salvacorts Mar 23, 2023
cfcc4e4
Log when limit is applied
salvacorts Mar 23, 2023
6661d30
Log both above and below the limit
salvacorts Mar 23, 2023
9c62ef7
Mapper returns bytesPerShard
salvacorts Mar 23, 2023
17bed47
Codec no longer needed in newASTMapperware
salvacorts Mar 23, 2023
5a591b3
Logging limit result in astMapperware.checkQuerySizeLimit
salvacorts Mar 23, 2023
10f5a43
Merge branch 'main' into salvacorts/max_data_query_limit
salvacorts Mar 23, 2023
23 changes: 23 additions & 0 deletions pkg/logql/syntax/parser.go
@@ -134,6 +134,29 @@ func ParseMatchers(input string) ([]*labels.Matcher, error) {
return matcherExpr.Mts, nil
}

// ExtractMatchersFromQuery extracts the matchers from a query.
// Compared to ParseMatchers, it will not fail if the query contains anything else.
func ExtractMatchersFromQuery(input string) ([]*labels.Matcher, error) {
Member:

There's a function syntax.MatcherGroups which is what you'll want, as it gives [(matchers, time_range)] pairs within a query. This is used in queryrange/shard_resolver.go.

salvacorts (Contributor Author), Mar 2, 2023:

Good call! I didn't think about queries that may contain multiple matchers, e.g. from TestMatcherGroups:

count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m] offset 10m)

IIUC, similarly to shard_resolver.go, we need to create an IndexStats request for each matcher group. Then we need to sum the bytes from each IndexStats response. That's the amount of data the query would read.
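
For illustration, a minimal self-contained sketch of that per-group summation; every type and helper below is a hypothetical stand-in rather than the Loki API:

package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for the real matcher-group and stats types.
type MatcherGroup struct{ Matchers string } // e.g. `{job="foo"}`
type IndexStats struct{ Bytes uint64 }
type statsFn func(ctx context.Context, g MatcherGroup) (IndexStats, error)

// totalBytesForQuery issues one stats lookup per matcher group and sums the
// bytes each group would read; the sum is what the whole query would fetch.
func totalBytesForQuery(ctx context.Context, groups []MatcherGroup, getStats statsFn) (uint64, error) {
	var total uint64
	for _, g := range groups {
		stats, err := getStats(ctx, g)
		if err != nil {
			return 0, err
		}
		total += stats.Bytes
	}
	return total, nil
}

func main() {
	fake := func(_ context.Context, _ MatcherGroup) (IndexStats, error) {
		return IndexStats{Bytes: 1 << 20}, nil // pretend each group reads 1 MiB
	}
	groups := []MatcherGroup{{Matchers: `{job="foo"}`}, {Matchers: `{job="bar"}`}}
	total, err := totalBytesForQuery(context.Background(), groups, fake)
	if err != nil {
		panic(err)
	}
	fmt.Println(total) // 2097152 bytes across the two matcher groups of the query above
}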

salvacorts (Contributor Author):

I implemented this in 2a1df3e. Looks like it works fine. If this is what you meant, I do have some new questions 😅:

  • Shall we set the concurrency to get index stats from a config, as done in shardResolverForConf?
    • I can see some places where the ForEach concurrency is set to non-configurable values.
    • For simplicity, I think it should be OK to leave the concurrency as len(matcherGroups) or a fixed, not-so-high value (e.g. 10). But happy to change my mind.
  • As in dynamicShardResolver.Shards, for log selector queries, should we also subtract the max_look_back_period from the query start time? I don't quite understand why that is needed for sharding.

Tested in loki-dev-009

Member:

max_look_back_period is generally most important in limited instant queries (which are rarely used). If I submit {job="foo"} with start=end, it would always return zero results. Prometheus has a concept of "look back this amount of time for instant queries" since metric data is sampled at some configurable scrape_interval (commonly 15s, 30s, or 1m). We sort of copy that idea and say, "find me logs from the past when start=end".
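
A rough sketch of that adjustment; the helper below is hypothetical, and the real behaviour is driven by Loki's configured max_look_back_period:

package main

import (
	"fmt"
	"time"
)

// widenInstantQuery turns a zero-width [start, end] range into one that looks
// back maxLookBack, so an instant log query can still match recent data.
func widenInstantQuery(start, end time.Time, maxLookBack time.Duration) (time.Time, time.Time) {
	if start.Equal(end) && maxLookBack > 0 {
		return start.Add(-maxLookBack), end
	}
	return start, end
}

func main() {
	now := time.Now()
	s, e := widenInstantQuery(now, now, 30*time.Second)
	fmt.Println(e.Sub(s)) // a 30s window instead of an empty one
}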

Contributor:

Just came across this myself. I don't think we need an extra method.

salvacorts (Contributor Author):

Removed

expr, err := ParseExpr(input)
if err != nil {
return nil, err
}

var matchers []*labels.Matcher
expr.Walk(func(e interface{}) {
switch concrete := e.(type) {
case *MatchersExpr:
matchers = concrete.Matchers()
}
})

if len(matchers) == 0 {
return nil, errors.New("failed to extract matchers from query")
}

return matchers, nil
}

func MatchersString(xs []*labels.Matcher) string {
return newMatcherExpr(xs).String()
}
87 changes: 86 additions & 1 deletion pkg/querier/queryrange/limits.go
@@ -10,6 +10,9 @@ import (

"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
@@ -27,7 +30,9 @@ import (
)

const (
limitErrTmpl = "maximum of series (%d) reached for a single query"
limitErrTmpl = "maximum of series (%d) reached for a single query"
limErrQueryTooManyBytesTmpl = "the query would read too many bytes (query: %s, limit: %s)"
limErrSubqueryTooManyBytesTmpl = "after splitting and sharding, at least one sub-query would read too many bytes (query: %s, limit: %s)"
Member:

Suggested change
limErrSubqueryTooManyBytesTmpl = "after splitting and sharding, at least one sub-query would read too many bytes (query: %s, limit: %s)"
limErrSubqueryTooManyBytesTmpl = "query too large to execute on a single querier, either because parallelization is not enabled or the query is unshardable: (query: %s, limit: %s)"

salvacorts (Contributor Author):

Done, thank you!

)

var (
@@ -45,13 +50,16 @@ type Limits interface {
// TSDBMaxQueryParallelism returns the limit to the number of split queries the
// frontend will process in parallel for TSDB queries.
TSDBMaxQueryParallelism(string) int
MaxQueryBytesRead(u string) int
MaxSubqueryBytesRead(u string) int
Member:

I'd rename this to MaxQuerierBytesRead, as the term subquery is overloaded in Prometheus (and we may eventually add this concept in Loki).

salvacorts (Contributor Author):

Good call, done.

}

type limits struct {
Limits
// Use pointers so nil value can indicate if the value was set.
splitDuration *time.Duration
maxQueryParallelism *int
maxQueryBytesRead *int
}

func (l limits) QuerySplitDuration(user string) time.Duration {
@@ -178,6 +186,83 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que
return l.next.Do(ctx, r)
}

type querySizeLimiter struct {
next queryrangebase.Handler
maxQueryBytesRead func(string) int
errorTemplate string
}

// NewQuerySizeLimiterMiddleware creates a new Middleware that enforces query size limits.
// The errorTemplate should format two strings: the bytes that would be read and the bytes limit.
func NewQuerySizeLimiterMiddleware(maxQueryBytesRead func(string) int, errorTemplate string) queryrangebase.Middleware {
Member:

Each request will send a request to the index-gws to sample the index. Here are a few things we'll want to take into account:

  • Skip non-TSDB schemas since they don't have support for this.
  • All TSDB requests already query the index-gw for the same data to use in query planning (guessing correct shard factors). Look at how shard_resolver.go implements this, but we'll want to make sure we only pre-query the index once per request. Thinking about this more, we'll need to query the index for the entire query time-range first to figure out if the query is too large. This will effectively double the index load for IndexStats requests traffic, but I think that's OK for now (we can optimize that part later if/when it becomes an issue). We will want to split these first-pass IndexStats requests in a maximum of 24h chunks since our indices are sharded by 24h periods. Take a look at NewSeriesTripperware for an idea of how this works. Note: adding this limit may expose new bottlenecks in our idx-gws+tsdb that we need to address.

salvacorts (Contributor Author):

Skip non-TSDB schemas since they don't have support for this.

Implemented in 1674e67.

We will want to split these first-pass IndexStats requests in a maximum of 24h chunks since our indices are sharded by 24h periods. Take a look at NewSeriesTripperware for an idea of how this works. Note: adding this limit may expose new bottlenecks in our idx-gws+tsdb that we need to address.

By "first-pass IndexStats requests" here: Do you mean the IndexStat requests to get the stats for all the matcher groups in the whole query (before splitting and sharding)?

I'm not sure I understand the 24h part, do we have any docs so I can better understand this 24h sharding at the index level?

salvacorts (Contributor Author):

Done
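
To illustrate the 24h part, a minimal sketch of splitting a first-pass stats request into at-most-24h sub-ranges; the helper is hypothetical, and Loki's real splitting middleware also handles things like per-tenant intervals and step alignment:

package main

import (
	"fmt"
	"time"
)

// splitInto24hRanges breaks [start, end) into sub-ranges of at most 24h, so
// each sub-request lines up with indices that are sharded by 24h periods.
func splitInto24hRanges(start, end time.Time) [][2]time.Time {
	const day = 24 * time.Hour
	var out [][2]time.Time
	for s := start; s.Before(end); s = s.Add(day) {
		e := s.Add(day)
		if e.After(end) {
			e = end
		}
		out = append(out, [2]time.Time{s, e})
	}
	return out
}

func main() {
	start := time.Date(2023, 3, 1, 6, 0, 0, 0, time.UTC)
	end := start.Add(60 * time.Hour) // a 2.5-day query
	for _, r := range splitInto24hRanges(start, end) {
		fmt.Println(r[0].Format(time.RFC3339), "->", r[1].Format(time.RFC3339)) // three sub-requests: 24h, 24h, 12h
	}
}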

return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return &querySizeLimiter{
next: next,
maxQueryBytesRead: maxQueryBytesRead,
errorTemplate: errorTemplate,
}
})
}

// getIndexStatsForRequest return the index stats for the matchers in r's query
func (q *querySizeLimiter) getIndexStatsForRequest(ctx context.Context, r queryrangebase.Request) (*logproto.IndexStatsResponse, error) {
matchers, err := syntax.ExtractMatchersFromQuery(r.GetQuery())
if err != nil {
return nil, err
}

// Get Stats for this query
var indexStatsReq definitions.Request = &logproto.IndexStatsRequest{}
salvacorts (Contributor Author):

IIUC, it's expected that the stats can report more bytes than what is actually read. What's the maximum difference we have seen or are aware of? I'm concerned about a query that would otherwise run fine being refused because the stats misreported how much data it would load.

If we know that the stats values can be up to X% bigger than the true values, maybe we can add that X% as a threshold to the limit set by the user?

Member:

We can also just target the bytes returned by the index, which seems more straightforward

salvacorts (Contributor Author):

We can also just target the bytes returned by the index, which seems more straightforward.

That's what the IndexStatsRequest returns, isn't it? If so, my comment above still applies: IIRC the IndexStatsResponse can report way more bytes than what is actually in the chunks.

Member:

The index will return the bytes in the chunks, but we may not need to query them all (in the case of reaching the limit first). There is some duplication due to replication_factor (which still needs to be deduped at query time) and some chunks being recorded in multiple indices (which are time-sharded by day). I don't think this really matters though. We'll just set the expected limit based on what the index returns rather than the amount of data that ends up being queried, which is harder to measure/not known before we run the query (in the case of limited queries).

For instance, if the IndexStats returns 100GB but we regularly query 70GB of underlying data, that's fine. We'll just set the query.max-bytes=100GB. Basically, base our idea of what the limit should be on the IndexStats response, not the amount of data we ended up querying.

indexStatsReq = indexStatsReq.WithStartEnd(r.GetStart(), r.GetEnd())
indexStatsReq = indexStatsReq.WithQuery(syntax.MatchersString(matchers))

resp, err := q.next.Do(ctx, indexStatsReq)
if err != nil {
return nil, err
}

return resp.(*IndexStatsResponse).Response, nil

}

// skipRequestType returns whether we should enforce the q.maxQueryBytesRead limit
// on the r request type.
// This is needed when we have two instances of this querySizeLimiter in the same middleware pipeline
// since we don't want to compute the stats for the stats request from the upper querySizeLimiter.
func (q *querySizeLimiter) skipRequestType(r queryrangebase.Request) bool {
_, ok := r.(*logproto.IndexStatsRequest)
return ok
}

func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
if q.skipRequestType(r) {
return q.next.Do(ctx, r)
}

log, ctx := spanlogger.New(ctx, "query_size_limits")
defer log.Finish()

tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

if maxBytesRead := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, q.maxQueryBytesRead); maxBytesRead > 0 {
stats, err := q.getIndexStatsForRequest(ctx, r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Failed to get index stats for query: %s", err.Error())
}

if int(stats.Bytes) > maxBytesRead {
statsBytesStr := flagext.ByteSize(stats.Bytes).String()
maxBytesReadStr := flagext.ByteSize(maxBytesRead).String()
return nil, httpgrpc.Errorf(http.StatusBadRequest, q.errorTemplate, statsBytesStr, maxBytesReadStr)
}
}

return q.next.Do(ctx, r)
}
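
The limit used above is resolved with validation.SmallestPositiveNonZeroIntPerTenant, i.e. the smallest positive non-zero value across the request's tenants; a sketch of that rule as the name implies it (hypothetical code, not the dskit implementation):

package main

import "fmt"

// smallestPositiveNonZero returns the lowest non-zero limit across tenants,
// or 0 when no tenant has a limit configured (0 meaning "do not enforce").
func smallestPositiveNonZero(tenantIDs []string, limit func(string) int) int {
	result := 0
	for _, id := range tenantIDs {
		if v := limit(id); v > 0 && (result == 0 || v < result) {
			result = v
		}
	}
	return result
}

func main() {
	limits := map[string]int{"tenant-a": 0, "tenant-b": 500 << 20, "tenant-c": 100 << 20}
	tenants := []string{"tenant-a", "tenant-b", "tenant-c"}
	// The strictest configured limit wins: 100 MiB (104857600 bytes).
	fmt.Println(smallestPositiveNonZero(tenants, func(id string) int { return limits[id] }))
}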

type seriesLimiter struct {
hashes map[uint64]struct{}
rw sync.RWMutex
12 changes: 12 additions & 0 deletions pkg/querier/queryrange/queryrangebase/results_cache.go
@@ -211,7 +211,19 @@ func NewResultsCacheMiddleware(
}), nil
}

// skipRequestType returns whether we should cache the r request type.
// This is needed when we have middlewares that send different requests types down
// in the pipeline that do not support caching.
func (s *resultsCache) skipRequestType(r Request) bool {
salvacorts (Contributor Author):

This is needed to avoid caching the IndexStats request sent down the middleware pipeline by the querySizeLimiter. The same applies to sharding and splitting, as these panic if the request type is an IndexStats request.

I feel this type-checking solution is a bit too hacky. I think it would be nice if we could set a flag in the context to skip caching. Same for splitting and sharding. In the querySizeLimiter.Do function we would clone the input request context and enable these new flags so the stats request is not cached, split, or sharded.

I don't think implementing that would be too complicated. Moreover, we could set those context flags by looking at the HTTP request headers, which can be handy for debugging and testing. Wdyt?

Member:

Rather than threading this logic through our middleware chain, what if we had a separate skip middleware chain for pre-querying these IndexStats requests? Look at how NewQueryShardMiddleware embeds two different middleware chains depending on whether we should use sharding or not. This would allow us to handle this without subsequent middlewares needing to be aware of this.

Another idea for later: what if we cached IndexStats requests in a results cache? What if we cached other types of index queries in results caches?
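
A sketch of that two-chain idea; the types below are hypothetical stand-ins for queryrangebase.Request and Handler, not the real interfaces:

package queryrange

import "context"

// Hypothetical stand-ins for queryrangebase.Request and queryrangebase.Handler.
type request interface{ isRequest() }

type indexStatsRequest struct{}

func (indexStatsRequest) isRequest() {}

type handler interface {
	Do(ctx context.Context, r request) (interface{}, error)
}

// chainRouter sends IndexStats traffic through a slimmed-down chain (e.g. retry
// only) and everything else through the full chain (cache, split, shard, retry),
// so downstream middlewares never have to special-case the request type.
type chainRouter struct {
	statsChain handler
	fullChain  handler
}

func (c chainRouter) Do(ctx context.Context, r request) (interface{}, error) {
	if _, ok := r.(indexStatsRequest); ok {
		return c.statsChain.Do(ctx, r)
	}
	return c.fullChain.Do(ctx, r)
}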

salvacorts (Contributor Author):

Look at how NewQueryShardMiddleware embeds two different middleware chains depending on whether we should use sharding or not. This would allow us to handle this without subsequent middlewares needing to be aware of this.

IIUC, the pattern used in NewQueryShardMiddleware is useful only for bypassing that middleware itself, not the middlewares above or below it in the pipeline.

In other words, when sharding is disabled, the request will skip the sharding-related middleware (shardingware) configured in NewQueryShardMiddleware, but it will still go through the rest of the middlewares configured in NewMetricTripperware after the sharding middleware (for example, it will go through NewRetryMiddleware). The same applies to the IndexStats request done at dynamicShardResolver.Shards: it skips the sharding mechanism, but not whatever is configured after sharding.

salvacorts (Contributor Author):

I've implemented something similar at 648589e and 75b1d54.

_, ok := r.(*logproto.IndexStatsRequest)
return ok
}

func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
if s.skipRequestType(r) {
return s.next.Do(ctx, r)
}

tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
19 changes: 19 additions & 0 deletions pkg/querier/queryrange/querysharding.go
@@ -232,7 +232,26 @@ type shardSplitter struct {
now func() time.Time // injectable time.Now
}

// skipRequestType returns whether we should apply sharding on the r request type.
// This is needed when we have middlewares that send different requests types down
// in the pipeline that do not support sharding.
func (splitter *shardSplitter) skipRequestType(r queryrangebase.Request) bool {
if _, ok := r.(*LokiRequest); ok {
return false
}

if _, ok := r.(*LokiInstantRequest); ok {
return false
}

return true
}

func (splitter *shardSplitter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
if splitter.skipRequestType(r) {
return splitter.next.Do(ctx, r)
}

tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
38 changes: 37 additions & 1 deletion pkg/querier/queryrange/roundtrip.go
@@ -292,6 +292,7 @@ func NewLogFilterTripperware(
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, codec, splitByTime, metrics.SplitByMetrics),
}

@@ -326,6 +327,12 @@
)
}

// Limit the bytes the sub-queries would fetch after splitting and sharding
queryRangeMiddleware = append(
queryRangeMiddleware,
NewQuerySizeLimiterMiddleware(limits.MaxQueryBytesRead, limErrSubqueryTooManyBytesTmpl),
)

if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),
@@ -354,6 +361,7 @@ func NewLimitedTripperware(
queryRangeMiddleware := []queryrangebase.Middleware{
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
NewQuerySizeLimiterMiddleware(limits.MaxQueryBytesRead, limErrQueryTooManyBytesTmpl),
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
// Limited queries only need to fetch up to the requested line limit worth of logs,
// Our defaults for splitting and parallelism are much too aggressive for large customers and result in
@@ -396,6 +404,12 @@
)
}

// Limit the bytes the sub-queries would fetch after splitting and sharding
queryRangeMiddleware = append(
queryRangeMiddleware,
NewQuerySizeLimiterMiddleware(limits.MaxQueryBytesRead, limErrSubqueryTooManyBytesTmpl),
)

if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),
@@ -506,7 +520,11 @@ func NewMetricTripperware(
metrics *Metrics,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, error) {
queryRangeMiddleware := []queryrangebase.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
queryRangeMiddleware := []queryrangebase.Middleware{
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
}

if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(
queryRangeMiddleware,
@@ -515,6 +533,12 @@
)
}

// Limit the bytes the query would fetch regardless of splitting and sharding.
queryRangeMiddleware = append(
queryRangeMiddleware,
NewQuerySizeLimiterMiddleware(limits.MaxQueryBytesRead, limErrQueryTooManyBytesTmpl),
)

queryRangeMiddleware = append(
queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
@@ -570,6 +594,12 @@
)
}

// Limit the bytes the sub-queries would fetch after splitting and sharding
queryRangeMiddleware = append(
queryRangeMiddleware,
NewQuerySizeLimiterMiddleware(limits.MaxSubqueryBytesRead, limErrSubqueryTooManyBytesTmpl),
)

if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
@@ -618,6 +648,12 @@ func NewInstantMetricTripperware(
)
}

// Limit the bytes the sub-queries would fetch after sharding
queryRangeMiddleware = append(
queryRangeMiddleware,
NewQuerySizeLimiterMiddleware(limits.MaxSubqueryBytesRead, limErrSubqueryTooManyBytesTmpl),
)

if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
8 changes: 6 additions & 2 deletions pkg/querier/queryrange/split_by_interval.go
@@ -317,13 +317,17 @@ func reduceSplitIntervalForRangeVector(r queryrangebase.Request, interval time.D
func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var reqs []queryrangebase.Request

lokiReq, ok := r.(*LokiRequest)
if !ok {
// If this type of request cannot get split
return []queryrangebase.Request{}, nil
}

interval, err := reduceSplitIntervalForRangeVector(r, interval)
if err != nil {
return nil, err
}

lokiReq := r.(*LokiRequest)

// step align start and end time of the query. Start time is rounded down and end time is rounded up.
stepNs := r.GetStep() * 1e6
startNs := lokiReq.StartTs.UnixNano()
21 changes: 19 additions & 2 deletions pkg/validation/limits.go
@@ -96,8 +96,10 @@ type Limits struct {
QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"`

// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
MaxSubqueryBytesRead flagext.ByteSize `yaml:"max_subquery_bytes_read" json:"max_subquery_bytes_read"`
salvacorts (Contributor Author), Mar 1, 2023:

We had this open question in the epic:

Where should we enforce the querier-level limit?

I decided to implement this check on the frontend rather than on the querier, since that way the query-level and subquery-level limits are enforced close to each other, in the same component.

Member:

Suggested change
MaxSubqueryBytesRead flagext.ByteSize `yaml:"max_subquery_bytes_read" json:"max_subquery_bytes_read"`
MaxQuerierBytesRead flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"`

nit: naming as specified earlier

salvacorts (Contributor Author):

Done


// Ruler defaults and limits.
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
@@ -223,6 +225,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.MinShardingLookback.Set("0s")
f.Var(&l.MinShardingLookback, "frontend.min-sharding-lookback", "Limit queries that can be sharded. Queries within the time range of now and now minus this sharding lookback are not sharded. The default value of 0s disables the lookback, causing sharding of all queries at all times.")

_ = l.MaxQueryBytesRead.Set("0B")
Member:

Suggested change
_ = l.MaxQueryBytesRead.Set("0B")

nit: 0B is the default when unset

salvacorts (Contributor Author):

Thank you!

f.Var(&l.MaxQueryBytesRead, "frontend.max-query-bytes-read", "TODO: Max number of bytes a query would fetch")
_ = l.MaxSubqueryBytesRead.Set("0B")
Member:

Suggested change
_ = l.MaxSubqueryBytesRead.Set("0B")

nit: 0B is the default zero value when unset

salvacorts (Contributor Author):

Thank you!

f.Var(&l.MaxSubqueryBytesRead, "frontend.max-subquery-bytes-read", "TODO: Max number of bytes a sub query would fetch after splitting and sharding")

_ = l.MaxCacheFreshness.Set("1m")
f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")

@@ -474,6 +481,16 @@ func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)
}

// MaxQueryBytesRead returns the maximum bytes a query can read.
func (o *Overrides) MaxQueryBytesRead(userID string) int {
return o.getOverridesForUser(userID).MaxQueryBytesRead.Val()
}

// MaxSubqueryBytesRead returns the maximum bytes a sub query can read after splitting and sharding.
func (o *Overrides) MaxSubqueryBytesRead(userID string) int {
return o.getOverridesForUser(userID).MaxSubqueryBytesRead.Val()
}

// MaxConcurrentTailRequests returns the limit to number of concurrent tail requests.
func (o *Overrides) MaxConcurrentTailRequests(userID string) int {
return o.getOverridesForUser(userID).MaxConcurrentTailRequests