Max bytes read limit #8670
Changes from 50 commits
91e7206
1f81aea
77895e8
1e2fc1b
f149835
2043438
94f941b
0aea9e1
4780298
798313a
4aa7a34
2a1df3e
1674e67
648589e
aaf90e5
c9c8610
75b1d54
af1c0d2
a024f95
db3fc7c
e3eb162
d57b79b
d5665f2
7bc5eaf
8c26b1e
1818e8d
877e39e
5d87192
deeca4a
43d96bc
ddce6aa
350e15c
226a076
28c27fa
954407e
3d2fff3
1704c5c
9b8aa08
77b046f
ff84730
c350297
f1ba7eb
8ec053c
f422e0a
54ec3ba
4f11f25
c482fe9
db98d7c
8515b43
cfcc4e4
6661d30
9c62ef7
17bed47
5a591b3
10f5a43
@@ -5,29 +5,37 @@ import (
	"fmt"
	"net/http"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/tenant"
	"github.com/opentracing/opentracing-go"
	"github.com/pkg/errors"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/weaveworks/common/httpgrpc"
	"github.com/weaveworks/common/user"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logql/syntax"
	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
	"github.com/grafana/loki/pkg/storage/config"
	"github.com/grafana/loki/pkg/storage/stores/index/stats"
	"github.com/grafana/loki/pkg/util"
	util_log "github.com/grafana/loki/pkg/util/log"
	"github.com/grafana/loki/pkg/util/spanlogger"
	"github.com/grafana/loki/pkg/util/validation"
)

const (
	limitErrTmpl                  = "maximum of series (%d) reached for a single query"
	limErrQueryTooManyBytesTmpl   = "the query would read too many bytes (query: %s, limit: %s). Consider adding more specific stream selectors or reduce the time range of the query"
	limErrQuerierTooManyBytesTmpl = "query too large to execute on a single querier, either because parallelization is not enabled, the query is unshardable, or a shard query is too big to execute: (query: %s, limit: %s). Consider adding more specific stream selectors or reduce the time range of the query"
)
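For illustration only (not part of the diff): both new templates are rendered with two humanized sizes, the bytes the query would read followed by the configured limit, matching how the code further down formats them. The values below are made up.

```go
// Illustration, not from the PR: rendering the new error template with arbitrary sizes.
// humanize.Bytes produces strings such as "50 GB".
errMsg := fmt.Sprintf(
	limErrQueryTooManyBytesTmpl,
	humanize.Bytes(uint64(50_000_000_000)), // bytes the query would read
	humanize.Bytes(uint64(10_000_000_000)), // per-tenant limit
)
_ = errMsg
```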

var (

@@ -45,13 +53,16 @@ type Limits interface {
	// TSDBMaxQueryParallelism returns the limit to the number of split queries the
	// frontend will process in parallel for TSDB queries.
	TSDBMaxQueryParallelism(context.Context, string) int
	MaxQueryBytesRead(context.Context, string) int
	MaxQuerierBytesRead(context.Context, string) int
}

type limits struct {
	Limits
	// Use pointers so nil value can indicate if the value was set.
	splitDuration       *time.Duration
	maxQueryParallelism *int
	maxQueryBytesRead   *int
}

func (l limits) QuerySplitDuration(user string) time.Duration {
@@ -179,6 +190,178 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que
	return l.next.Do(ctx, r)
}

type querySizeLimiter struct {
	logger            log.Logger
	next              queryrangebase.Handler
	statsHandler      queryrangebase.Handler
	cfg               []config.PeriodConfig
	maxLookBackPeriod time.Duration
	limitFunc         func(context.Context, string) int
	limitErrorTmpl    string
}

func newQuerySizeLimiter(
	next queryrangebase.Handler,
	cfg []config.PeriodConfig,
	logger log.Logger,
	limits Limits,
	codec queryrangebase.Codec,
	limitFunc func(context.Context, string) int,
	limitErrorTmpl string,
	statsHandler ...queryrangebase.Handler,
) *querySizeLimiter {
	q := &querySizeLimiter{
		logger:         logger,
		next:           next,
		cfg:            cfg,
		limitFunc:      limitFunc,
		limitErrorTmpl: limitErrorTmpl,
	}

	q.statsHandler = next
	if len(statsHandler) > 0 {
		q.statsHandler = statsHandler[0]
	}

	// Parallelize the index stats requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d).
	// Indices are sharded by 24 hours, so we split the stats request in 24h intervals.
	statsSplitTimeMiddleware := SplitByIntervalMiddleware(cfg, WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, nil)
	q.statsHandler = statsSplitTimeMiddleware.Wrap(q.statsHandler)

	// Get MaxLookBackPeriod from downstream engine. This is needed for instant limited queries at getStatsForMatchers
	ng := logql.NewDownstreamEngine(logql.EngineOpts{LogExecutingQuery: false}, DownstreamHandler{next: next, limits: limits}, limits, logger)
Review comment: We should try to get this from the appropriate config instead of by creating a new engine.
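A minimal sketch of that suggestion (hypothetical, not what the PR does; it assumes the engine options, with MaxLookBackPeriod already resolved to its effective value, can be threaded through to the constructor):

```go
// Hypothetical alternative to building a downstream engine just to read its options:
// accept the engine options directly and take the look-back period from them.
// Assumes engineOpts.MaxLookBackPeriod already has its default applied upstream.
func (q *querySizeLimiter) setMaxLookBackPeriod(engineOpts logql.EngineOpts) {
	q.maxLookBackPeriod = engineOpts.MaxLookBackPeriod
}
```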

	q.maxLookBackPeriod = ng.Opts().MaxLookBackPeriod

	return q
}

// NewQuerierSizeLimiterMiddleware creates a new Middleware that enforces query size limits after sharding and splitting.
// The errorTemplate should format two strings: the bytes that would be read and the bytes limit.
func NewQuerierSizeLimiterMiddleware(
	cfg []config.PeriodConfig,
	logger log.Logger,
	limits Limits,
	codec queryrangebase.Codec,
	statsHandler ...queryrangebase.Handler,
) queryrangebase.Middleware {
	return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
		return newQuerySizeLimiter(next, cfg, logger, limits, codec, limits.MaxQuerierBytesRead, limErrQuerierTooManyBytesTmpl, statsHandler...)
	})
}

// NewQuerySizeLimiterMiddleware creates a new Middleware that enforces query size limits.
// The errorTemplate should format two strings: the bytes that would be read and the bytes limit.
func NewQuerySizeLimiterMiddleware(
	cfg []config.PeriodConfig,
	logger log.Logger,
	limits Limits,
	codec queryrangebase.Codec,
	statsHandler ...queryrangebase.Handler,
) queryrangebase.Middleware {
	return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
		return newQuerySizeLimiter(next, cfg, logger, limits, codec, limits.MaxQueryBytesRead, limErrQueryTooManyBytesTmpl, statsHandler...)
	})
}
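For context only (the real wiring happens in the frontend tripperware setup, which is not shown in this hunk): both constructors return plain queryrangebase.Middleware values, so they compose with Wrap like any other middleware. Per the doc comments above, the querier-level limiter belongs after the split and shard middlewares; the chain below is a simplified, hypothetical arrangement.

```go
// Hypothetical composition, for illustration only. splitAndShard stands in for the
// split-by-interval and sharding middlewares that sit between the two limiters;
// periodConfigs, logger, limits, codec and next are assumed to come from the caller.
handler := NewQuerySizeLimiterMiddleware(periodConfigs, logger, limits, codec).Wrap(
	splitAndShard.Wrap(
		NewQuerierSizeLimiterMiddleware(periodConfigs, logger, limits, codec).Wrap(next),
	),
)
_ = handler
```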

// getBytesReadForRequest returns the number of bytes that would be read for the query in r.
// Since the query expression may contain multiple stream matchers, this function sums up the
// bytes that will be read for each stream.
// E.g. for the following query:
//
//	count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m] offset 10m)
//
// this function will sum the bytes read for each of the following streams, taking into account
// individual intervals and offsets
//   - {job="foo"}
//   - {job="bar"}
func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) {
	sp, ctx := spanlogger.NewWithLogger(ctx, q.logger, "querySizeLimiter.getBytesReadForRequest")
	defer sp.Finish()

	expr, err := syntax.ParseExpr(r.GetQuery())
	if err != nil {
		return 0, err
	}

	matcherGroups, err := syntax.MatcherGroups(expr)
	if err != nil {
		return 0, err
	}

	// TODO: Set concurrency dynamically as in shardResolverForConf?
	start := time.Now()
	const maxConcurrentIndexReq = 10
	matcherStats, err := getStatsForMatchers(ctx, q.logger, q.statsHandler, model.Time(r.GetStart()), model.Time(r.GetEnd()), matcherGroups, maxConcurrentIndexReq, q.maxLookBackPeriod)
	if err != nil {
		return 0, err
	}

	combinedStats := stats.MergeStats(matcherStats...)

	level.Debug(sp).Log(
		append(
			combinedStats.LoggingKeyValues(),
			"msg", "queried index",
			"type", "combined",
			"len", len(matcherStats),
			"max_parallelism", maxConcurrentIndexReq,
			"duration", time.Since(start),
			"total_bytes", strings.Replace(humanize.Bytes(combinedStats.Bytes), " ", "", 1),
		)...,
	)

	return combinedStats.Bytes, nil
}

func (q *querySizeLimiter) getSchemaCfg(r queryrangebase.Request) (config.PeriodConfig, error) {
Review comment: I think you can reuse …
Reply: Done with c9c8610.
Reply: I don't know what you mean by this. If two consecutive PeriodConfig are TSDB, combine them into a new PeriodConfig with a …
Reply: Let's leave it for a followup PR, but yes. The idea is that we can shard requests across two different TSDB schemas.
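A rough sketch of the follow-up idea mentioned in that thread (hypothetical, explicitly deferred to a later PR): if two consecutive period configs both use TSDB, they could be treated as one contiguous period so a request spanning the boundary still resolves to a single schema config.

```go
// Hypothetical follow-up, not part of this PR: merge consecutive TSDB periods so
// requests spanning their boundary can still be size-checked (and later sharded).
// A period extends until the next period's From, so dropping the later TSDB entry
// effectively widens the earlier one. This ignores other fields that may differ
// between the two periods and would need reconciling in a real implementation.
func coalesceTSDBPeriods(periods []config.PeriodConfig) []config.PeriodConfig {
	var out []config.PeriodConfig
	for _, p := range periods {
		if len(out) > 0 && out[len(out)-1].IndexType == config.TSDBType && p.IndexType == config.TSDBType {
			continue
		}
		out = append(out, p)
	}
	return out
}
```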

	maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(r.GetQuery())
	if err != nil {
		return config.PeriodConfig{}, errors.New("failed to get range-vector and offset duration: " + err.Error())
	}

	adjustedStart := int64(model.Time(r.GetStart()).Add(-maxRVDuration).Add(-maxOffset))
	adjustedEnd := int64(model.Time(r.GetEnd()).Add(-maxOffset))

	return ShardingConfigs(q.cfg).ValidRange(adjustedStart, adjustedEnd)
}

func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
	log, ctx := spanlogger.New(ctx, "query_size_limits")
	defer log.Finish()

	// Only support TSDB
	schemaCfg, err := q.getSchemaCfg(r)
	if err != nil {
		return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Failed to get schema config: %s", err.Error())
	}
	if schemaCfg.IndexType != config.TSDBType {
		return q.next.Do(ctx, r)
	}

	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
	}

	limitFuncCapture := func(id string) int { return q.limitFunc(ctx, id) }
	if maxBytesRead := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, limitFuncCapture); maxBytesRead > 0 {
		bytesRead, err := q.getBytesReadForRequest(ctx, r)
		if err != nil {
			return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Failed to get bytes read stats for query: %s", err.Error())
		}

		if bytesRead > uint64(maxBytesRead) {
Review comment: Let's add a log line for this so we can see the limits, what the request shows, and whether it's rejected or accepted. We can then build metric queries from this line to better understand behavior.
Reply: Done. A few things: …
Reply: I think it's easier to print it on success and failure so we can compare with less effort. Something like …
Reply: Sounds good. Done with 6661d30.
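A hedged sketch of the single log line that reply suggests (the exact fields landed in 6661d30 may differ; the message and field names here are illustrative):

```go
// Hypothetical shape of the suggested log line: emit it for accepted and rejected
// queries alike so the two outcomes can be compared from the same log stream.
status := "accepted"
if bytesRead > uint64(maxBytesRead) {
	status = "rejected"
}
level.Debug(log).Log(
	"msg", "query size check",
	"status", status,
	"queryBytes", humanize.Bytes(bytesRead),
	"limitBytes", humanize.Bytes(uint64(maxBytesRead)),
)
```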

			statsBytesStr := humanize.Bytes(bytesRead)
			maxBytesReadStr := humanize.Bytes(uint64(maxBytesRead))
			errorMsg := fmt.Sprintf(q.limitErrorTmpl, statsBytesStr, maxBytesReadStr)
			level.Warn(log).Log("msg", errorMsg, "limitBytes", maxBytesReadStr, "queryBytes", statsBytesStr)
			return nil, httpgrpc.Errorf(http.StatusBadRequest, errorMsg)
		}
	}

	return q.next.Do(ctx, r)
}

type seriesLimiter struct {
	hashes map[uint64]struct{}
	rw     sync.RWMutex
Review comment: Can we not have different error messages when the different conditions are true? This would be quite hard for me as a querier of Loki to understand. As an operator, I wouldn't know which config option to change in order to ease this limit.

Reply: What do you mean by "when the different conditions are true"? Do you mean when both limits are surpassed? If so, we cannot print both errors because one is enforced before the other. We do have different error messages for MaxQueryBytes and MaxQuerierBytes:
- MaxQueryBytes: "the query would read too many bytes (query: %s, limit: %s). Consider adding more specific stream selectors or reduce the time range of the query"
- MaxQuerierBytes: "query too large to execute on a single querier, either because parallelization is not enabled, the query is unshardable, or a shard query is too big to execute: (query: %s, limit: %s). Consider adding more specific stream selectors or reduce the time range of the query"

Reply: The conditions I was referring to are: "either because parallelization is not enabled, the query is unshardable, or a shard query is too big to execute".

Reply: That's a good point. We will implement this on a new PR.
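A hedged sketch of what that follow-up could look like (hypothetical; the change was deferred to a later PR and the names below are made up): instead of one template listing every possible cause, the querier-level limiter could pick a message for the specific condition it detected.

```go
// Hypothetical follow-up, not part of this PR: condition-specific templates so an
// operator can tell which setting to adjust. All names and wording are made up.
const (
	limErrQuerierNoParallelismTmpl = "query too large to execute on a single querier because parallelization is not enabled (query: %s, limit: %s)"
	limErrQuerierUnshardableTmpl   = "query too large to execute on a single querier because the query cannot be sharded (query: %s, limit: %s)"
	limErrQuerierShardTooBigTmpl   = "query too large to execute on a single querier: an individual shard would still read too many bytes (query: %s, limit: %s)"
)
```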