Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max bytes read limit #8670

Merged
merged 55 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
91e7206
Working get index stats
salvacorts Feb 27, 2023
1f81aea
Refactor where matchers are extracted
salvacorts Feb 27, 2023
77895e8
Working check for limit
salvacorts Feb 28, 2023
1e2fc1b
Check for unsuported req type so we don't need a stats roundtripper
salvacorts Feb 28, 2023
f149835
Refactor ExtractMatcher func
salvacorts Mar 1, 2023
2043438
Add qwuerier-level limit
salvacorts Mar 1, 2023
94f941b
Error message can be configured
salvacorts Mar 1, 2023
0aea9e1
Limit other query types
salvacorts Mar 1, 2023
4780298
Rename querier-level limit
salvacorts Mar 1, 2023
798313a
Add comments to `skipRequestType` func
salvacorts Mar 1, 2023
4aa7a34
Rename Subquery to Querieri in new limits
salvacorts Mar 2, 2023
2a1df3e
Support queries with multiple matchers
salvacorts Mar 2, 2023
1674e67
Check if index type is TSDB
salvacorts Mar 2, 2023
648589e
Skip middlewares for index stats
salvacorts Mar 7, 2023
aaf90e5
Extract func to get index stats for matcher groups
salvacorts Mar 7, 2023
c9c8610
Reuse ShardingConfigs methods
salvacorts Mar 7, 2023
75b1d54
Refactor roundTripper to reuse roundTripperHandler
salvacorts Mar 7, 2023
af1c0d2
Fix errors
salvacorts Mar 7, 2023
a024f95
Re-use sharding index stats to check querier size limit
salvacorts Mar 7, 2023
db3fc7c
Human-readable bytes in errors
salvacorts Mar 7, 2023
e3eb162
Split index stats req in 24h
salvacorts Mar 8, 2023
d57b79b
Use MaxLookBackPeriod
salvacorts Mar 8, 2023
d5665f2
Minor changes
salvacorts Mar 8, 2023
7bc5eaf
Use skipmiddleware on NewLimitedTripperware
salvacorts Mar 8, 2023
8c26b1e
Check unshardable queries and check bytesPerShard
salvacorts Mar 9, 2023
1818e8d
Remove debug code for unsplit index request
salvacorts Mar 9, 2023
877e39e
Fix test compile
salvacorts Mar 9, 2023
5d87192
Fix test compilation
salvacorts Mar 9, 2023
deeca4a
Merge branch 'main' into salvacorts/max_data_query_limit
salvacorts Mar 9, 2023
43d96bc
Update docs
salvacorts Mar 20, 2023
ddce6aa
Apply formatting suggestions
salvacorts Mar 20, 2023
350e15c
Enforce parallelism of at least 1 in split by interval
salvacorts Mar 20, 2023
226a076
Test query size middleware
salvacorts Mar 20, 2023
28c27fa
Test multi-matchers and offset
salvacorts Mar 20, 2023
954407e
Rename function to create size limiter
salvacorts Mar 20, 2023
3d2fff3
Docs for NewRoundTripperHandler
salvacorts Mar 20, 2023
1704c5c
Test limit on non shardable queries
salvacorts Mar 21, 2023
9b8aa08
Test checkQuerySizeLimit on shardable queries
salvacorts Mar 21, 2023
77b046f
Merge branch 'main' into salvacorts/max_data_query_limit
salvacorts Mar 21, 2023
ff84730
Support per-request limits
salvacorts Mar 21, 2023
c350297
Improve docs.
salvacorts Mar 21, 2023
f1ba7eb
Delete unused function
salvacorts Mar 21, 2023
8ec053c
Add Changelog
salvacorts Mar 21, 2023
f422e0a
Enforce limits on instant/range log and metric queries
salvacorts Mar 21, 2023
54ec3ba
Test limits in reountrip tests
salvacorts Mar 21, 2023
4f11f25
Configure retry mechanism on skipMiddleware
salvacorts Mar 22, 2023
c482fe9
Create splitByMetrics if metrics is null
salvacorts Mar 22, 2023
db98d7c
Remove MaxQuerierBytesRead from per query limits
salvacorts Mar 23, 2023
8515b43
Add advice to error message
salvacorts Mar 23, 2023
cfcc4e4
Log when limit is applied
salvacorts Mar 23, 2023
6661d30
Log both above and bellow the limit
salvacorts Mar 23, 2023
9c62ef7
Mapper returns bytesPerShard
salvacorts Mar 23, 2023
17bed47
Codex no longer needed in newASTMapperware
salvacorts Mar 23, 2023
5a591b3
Logging limit result in astMapperware.checkQuerySizeLimit
salvacorts Mar 23, 2023
10f5a43
Merge branch 'main' into salvacorts/max_data_query_limit
salvacorts Mar 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* [6675](https://github.com/grafana/loki/pull/6675) **btaani**: Add logfmt expression parser for selective extraction of labels from logfmt formatted logs
* [8474](https://github.com/grafana/loki/pull/8474) **farodin91**: Add support for short-lived S3 session tokens
* [8774](https://github.com/grafana/loki/pull/8774) **slim-bean**: Add new logql template functions `bytes`, `duration`, `unixEpochMillis`, `unixEpochNanos`, `toDateInZone`, `b64Enc`, and `b64Dec`
* [8670](https://github.com/grafana/loki/pull/8670) **salvacorts** Introduce two new limits to refuse log and metric queries that would read too much data.

##### Fixes

Expand Down
11 changes: 11 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2326,6 +2326,17 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.min-sharding-lookback
[min_sharding_lookback: <duration> | default = 0s]

# Max number of bytes a query can fetch. Enforced in log and metric queries only
# when TSDB is used. The default value of 0 disables this limit.
# CLI flag: -frontend.max-query-bytes-read
[max_query_bytes_read: <int> | default = 0B]

# Max number of bytes a query can fetch after splitting and sharding. Enforced
# in log and metric queries only when TSDB is used. The default value of 0
# disables this limit.
# CLI flag: -frontend.max-querier-bytes-read
[max_querier_bytes_read: <int> | default = 0B]

# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed to Cortex.
# CLI flag: -ruler.evaluation-delay-duration
Expand Down
15 changes: 15 additions & 0 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
indexStats "github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
"github.com/grafana/loki/pkg/util/marshal"
Expand Down Expand Up @@ -685,6 +686,20 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
Data: names,
Statistics: mergedStats,
}, nil
case *IndexStatsResponse:
headers := responses[0].(*IndexStatsResponse).Headers
stats := make([]*indexStats.Stats, len(responses))
for i, res := range responses {
stats[i] = res.(*IndexStatsResponse).Response
}

mergedIndexStats := indexStats.MergeStats(stats...)

return &IndexStatsResponse{
Response: &mergedIndexStats,
Headers: headers,
}, nil

default:
return nil, errors.New("unknown response in merging responses")
}
Expand Down
183 changes: 182 additions & 1 deletion pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,37 @@ import (
"fmt"
"net/http"
"sort"
"strings"
"sync"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
"github.com/grafana/loki/pkg/util/validation"
)

const (
limitErrTmpl = "maximum of series (%d) reached for a single query"
limitErrTmpl = "maximum of series (%d) reached for a single query"
limErrQueryTooManyBytesTmpl = "the query would read too many bytes (query: %s, limit: %s)"
salvacorts marked this conversation as resolved.
Show resolved Hide resolved
limErrQuerierTooManyBytesTmpl = "query too large to execute on a single querier, either because parallelization is not enabled, the query is unshardable, or a shard query is too big to execute: (query: %s, limit: %s)"
)

var (
Expand All @@ -45,13 +53,16 @@ type Limits interface {
// TSDBMaxQueryParallelism returns the limit to the number of split queries the
// frontend will process in parallel for TSDB queries.
TSDBMaxQueryParallelism(context.Context, string) int
MaxQueryBytesRead(context.Context, string) int
MaxQuerierBytesRead(context.Context, string) int
}

type limits struct {
Limits
// Use pointers so nil value can indicate if the value was set.
splitDuration *time.Duration
maxQueryParallelism *int
maxQueryBytesRead *int
}

func (l limits) QuerySplitDuration(user string) time.Duration {
Expand Down Expand Up @@ -179,6 +190,176 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que
return l.next.Do(ctx, r)
}

type querySizeLimiter struct {
logger log.Logger
next queryrangebase.Handler
statsHandler queryrangebase.Handler
cfg []config.PeriodConfig
maxLookBackPeriod time.Duration
limitFunc func(context.Context, string) int
limitErrorTmpl string
}

func newQuerySizeLimiter(
next queryrangebase.Handler,
cfg []config.PeriodConfig,
logger log.Logger,
limits Limits,
codec queryrangebase.Codec,
limitFunc func(context.Context, string) int,
limitErrorTmpl string,
statsHandler ...queryrangebase.Handler,
) *querySizeLimiter {
q := &querySizeLimiter{
logger: logger,
next: next,
cfg: cfg,
limitFunc: limitFunc,
limitErrorTmpl: limitErrorTmpl,
}

q.statsHandler = next
if len(statsHandler) > 0 {
q.statsHandler = statsHandler[0]
}

// Parallelize the index stats requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d).
// Indices are sharded by 24 hours, so we split the stats request in 24h intervals.
statsSplitTimeMiddleware := SplitByIntervalMiddleware(cfg, WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, nil)
q.statsHandler = statsSplitTimeMiddleware.Wrap(q.statsHandler)

// Get MaxLookBackPeriod from downstream engine. This is needed for instant limited queries at getStatsForMatchers
ng := logql.NewDownstreamEngine(logql.EngineOpts{LogExecutingQuery: false}, DownstreamHandler{next: next, limits: limits}, limits, logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should try to get this from the appropriate config instead of by creating a new engine

q.maxLookBackPeriod = ng.Opts().MaxLookBackPeriod

return q
}

// NewQuerierSizeLimiterMiddleware creates a new Middleware that enforces query size limits after sharding and splitting.
// The errorTemplate should format two strings: the bytes that would be read and the bytes limit.
func NewQuerierSizeLimiterMiddleware(
cfg []config.PeriodConfig,
logger log.Logger,
limits Limits,
codec queryrangebase.Codec,
statsHandler ...queryrangebase.Handler,
) queryrangebase.Middleware {
return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return newQuerySizeLimiter(next, cfg, logger, limits, codec, limits.MaxQuerierBytesRead, limErrQuerierTooManyBytesTmpl, statsHandler...)
})
}

// NewQuerySizeLimiterMiddleware creates a new Middleware that enforces query size limits.
// The errorTemplate should format two strings: the bytes that would be read and the bytes limit.
func NewQuerySizeLimiterMiddleware(
cfg []config.PeriodConfig,
logger log.Logger,
limits Limits,
codec queryrangebase.Codec,
statsHandler ...queryrangebase.Handler,
) queryrangebase.Middleware {
return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return newQuerySizeLimiter(next, cfg, logger, limits, codec, limits.MaxQueryBytesRead, limErrQueryTooManyBytesTmpl, statsHandler...)
})
}

