Max bytes read limit #8670
@@ -10,6 +10,9 @@ import (

	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/tenant"
	"github.com/grafana/loki/pkg/logql/syntax"
	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
	"github.com/grafana/loki/pkg/util/flagext"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/timestamp"

@@ -27,7 +30,9 @@ import (
)

const (
	limitErrTmpl                   = "maximum of series (%d) reached for a single query"
	limErrQueryTooManyBytesTmpl    = "the query would read too many bytes (query: %s, limit: %s)"
	limErrSubqueryTooManyBytesTmpl = "after splitting and sharding, at least one sub-query would read too many bytes (query: %s, limit: %s)"
Review comment: Suggested change.
Reply: Done, thank you!
)

var (

@@ -45,13 +50,16 @@ type Limits interface {
	// TSDBMaxQueryParallelism returns the limit to the number of split queries the
	// frontend will process in parallel for TSDB queries.
	TSDBMaxQueryParallelism(string) int
	MaxQueryBytesRead(u string) int
	MaxSubqueryBytesRead(u string) int
Review comment: I'd rename this.
Reply: Good call, done.
}

type limits struct {
	Limits
	// Use pointers so nil value can indicate if the value was set.
	splitDuration       *time.Duration
	maxQueryParallelism *int
	maxQueryBytesRead   *int
}

func (l limits) QuerySplitDuration(user string) time.Duration {

@@ -178,6 +186,83 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que
	return l.next.Do(ctx, r)
}

type querySizeLimiter struct {
	next              queryrangebase.Handler
	maxQueryBytesRead func(string) int
	errorTemplate     string
}

// NewQuerySizeLimiterMiddleware creates a new Middleware that enforces query size limits.
// The errorTemplate should format two strings: the bytes that would be read and the bytes limit.
func NewQuerySizeLimiterMiddleware(maxQueryBytesRead func(string) int, errorTemplate string) queryrangebase.Middleware {
Review comment: Each request will send a request to the index-gws to sample the index. Here are a few things we'll want to take into account.
Reply: Implemented in 1674e67. By "first-pass IndexStats requests" here: do you mean the IndexStats requests to get the stats for all the matcher groups in the whole query (before splitting and sharding)? I'm not sure I understand the 24h part; do we have any docs so I can better understand this 24h sharding at the index level?
Reply: Done.
	return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
		return &querySizeLimiter{
			next:              next,
			maxQueryBytesRead: maxQueryBytesRead,
			errorTemplate:     errorTemplate,
		}
	})
}

// getIndexStatsForRequest returns the index stats for the matchers in r's query.
func (q *querySizeLimiter) getIndexStatsForRequest(ctx context.Context, r queryrangebase.Request) (*logproto.IndexStatsResponse, error) {
	matchers, err := syntax.ExtractMatchersFromQuery(r.GetQuery())
	if err != nil {
		return nil, err
	}

	// Get stats for this query
	var indexStatsReq definitions.Request = &logproto.IndexStatsRequest{}
Review comment: IIUC, it's expected that the stats can report more bytes than what is actually read. What's the maximum difference we have seen or are aware of? I'm concerned about a query that would run just fine being refused because the stats misreported that the query would load too much data. If we know that the stats values can be up to X% bigger than the true values, maybe we can add that X% as a threshold to the limit set by the user?
Review comment: We can also just target the bytes returned by the index, which seems more straightforward.
Reply: That's what the IndexStatsRequest returns, doesn't it? If so, my comment above still applies: IIRC the IndexStatsResponse can return way more bytes than what is actually in the chunks.
Reply: The index will return the bytes in the chunks, but we may not need to query them all (in the case of reaching the …). For instance, if the …
	indexStatsReq = indexStatsReq.WithStartEnd(r.GetStart(), r.GetEnd())
	indexStatsReq = indexStatsReq.WithQuery(syntax.MatchersString(matchers))

	resp, err := q.next.Do(ctx, indexStatsReq)
	if err != nil {
		return nil, err
	}

	return resp.(*IndexStatsResponse).Response, nil
}

// skipRequestType returns true if the q.maxQueryBytesRead limit should not be
// enforced on the r request type.
// This is needed when we have two instances of this querySizeLimiter in the same
// middleware pipeline, since we don't want to compute the stats for the stats
// request from the upper querySizeLimiter.
func (q *querySizeLimiter) skipRequestType(r queryrangebase.Request) bool {
	_, ok := r.(*logproto.IndexStatsRequest)
	return ok
}
func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
	if q.skipRequestType(r) {
		return q.next.Do(ctx, r)
	}

	log, ctx := spanlogger.New(ctx, "query_size_limits")
	defer log.Finish()

	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
	}

	if maxBytesRead := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, q.maxQueryBytesRead); maxBytesRead > 0 {
		stats, err := q.getIndexStatsForRequest(ctx, r)
		if err != nil {
			return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to get index stats for query: %s", err.Error())
		}

		if int(stats.Bytes) > maxBytesRead {
			statsBytesStr := flagext.ByteSize(stats.Bytes).String()
			maxBytesReadStr := flagext.ByteSize(maxBytesRead).String()
			return nil, httpgrpc.Errorf(http.StatusBadRequest, q.errorTemplate, statsBytesStr, maxBytesReadStr)
		}
	}

	return q.next.Do(ctx, r)
}
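The enforcement in Do boils down to: pick the most restrictive per-tenant limit, fetch index stats, and reject the query if the stats exceed the limit. A minimal, self-contained sketch of the selection step (the helper mimics the behavior of `validation.SmallestPositiveNonZeroIntPerTenant` from the diff; everything else here is illustrative):

```go
package main

import "fmt"

// smallestPositiveNonZero mimics the behavior of Loki's
// validation.SmallestPositiveNonZeroIntPerTenant: across all tenants of a
// query, the most restrictive (smallest) configured limit wins, while
// tenants with 0 (limit disabled) are ignored.
func smallestPositiveNonZero(tenantIDs []string, limit func(string) int) int {
	smallest := 0
	for _, id := range tenantIDs {
		if v := limit(id); v > 0 && (smallest == 0 || v < smallest) {
			smallest = v
		}
	}
	return smallest
}

func main() {
	perTenant := map[string]int{"team-a": 0, "team-b": 1 << 30, "team-c": 1 << 20}
	lookup := func(id string) int { return perTenant[id] }

	maxBytesRead := smallestPositiveNonZero([]string{"team-a", "team-b", "team-c"}, lookup)
	fmt.Println(maxBytesRead) // 1048576: team-c's 1MiB wins; team-a's 0 is skipped

	statsBytes := 5 << 20 // pretend the index reports 5MiB would be read
	fmt.Println(statsBytes > maxBytesRead) // true: the query would be rejected
}
```

Note the asymmetry: a tenant with a 0 limit does not weaken the check for the other tenants in a multi-tenant query, it is simply ignored.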
type seriesLimiter struct {
	hashes map[uint64]struct{}
	rw     sync.RWMutex
@@ -211,7 +211,19 @@ func NewResultsCacheMiddleware(
	}), nil
}

// skipRequestType returns true if responses for the r request type should
// not be cached.
// This is needed when we have middlewares that send different request types
// down the pipeline that do not support caching.
func (s *resultsCache) skipRequestType(r Request) bool {
Review comment: This is needed to avoid caching the IndexStats request sent down the middleware pipeline by the … I feel this type-checking solution is a bit too hacky. I think it would be nice if we could set a flag in the context to skip caching. Same for splitting and sharding. In the … I don't think implementing that would be too complicated. Moreover, we could set those context flags by looking at the HTTP request headers, which can be handy for debugging and testing. Wdyt?
Review comment: Rather than threading this logic through our middleware chain, what if we had a separate … Another idea for later: what if we cached …
Reply: IIUC, the pattern used in … In other words, when sharding is disabled, the request will skip the sharding-related middleware (…).
	_, ok := r.(*logproto.IndexStatsRequest)
	return ok
}

func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
	if s.skipRequestType(r) {
		return s.next.Do(ctx, r)
	}

	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
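Both the results cache and the size limiter use Go's comma-ok type assertion to let IndexStats requests pass through untouched. A tiny self-contained illustration of that pattern, with hypothetical request types standing in for Loki's:

```go
package main

import "fmt"

// request stands in for queryrangebase.Request; the concrete types below
// are hypothetical stand-ins for Loki's range and IndexStats requests.
type request interface{ Query() string }

type rangeRequest struct{ query string }

func (r *rangeRequest) Query() string { return r.query }

type indexStatsRequest struct{ query string }

func (r *indexStatsRequest) Query() string { return r.query }

// skip reports whether r is a stats probe that should bypass the middleware,
// e.g. so the cache never stores IndexStats responses and the size limiter
// never tries to compute stats for its own stats request.
func skip(r request) bool {
	_, ok := r.(*indexStatsRequest)
	return ok
}

func main() {
	fmt.Println(skip(&indexStatsRequest{query: `{job="a"}`})) // true
	fmt.Println(skip(&rangeRequest{query: `{job="a"}`}))      // false
}
```

As the review thread notes, this per-type dispatch is a bit ad hoc; a context flag or a dedicated handler would centralize the decision, at the cost of more plumbing.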
@@ -96,8 +96,10 @@ type Limits struct {
	QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"`

	// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
	QuerySplitDuration   model.Duration   `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
	MinShardingLookback  model.Duration   `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
	MaxQueryBytesRead    flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
	MaxSubqueryBytesRead flagext.ByteSize `yaml:"max_subquery_bytes_read" json:"max_subquery_bytes_read"`
Review comment: We had this open question in the epic: … I decided to implement this check on the frontend rather than on the querier, since that way the query and subquery limit enforcement are close to each other on the same component.
Review comment: Suggested change — nit: naming as specified earlier.
Reply: Done.
	// Ruler defaults and limits.
	RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`

@@ -223,6 +225,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
	_ = l.MinShardingLookback.Set("0s")
	f.Var(&l.MinShardingLookback, "frontend.min-sharding-lookback", "Limit queries that can be sharded. Queries within the time range of now and now minus this sharding lookback are not sharded. The default value of 0s disables the lookback, causing sharding of all queries at all times.")

	_ = l.MaxQueryBytesRead.Set("0B")
Review comment: Suggested change — nit.
Reply: Thank you!
	f.Var(&l.MaxQueryBytesRead, "frontend.max-query-bytes-read", "Max number of bytes a query can fetch. The default value of 0 disables this limit.")
	_ = l.MaxSubqueryBytesRead.Set("0B")
Review comment: Suggested change — nit.
Reply: Thank you!
	f.Var(&l.MaxSubqueryBytesRead, "frontend.max-subquery-bytes-read", "Max number of bytes a sub-query can fetch after splitting and sharding. The default value of 0 disables this limit.")

	_ = l.MaxCacheFreshness.Set("1m")
	f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")

@@ -474,6 +481,16 @@ func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
	return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)
}

// MaxQueryBytesRead returns the maximum bytes a query can read.
func (o *Overrides) MaxQueryBytesRead(userID string) int {
	return o.getOverridesForUser(userID).MaxQueryBytesRead.Val()
}
// MaxSubqueryBytesRead returns the maximum bytes a sub-query can read after splitting and sharding.
func (o *Overrides) MaxSubqueryBytesRead(userID string) int {
	return o.getOverridesForUser(userID).MaxSubqueryBytesRead.Val()
}

// MaxConcurrentTailRequests returns the limit to number of concurrent tail requests.
func (o *Overrides) MaxConcurrentTailRequests(userID string) int {
	return o.getOverridesForUser(userID).MaxConcurrentTailRequests
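The new limits use `flagext.ByteSize`, so operators can write human-readable sizes ("0B", "1GB") as flag defaults, CLI values, and YAML overrides. Below is a simplified stand-in for that type, to show how such a flag value works; the real implementation parses more suffixes (via go-humanize), whereas this sketch handles only a few and treats GB as 2^30 bytes:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// byteSize is a simplified stand-in for Loki's flagext.ByteSize. It
// implements flag.Value (Set + String), so it could be registered with
// f.Var like the fields in the diff above. Only a few suffixes are
// handled here, and GB is treated as 2^30 bytes for simplicity.
type byteSize uint64

func (b byteSize) String() string { return strconv.FormatUint(uint64(b), 10) }

func (b *byteSize) Set(s string) error {
	s = strings.ToUpper(strings.TrimSpace(s))
	mult := uint64(1)
	switch {
	case strings.HasSuffix(s, "GB"):
		mult, s = 1<<30, strings.TrimSuffix(s, "GB")
	case strings.HasSuffix(s, "MB"):
		mult, s = 1<<20, strings.TrimSuffix(s, "MB")
	case strings.HasSuffix(s, "KB"):
		mult, s = 1<<10, strings.TrimSuffix(s, "KB")
	case strings.HasSuffix(s, "B"):
		s = strings.TrimSuffix(s, "B")
	}
	n, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		return err
	}
	*b = byteSize(n * mult)
	return nil
}

func main() {
	var limit byteSize
	_ = limit.Set("0B") // the default: zero disables the limit
	fmt.Println(limit)
	_ = limit.Set("1GB")
	fmt.Println(limit)
}
```

Keeping 0 as the "disabled" sentinel is what lets `SmallestPositiveNonZeroIntPerTenant` skip tenants that have no limit configured.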
Review comment: There's a function syntax.MatcherGroups which is what you'll want, as it gives [(matchers, time_range)] pairs within a query. This is used in queryrange/shard_resolver.go.
Reply: Good call! I didn't think about the queries that may contain multiple matchers (e.g. from TestMatcherGroups). IIUC, similarly to shard_resolver.go, we need to create an IndexStats request for each matcher. Then we need to sum the bytes for each IndexStats response. That's the amount of data a query would read.
Reply: I implemented this in 2a1df3e. Looks like it works fine. If this is what you meant, I do have some new questions 😅: ForEach is set to nonconfigurable values … len(matcherGroups) or a fixed not-so-high value (e.g. 10). But happy to change my mind. Also, max_look_back_period from the query start time? I think I don't quite understand why that is needed on sharding. Tested in loki-dev-009.
Review comment: max_look_back_period is generally most important in limited instant queries (which are rarely used). If I submit {job="foo"} with start=end, it would always return zero results. Prometheus has a concept of "look back amount of time for instant queries" since metric data is sampled at some configurable scrape_interval (commonly 15s, 30s, or 1m). We sort of copy that idea and say, "find me logs from the past when start=end".
Review comment: Just came across this myself. I don't think we need an extra method.
Reply: Removed.
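The per-matcher-group approach discussed above (one IndexStats request per group, then summing the bytes) can be sketched as follows; the types and names here are illustrative, not Loki's actual API:

```go
package main

import "fmt"

// indexStats mirrors just the bytes field of an IndexStats response.
type indexStats struct{ Bytes uint64 }

// matcherGroup stands in for the (matchers, time_range) pairs that
// syntax.MatcherGroups extracts from a query, e.g. the two sides of
// `count_over_time({job="a"}[5m]) / count_over_time({job="b"}[5m])`.
type matcherGroup struct{ Matchers string }

// totalBytes issues one stats lookup per matcher group and sums the
// results; the total approximates how much data the whole query would read.
func totalBytes(groups []matcherGroup, stats func(matcherGroup) indexStats) uint64 {
	var total uint64
	for _, g := range groups {
		total += stats(g).Bytes
	}
	return total
}

func main() {
	// Fake per-group stats in place of real IndexStats requests.
	fake := map[string]uint64{`{job="a"}`: 3 << 20, `{job="b"}`: 1 << 20}
	groups := []matcherGroup{{Matchers: `{job="a"}`}, {Matchers: `{job="b"}`}}

	got := totalBytes(groups, func(g matcherGroup) indexStats {
		return indexStats{Bytes: fake[g.Matchers]}
	})
	fmt.Println(got) // 4194304: 3MiB + 1MiB across both matcher groups
}
```

In the PR these per-group lookups can run concurrently (the ForEach concurrency question in the thread); the sequential loop here keeps the sketch minimal.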