store: add downloaded bytes limit #5801

Merged
merged 8 commits on Oct 27, 2022
Changes from 7 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#5814](https://github.com/thanos-io/thanos/pull/5814) - Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits.

### Changed

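The new flag is read into a `units.Base2Bytes` via kingpin's `BytesVar` (see the `cmd/thanos/store.go` change below), so human-readable sizes should work directly. A minimal, illustrative sketch — the `512MiB` value and the standalone program are assumptions, not part of the PR:

```go
package main

import (
	"fmt"

	"github.com/alecthomas/units"
)

func main() {
	// kingpin's BytesVar parses values with this same units package, so anything
	// accepted here should be accepted by --store.grpc.downloaded-bytes-limit too.
	// The default of 0 keeps the limiter disabled.
	limit, err := units.ParseBase2Bytes("512MiB")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s = %d bytes\n", limit, uint64(limit))
}
```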
6 changes: 6 additions & 0 deletions cmd/thanos/store.go
@@ -58,6 +58,7 @@ type storeConfig struct {
chunkPoolSize units.Base2Bytes
maxSampleCount uint64
maxTouchedSeriesCount uint64
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
@@ -109,6 +110,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
"Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").Uint64Var(&sc.maxTouchedSeriesCount)

cmd.Flag("store.grpc.downloaded-bytes-limit",
"Maximum amount of downloaded (either fetched or touched) bytes in a single Series/LabelNames/LabelValues call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").BytesVar(&sc.maxDownloadedBytes)

cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").IntVar(&sc.maxConcurrency)

sc.component = component.Store
@@ -345,6 +350,7 @@ func runStore(
conf.dataDir,
store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount),
store.NewBytesLimiterFactory(conf.maxDownloadedBytes),
store.NewGapBasedPartitioner(store.PartitionerMaxGapSize),
conf.blockSyncConcurrency,
conf.advertiseCompatibilityLabel,
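`BytesLimiter`, `BytesLimiterFactory` and `NewBytesLimiterFactory` live in `pkg/store/limiter.go`, which is not included in this excerpt. A rough sketch of what that contract could look like, assuming a simple counting limiter; `countingBytesLimiter` and its internals are invented for illustration:

```go
package store

import (
	"sync/atomic"

	"github.com/alecthomas/units"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
)

// BytesLimiter is consulted before data is downloaded; Reserve fails once the
// running total for a single request would exceed the configured limit.
type BytesLimiter interface {
	Reserve(num uint64) error
}

// BytesLimiterFactory creates one BytesLimiter per Series/LabelNames/LabelValues call.
type BytesLimiterFactory func(failedCounter prometheus.Counter) BytesLimiter

// NewBytesLimiterFactory returns a factory for a counting limiter; a limit of 0
// disables the check entirely.
func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory {
	return func(failedCounter prometheus.Counter) BytesLimiter {
		return &countingBytesLimiter{limit: uint64(limit), failedCounter: failedCounter}
	}
}

type countingBytesLimiter struct {
	limit         uint64
	reserved      uint64
	failedCounter prometheus.Counter
}

// Reserve adds num to the running total and fails once the limit is crossed.
// (The real implementation likely increments the failure counter only once per
// limiter; this sketch keeps it simple.)
func (l *countingBytesLimiter) Reserve(num uint64) error {
	if l.limit == 0 {
		return nil
	}
	if reserved := atomic.AddUint64(&l.reserved, num); reserved > l.limit {
		l.failedCounter.Inc()
		return errors.Errorf("limit %d violated (got %d)", l.limit, reserved)
	}
	return nil
}
```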
6 changes: 6 additions & 0 deletions docs/components/store.md
@@ -162,6 +162,12 @@ Flags:
If true, Store Gateway will lazy memory map
index-header only once the block is required by
a query.
--store.grpc.downloaded-bytes-limit=0
Maximum amount of downloaded (either
fetched or touched) bytes in a single
Series/LabelNames/LabelValues call. The Series
call fails if this limit is exceeded. 0 means
no limit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--store.grpc.series-sample-limit=0
1 change: 1 addition & 0 deletions internal/cortex/storegateway/bucket_stores.go
@@ -493,6 +493,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
newSeriesLimiterFactory(u.limits, userID),
store.NewBytesLimiterFactory(0),
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
73 changes: 59 additions & 14 deletions pkg/store/bucket.go
@@ -320,7 +320,10 @@ type BucketStore struct {
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call,
// or LabelName and LabelValues calls when used with matchers.
seriesLimiterFactory SeriesLimiterFactory
partitioner Partitioner

// bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call.
bytesLimiterFactory BytesLimiterFactory
partitioner Partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
@@ -420,6 +423,7 @@ func NewBucketStore(
dir string,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
bytesLimiterFactory BytesLimiterFactory,
partitioner Partitioner,
blockSyncConcurrency int,
enableCompatibilityLabel bool,
@@ -446,6 +450,7 @@
queryGate: gate.NewNoop(),
chunksLimiterFactory: chunksLimiterFactory,
seriesLimiterFactory: seriesLimiterFactory,
bytesLimiterFactory: bytesLimiterFactory,
partitioner: partitioner,
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
@@ -815,14 +820,15 @@ func blockSeries(
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
bytesLimiter BytesLimiter, // Rate limiter for used bytes.
skipChunks bool,
minTime, maxTime int64,
loadAggregates []storepb.Aggr,
shardMatcher *storepb.ShardMatcher,
emptyPostingsCount prometheus.Counter,
calculateChunkHash bool,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(ctx, matchers)
ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter)
if err != nil {
return nil, nil, errors.Wrap(err, "expanded matching posting")
}
@@ -840,7 +846,7 @@
// Preload all series index data.
// TODO(bwplotka): Consider not keeping all series in memory all the time.
// TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method.
if err := indexr.PreloadSeries(ctx, ps); err != nil {
if err := indexr.PreloadSeries(ctx, ps, bytesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "preload series")
}

@@ -905,7 +911,7 @@
return newBucketSeriesSet(res), indexr.stats, nil
}

if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash); err != nil {
if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash, bytesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "load chunks")
}

@@ -1053,6 +1059,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.MaxTime = s.limitMaxTime(req.MaxTime)

var (
bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))
ctx = srv.Context()
stats = &queryStats{}
res []storepb.SeriesSet
@@ -1128,6 +1135,7 @@
blockMatchers,
chunksLimiter,
seriesLimiter,
bytesLimiter,
req.SkipChunks,
req.MinTime, req.MaxTime,
req.Aggregates,
@@ -1292,6 +1300,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))

for _, b := range s.blocks {
b := b
@@ -1350,6 +1359,7 @@
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
bytesLimiter,
true,
req.Start,
req.End,
@@ -1458,6 +1468,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))

for _, b := range s.blocks {
b := b
@@ -1519,6 +1530,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
bytesLimiter,
true,
req.Start,
req.End,
@@ -1913,7 +1925,7 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher) ([]storage.SeriesRef, error) {
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
var (
postingGroups []*postingGroup
allRequested = false
@@ -1962,7 +1974,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
keys = append(keys, allPostingsLabel)
}

fetchedPostings, err := r.fetchPostings(ctx, keys)
fetchedPostings, err := r.fetchPostings(ctx, keys, bytesLimiter)
if err != nil {
return nil, errors.Wrap(err, "get postings")
}
@@ -2102,7 +2114,7 @@ type postingPtr struct {
// fetchPostings fill postings requested by posting groups.
// It returns one postings for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label) ([]index.Postings, error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, error) {
timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration)
defer timer.ObserveDuration()

@@ -2112,6 +2124,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys)
for _, dataFromCache := range fromCache {
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
}
}

// Iterate over all groups and fetch posting from cache.
// If we have a miss, mark key to be fetched in `ptrs` slice.
@@ -2174,6 +2191,15 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End)
})

for _, part := range parts {
start := int64(part.Start)
length := int64(part.End) - start

if err := bytesLimiter.Reserve(uint64(length)); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings")
}
}

g, ctx := errgroup.WithContext(ctx)
for _, part := range parts {
i, j := part.ElemRng[0], part.ElemRng[1]
@@ -2320,7 +2346,7 @@ func (it *bigEndianPostings) length() int {
return len(it.list) / 4
}

func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef) error {
func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error {
timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
defer timer.ObserveDuration()

@@ -2329,26 +2355,36 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids)
for id, b := range fromCache {
r.loadedSeries[id] = b
if err := bytesLimiter.Reserve(uint64(len(b))); err != nil {
return errors.Wrap(err, "exceeded bytes limit while loading series from index cache")
}
}

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
return uint64(ids[i]), uint64(ids[i] + maxSeriesSize)
})

g, ctx := errgroup.WithContext(ctx)
for _, p := range parts {
s, e := p.Start, p.End
i, j := p.ElemRng[0], p.ElemRng[1]

g.Go(func() error {
return r.loadSeries(ctx, ids[i:j], false, s, e)
return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter)
})
}
return g.Wait()
}

func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64) error {
func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter) error {
begin := time.Now()

if bytesLimiter != nil {
if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
return errors.Wrap(err, "exceeded bytes limit while fetching series")
}
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
if err != nil {
return errors.Wrap(err, "read series range")
@@ -2378,7 +2414,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)

// Fetch plus to get the size of next one if exists.
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1))
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter)
}
c = c[n : n+int(l)]
r.mtx.Lock()
@@ -2598,7 +2634,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int)
}

// load loads all added chunks and saves resulting aggrs to res.
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool) error {
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
g, ctx := errgroup.WithContext(ctx)

for seq, pIdxs := range r.toLoad {
@@ -2609,12 +2645,18 @@
return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize
})

for _, p := range parts {
if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
}
}

for _, p := range parts {
seq := seq
p := p
indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]]
g.Go(func() error {
return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum)
return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter)
})
}
}
@@ -2623,7 +2665,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [

// loadChunks will read range [start, end] from the segment file with sequence number seq.
// This data range covers chunks starting at supplied offsets.
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool) error {
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
fetchBegin := time.Now()

// Get a reader for the required range.
@@ -2719,6 +2761,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a

// Read entire chunk into new buffer.
// TODO: readChunkRange call could be avoided for any chunk but last in this particular part.
if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil {
return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
}
nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
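Taken together, the `bucket.go` changes create one `bytesLimiter` per gRPC call (Series, LabelNames, LabelValues) and reserve each byte range before it is fetched — cached postings, postings parts, series ranges and chunk parts all go through `Reserve`. A compressed, hypothetical sketch of that reserve-then-fetch pattern; `rangePart` and `fetchRange` are stand-ins and do not exist under these names in the PR:

```go
package store

import (
	"context"

	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
)

// rangePart stands in for the partitioner's Part: a byte range [Start, End)
// within an index or chunk file.
type rangePart struct {
	Start, End uint64
}

// fetchWithBudget mirrors the pattern used in fetchPostings, PreloadSeries and
// bucketChunkReader.load: reserve the planned ranges first, download second, so
// an over-limit request fails before any bytes leave object storage.
func fetchWithBudget(
	ctx context.Context,
	factory BytesLimiterFactory,
	dropped prometheus.Counter,
	parts []rangePart,
	fetchRange func(ctx context.Context, start, end uint64) error,
) error {
	// One limiter per request, so the budget is shared across every block the
	// request touches rather than being reset per block.
	bytesLimiter := factory(dropped)

	for _, p := range parts {
		if err := bytesLimiter.Reserve(p.End - p.Start); err != nil {
			return errors.Wrap(err, "bytes limit exceeded while fetching postings")
		}
	}
	for _, p := range parts {
		if err := fetchRange(ctx, p.Start, p.End); err != nil {
			return err
		}
	}
	return nil
}
```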
15 changes: 8 additions & 7 deletions pkg/store/bucket_e2e_test.go
@@ -152,7 +152,7 @@ func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterF
}
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
@@ -187,6 +187,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
dir,
chunksLimiterFactory,
seriesLimiterFactory,
bytesLimiterFactory,
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
@@ -486,7 +487,7 @@ func TestBucketStore_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)

if ok := t.Run("no index cache", func(t *testing.T) {
s.cache.SwapWith(noopCache{})
@@ -539,7 +540,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
@@ -565,7 +566,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
@@ -644,7 +645,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
@@ -679,7 +680,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
@@ -779,7 +780,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
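The test changes above only thread a disabled factory (`NewBytesLimiterFactory(0)`) through the existing cases, so the new limit itself is not exercised here. A hypothetical follow-up test in the same style — the `newStoreSeriesServer` helper and the exact wording of the error are assumptions based on the surrounding test code and the messages added in `bucket.go`:

```go
func TestBucketStore_Series_BytesLimiter_e2e(t *testing.T) {
	objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		dir := t.TempDir()

		// A one-byte budget guarantees that the first Reserve call already fails.
		s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(1), emptyRelabelConfig, allowAllFilterConf)
		testutil.Ok(t, s.store.SyncBlocks(ctx))

		mint, maxt := s.store.TimeRange()
		req := &storepb.SeriesRequest{
			Matchers: []storepb.LabelMatcher{
				{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
			},
			MinTime: mint,
			MaxTime: maxt,
		}

		// Expect the Series call to be rejected by the bytes limiter.
		srv := newStoreSeriesServer(ctx)
		err := s.store.Series(req, srv)
		testutil.NotOk(t, err)
		testutil.Assert(t, strings.Contains(err.Error(), "bytes limit"), "unexpected error: %s", err)
	})
}
```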