// getBytesReadForRequest returns the number of bytes that would be read for the query in r.
// Since the query expression may contain multiple stream matchers, this function sums up the
// bytes that will be read for each stream.
// E.g. for the following query:
//
// count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m] offset 10m)
//
// this function will sum the bytes read for each of the following streams, taking into account
// individual intervals and offsets
// - {job="foo"}
// - {job="bar"}
func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryrangebase.Request) (uint64, error) {
sp, ctx := spanlogger.NewWithLogger(ctx, q.logger, "querySizeLimiter.getBytesReadForRequest")
defer sp.Finish()

expr, err := syntax.ParseExpr(r.GetQuery())
if err != nil {
return 0, err
}

matcherGroups, err := syntax.MatcherGroups(expr)
if err != nil {
return 0, err
}

// TODO: Set concurrency dynamically as in shardResolverForConf?
start := time.Now()
const maxConcurrentIndexReq = 10
matcherStats, err := getStatsForMatchers(ctx, q.logger, q.statsHandler, model.Time(r.GetStart()), model.Time(r.GetEnd()), matcherGroups, maxConcurrentIndexReq, q.maxLookBackPeriod)
if err != nil {
return 0, err
}

combinedStats := stats.MergeStats(matcherStats...)

level.Debug(sp).Log(
append(
combinedStats.LoggingKeyValues(),
"msg", "queried index",
"type", "combined",
"len", len(matcherStats),
"max_parallelism", maxConcurrentIndexReq,
"duration", time.Since(start),
"total_bytes", strings.Replace(humanize.Bytes(combinedStats.Bytes), " ", "", 1),
)...,
)

return combinedStats.Bytes, nil
}

func (q *querySizeLimiter) getSchemaCfg(r queryrangebase.Request) (config.PeriodConfig, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can reuse ShardingConfigs here. We can actually extend that functionality in the case there are multiple period configs in the query and they all are tsdb (and have the IndexStats support).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done with c9c8610.

We can actually extend that functionality in the case there are multiple period configs in the query and they all are tsdb

I don't know what you mean by this. If two consecutive PeriodConfig are TSDB, combine them into a new PeriodConfig with a From and Interval adjusted to cover both original periods?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave it for a followup PR but yes. The idea is we can shard requests across two different TSDB schemas

maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(r.GetQuery())
if err != nil {
return config.PeriodConfig{}, errors.New("failed to get range-vector and offset duration: " + err.Error())
}

adjustedStart := int64(model.Time(r.GetStart()).Add(-maxRVDuration).Add(-maxOffset))
adjustedEnd := int64(model.Time(r.GetEnd()).Add(-maxOffset))

return ShardingConfigs(q.cfg).ValidRange(adjustedStart, adjustedEnd)
}

func (q *querySizeLimiter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
log, ctx := spanlogger.New(ctx, "query_size_limits")
defer log.Finish()

// Only support TSDB
schemaCfg, err := q.getSchemaCfg(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Failed to get schema config: %s", err.Error())
}
if schemaCfg.IndexType != config.TSDBType {
return q.next.Do(ctx, r)
}

tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

limitFuncCapture := func(id string) int { return q.limitFunc(ctx, id) }
if maxBytesRead := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, limitFuncCapture); maxBytesRead > 0 {
bytesRead, err := q.getBytesReadForRequest(ctx, r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Failed to get bytes read stats for query: %s", err.Error())
}

if bytesRead > uint64(maxBytesRead) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a log line for this so we can see the limits, what the request shows, and whether it's rejected or accepted. We can then build metric queries from this line to better understand behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Few things:

  • I'm writing the log inside the if condition. So this log will only be printed when rejected. IIUC, we can still know how many queries are rejected/accepted by comparing the rate of this log message being printed and the QPS.
  • I'm printing the error message that gets forwarded to the user. That way we can distinguish between the MaxQueryBytes and the MaxQuerierBytes.
  • Even though the limit bytes and the query bytes are already printed in the error message, I'm also adding them to the log message so we can extract these values easier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's easier to print it on success and failure so we can compare with less effort. Something like

status=accepted|rejected limit_bytes=1 limit_name=max_query|max_querier resolved_bytes=2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Done with 6661d30.

statsBytesStr := humanize.Bytes(bytesRead)
maxBytesReadStr := humanize.Bytes(uint64(maxBytesRead))
return nil, httpgrpc.Errorf(http.StatusBadRequest, q.limitErrorTmpl, statsBytesStr, maxBytesReadStr)
}
}

return q.next.Do(ctx, r)
}

type seriesLimiter struct {
hashes map[uint64]struct{}
rw sync.RWMutex
Expand Down
Loading