From 931f4e73b532bd9d6f59e31359154c98b32cdbc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 27 Oct 2022 14:51:09 +0300 Subject: [PATCH] store: add downloaded bytes limit (#5801) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * store: add downloaded bytes limit Signed-off-by: Giedrius Statkevičius * store: add bytesLimiter to LabelNames, LabelValues Signed-off-by: Giedrius Statkevičius * test: add e2e test for new limiter Signed-off-by: Giedrius Statkevičius * *: update CHANGELOG/etc Signed-off-by: Giedrius Statkevičius * e2e: hard fail on no error We always expect an error here. Signed-off-by: Giedrius Statkevičius * CHANGELOG: fix & improve clarity Signed-off-by: Giedrius Statkevičius Signed-off-by: Giedrius Statkevičius --- CHANGELOG.md | 1 + cmd/thanos/store.go | 6 + docs/components/store.md | 6 + internal/cortex/storegateway/bucket_stores.go | 1 + pkg/store/bucket.go | 73 +++++++-- pkg/store/bucket_e2e_test.go | 15 +- pkg/store/bucket_test.go | 20 ++- pkg/store/limiter.go | 21 +++ test/e2e/store_gateway_test.go | 139 ++++++++++++++++++ 9 files changed, 255 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd2f15ae02..dc02b47f18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added - [#5814](https://github.com/thanos-io/thanos/pull/5814) - Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits. +- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits. ### Changed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 546ee78d0b..b31caa0e42 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -58,6 +58,7 @@ type storeConfig struct { chunkPoolSize units.Base2Bytes maxSampleCount uint64 maxTouchedSeriesCount uint64 + maxDownloadedBytes units.Base2Bytes maxConcurrency int component component.StoreAPI debugLogging bool @@ -109,6 +110,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { "Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit."). Default("0").Uint64Var(&sc.maxTouchedSeriesCount) + cmd.Flag("store.grpc.downloaded-bytes-limit", + "Maximum amount of downloaded (either fetched or touched) bytes in a single Series/LabelNames/LabelValues call. The Series call fails if this limit is exceeded. 0 means no limit."). + Default("0").BytesVar(&sc.maxDownloadedBytes) + cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").IntVar(&sc.maxConcurrency) sc.component = component.Store @@ -345,6 +350,7 @@ func runStore( conf.dataDir, store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount), + store.NewBytesLimiterFactory(conf.maxDownloadedBytes), store.NewGapBasedPartitioner(store.PartitionerMaxGapSize), conf.blockSyncConcurrency, conf.advertiseCompatibilityLabel, diff --git a/docs/components/store.md b/docs/components/store.md index 1460d142c3..fdfec870ab 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -162,6 +162,12 @@ Flags: If true, Store Gateway will lazy memory map index-header only once the block is required by a query. + --store.grpc.downloaded-bytes-limit=0 + Maximum amount of downloaded (either + fetched or touched) bytes in a single + Series/LabelNames/LabelValues call. The Series + call fails if this limit is exceeded. 0 means + no limit. --store.grpc.series-max-concurrency=20 Maximum number of concurrent Series calls. --store.grpc.series-sample-limit=0 diff --git a/internal/cortex/storegateway/bucket_stores.go b/internal/cortex/storegateway/bucket_stores.go index cfda02a2db..17e2576dff 100644 --- a/internal/cortex/storegateway/bucket_stores.go +++ b/internal/cortex/storegateway/bucket_stores.go @@ -493,6 +493,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro u.syncDirForUser(userID), newChunksLimiterFactory(u.limits, userID), newSeriesLimiterFactory(u.limits, userID), + store.NewBytesLimiterFactory(0), u.partitioner, u.cfg.BucketStore.BlockSyncConcurrency, false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 01dc32e498..a7a5d3abd9 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -320,7 +320,10 @@ type BucketStore struct { // seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call, // or LabelName and LabelValues calls when used with matchers. seriesLimiterFactory SeriesLimiterFactory - partitioner Partitioner + + // bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call. + bytesLimiterFactory BytesLimiterFactory + partitioner Partitioner filterConfig *FilterConfig advLabelSets []labelpb.ZLabelSet @@ -420,6 +423,7 @@ func NewBucketStore( dir string, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, + bytesLimiterFactory BytesLimiterFactory, partitioner Partitioner, blockSyncConcurrency int, enableCompatibilityLabel bool, @@ -446,6 +450,7 @@ func NewBucketStore( queryGate: gate.NewNoop(), chunksLimiterFactory: chunksLimiterFactory, seriesLimiterFactory: seriesLimiterFactory, + bytesLimiterFactory: bytesLimiterFactory, partitioner: partitioner, enableCompatibilityLabel: enableCompatibilityLabel, postingOffsetsInMemSampling: postingOffsetsInMemSampling, @@ -815,6 +820,7 @@ func blockSeries( matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, + bytesLimiter BytesLimiter, // Rate limiter for used bytes. skipChunks bool, minTime, maxTime int64, loadAggregates []storepb.Aggr, @@ -822,7 +828,7 @@ func blockSeries( emptyPostingsCount prometheus.Counter, calculateChunkHash bool, ) (storepb.SeriesSet, *queryStats, error) { - ps, err := indexr.ExpandedPostings(ctx, matchers) + ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter) if err != nil { return nil, nil, errors.Wrap(err, "expanded matching posting") } @@ -840,7 +846,7 @@ func blockSeries( // Preload all series index data. // TODO(bwplotka): Consider not keeping all series in memory all the time. // TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method. - if err := indexr.PreloadSeries(ctx, ps); err != nil { + if err := indexr.PreloadSeries(ctx, ps, bytesLimiter); err != nil { return nil, nil, errors.Wrap(err, "preload series") } @@ -905,7 +911,7 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats, nil } - if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash); err != nil { + if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash, bytesLimiter); err != nil { return nil, nil, errors.Wrap(err, "load chunks") } @@ -1053,6 +1059,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie req.MaxTime = s.limitMaxTime(req.MaxTime) var ( + bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) ctx = srv.Context() stats = &queryStats{} res []storepb.SeriesSet @@ -1128,6 +1135,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie blockMatchers, chunksLimiter, seriesLimiter, + bytesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, @@ -1292,6 +1300,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq var mtx sync.Mutex var sets [][]string var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) + var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) for _, b := range s.blocks { b := b @@ -1350,6 +1359,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq reqSeriesMatchersNoExtLabels, nil, seriesLimiter, + bytesLimiter, true, req.Start, req.End, @@ -1458,6 +1468,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR var mtx sync.Mutex var sets [][]string var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) + var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) for _, b := range s.blocks { b := b @@ -1519,6 +1530,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR reqSeriesMatchersNoExtLabels, nil, seriesLimiter, + bytesLimiter, true, req.Start, req.End, @@ -1913,7 +1925,7 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader { // Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. -func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher) ([]storage.SeriesRef, error) { +func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) { var ( postingGroups []*postingGroup allRequested = false @@ -1962,7 +1974,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M keys = append(keys, allPostingsLabel) } - fetchedPostings, err := r.fetchPostings(ctx, keys) + fetchedPostings, err := r.fetchPostings(ctx, keys, bytesLimiter) if err != nil { return nil, errors.Wrap(err, "get postings") } @@ -2102,7 +2114,7 @@ type postingPtr struct { // fetchPostings fill postings requested by posting groups. // It returns one postings for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. -func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label) ([]index.Postings, error) { +func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, error) { timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration) defer timer.ObserveDuration() @@ -2112,6 +2124,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab // Fetch postings from the cache with a single call. fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys) + for _, dataFromCache := range fromCache { + if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { + return nil, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache") + } + } // Iterate over all groups and fetch posting from cache. // If we have a miss, mark key to be fetched in `ptrs` slice. @@ -2174,6 +2191,15 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End) }) + for _, part := range parts { + start := int64(part.Start) + length := int64(part.End) - start + + if err := bytesLimiter.Reserve(uint64(length)); err != nil { + return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings") + } + } + g, ctx := errgroup.WithContext(ctx) for _, part := range parts { i, j := part.ElemRng[0], part.ElemRng[1] @@ -2320,7 +2346,7 @@ func (it *bigEndianPostings) length() int { return len(it.list) / 4 } -func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef) error { +func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error { timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration) defer timer.ObserveDuration() @@ -2329,26 +2355,36 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids) for id, b := range fromCache { r.loadedSeries[id] = b + if err := bytesLimiter.Reserve(uint64(len(b))); err != nil { + return errors.Wrap(err, "exceeded bytes limit while loading series from index cache") + } } parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) { return uint64(ids[i]), uint64(ids[i] + maxSeriesSize) }) + g, ctx := errgroup.WithContext(ctx) for _, p := range parts { s, e := p.Start, p.End i, j := p.ElemRng[0], p.ElemRng[1] g.Go(func() error { - return r.loadSeries(ctx, ids[i:j], false, s, e) + return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter) }) } return g.Wait() } -func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64) error { +func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter) error { begin := time.Now() + if bytesLimiter != nil { + if err := bytesLimiter.Reserve(uint64(end - start)); err != nil { + return errors.Wrap(err, "exceeded bytes limit while fetching series") + } + } + b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start)) if err != nil { return errors.Wrap(err, "read series range") @@ -2378,7 +2414,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize) // Fetch plus to get the size of next one if exists. - return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1)) + return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter) } c = c[n : n+int(l)] r.mtx.Lock() @@ -2598,7 +2634,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) } // load loads all added chunks and saves resulting aggrs to res. -func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool) error { +func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { g, ctx := errgroup.WithContext(ctx) for seq, pIdxs := range r.toLoad { @@ -2609,12 +2645,18 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize }) + for _, p := range parts { + if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil { + return errors.Wrap(err, "bytes limit exceeded while fetching chunks") + } + } + for _, p := range parts { seq := seq p := p indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]] g.Go(func() error { - return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum) + return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter) }) } } @@ -2623,7 +2665,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ // loadChunks will read range [start, end] from the segment file with sequence number seq. // This data range covers chunks starting at supplied offsets. -func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool) error { +func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { fetchBegin := time.Now() // Get a reader for the required range. @@ -2719,6 +2761,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a // Read entire chunk into new buffer. // TODO: readChunkRange call could be avoided for any chunk but last in this particular part. + if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil { + return errors.Wrap(err, "bytes limit exceeded while fetching chunks") + } nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}) if err != nil { return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 7e4cbda660..f2aeb9fdd6 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -152,7 +152,7 @@ func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterF } } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -187,6 +187,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m dir, chunksLimiterFactory, seriesLimiterFactory, + bytesLimiterFactory, NewGapBasedPartitioner(PartitionerMaxGapSize), 20, true, @@ -486,7 +487,7 @@ func TestBucketStore_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) if ok := t.Run("no index cache", func(t *testing.T) { s.cache.SwapWith(noopCache{}) @@ -539,7 +540,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{ MaxItemSize: 1e5, @@ -565,7 +566,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { // The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks. expectedChunks := uint64(2 * 2) - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{ + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{ MinTime: minTimeDuration, MaxTime: filterMaxTime, }) @@ -644,7 +645,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) testutil.Ok(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ @@ -679,7 +680,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) s.cache.SwapWith(noopCache{}) mint, maxt := s.store.TimeRange() @@ -779,7 +780,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) s.cache.SwapWith(noopCache{}) mint, maxt := s.store.TimeRange() diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index f134195b15..a186b32376 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -648,6 +648,7 @@ func TestBucketStore_Info(t *testing.T) { dir, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), NewGapBasedPartitioner(PartitionerMaxGapSize), 20, true, @@ -889,6 +890,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul dir, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), NewGapBasedPartitioner(PartitionerMaxGapSize), 20, true, @@ -1012,7 +1014,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { } // Success with no refetches. - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100)) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil))) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), @@ -1022,7 +1024,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { // Success with 2 refetches. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15)) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil))) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), @@ -1032,7 +1034,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { // Success with refetch on first element. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5)) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil))) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), }, r.loadedSeries) @@ -1046,7 +1048,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get()))) // Fail, but no recursion at least. - testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15)) + testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil))) } func TestBucketIndexReader_ExpandedPostings(t *testing.T) { @@ -1225,7 +1227,7 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), c.matchers) + p, err := indexr.ExpandedPostings(context.Background(), c.matchers, NewBytesLimiterFactory(0)(nil)) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p)) } @@ -1333,6 +1335,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer tmpDir, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), NewGapBasedPartitioner(PartitionerMaxGapSize), 1, false, @@ -1555,6 +1558,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { queryGate: gate.NewNoop(), chunksLimiterFactory: NewChunksLimiterFactory(0), seriesLimiterFactory: NewSeriesLimiterFactory(0), + bytesLimiterFactory: NewBytesLimiterFactory(0), } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) { @@ -1702,6 +1706,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { tmpDir, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), NewGapBasedPartitioner(PartitionerMaxGapSize), 10, false, @@ -1792,6 +1797,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { tmpDir, NewChunksLimiterFactory(100000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), NewGapBasedPartitioner(PartitionerMaxGapSize), 10, false, @@ -1973,6 +1979,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb tmpDir, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), NewGapBasedPartitioner(PartitionerMaxGapSize), 10, false, @@ -2188,6 +2195,7 @@ func TestSeries_ChuncksHaveHashRepresentation(t *testing.T) { tmpDir, NewChunksLimiterFactory(100000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(0), NewGapBasedPartitioner(PartitionerMaxGapSize), 10, false, @@ -2439,7 +2447,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet indexReader := blk.indexReader() chunkReader := blk.chunkReader() - seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter, false) + seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, NewBytesLimiterFactory(0)(nil), req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter, false) testutil.Ok(b, err) // Ensure at least 1 series has been returned (as expected). diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 266dbbf3b2..6229b1a383 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -6,6 +6,7 @@ package store import ( "sync" + "github.com/alecthomas/units" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -25,6 +26,13 @@ type SeriesLimiter interface { Reserve(num uint64) error } +type BytesLimiter interface { + // Reserve bytes out of the total amount of bytes enforced by the limiter. + // Returns an error if the limit has been exceeded. This function must be + // goroutine safe. + Reserve(num uint64) error +} + // ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for // projects depending on Thanos (eg. Cortex) which have dynamic limits. type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter @@ -32,6 +40,9 @@ type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter // SeriesLimiterFactory is used to create a new SeriesLimiter. type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter +// BytesLimiterFactory is used to create a new BytesLimiter. +type BytesLimiterFactory func(failedCounter prometheus.Counter) BytesLimiter + // Limiter is a simple mechanism for checking if something has passed a certain threshold. type Limiter struct { limit uint64 @@ -49,6 +60,9 @@ func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter { // Reserve implements ChunksLimiter. func (l *Limiter) Reserve(num uint64) error { + if l == nil { + return nil + } if l.limit == 0 { return nil } @@ -74,3 +88,10 @@ func NewSeriesLimiterFactory(limit uint64) SeriesLimiterFactory { return NewLimiter(limit, failedCounter) } } + +// NewSeriesLimiterFactory makes a new NewSeriesLimiterFactory with a static limit. +func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory { + return func(failedCounter prometheus.Counter) BytesLimiter { + return NewLimiter(uint64(limit), failedCounter) + } +} diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 7d292f7ceb..2f10e73012 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -10,6 +10,7 @@ import ( "os" "path" "path/filepath" + "strings" "testing" "time" @@ -29,6 +30,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" @@ -501,3 +503,140 @@ metafile_content_ttl: 0s` testutil.Equals(t, 200, resp.StatusCode) }) } + +func TestStoreGatewayBytesLimit(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("store-limit") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + const bucket = "store-gateway-test" + m := e2ethanos.NewMinio(e, "thanos-minio", bucket) + testutil.Ok(t, e2e.StartAndWaitReady(m)) + + store1 := e2ethanos.NewStoreGW( + e, + "1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), m.InternalDir()), + }, + "", + []string{"--store.grpc.downloaded-bytes-limit=1B"}, + ) + + store2 := e2ethanos.NewStoreGW( + e, + "2", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), m.InternalDir()), + }, + "", + []string{"--store.grpc.downloaded-bytes-limit=100B"}, + ) + store3 := e2ethanos.NewStoreGW( + e, + "3", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), m.InternalDir()), + }, + "", + []string{"--store.grpc.downloaded-bytes-limit=196627B"}, + ) + + testutil.Ok(t, e2e.StartAndWaitReady(store1, store2, store3)) + + q1 := e2ethanos.NewQuerierBuilder(e, "1", store1.InternalEndpoint("grpc")).Init() + q2 := e2ethanos.NewQuerierBuilder(e, "2", store2.InternalEndpoint("grpc")).Init() + q3 := e2ethanos.NewQuerierBuilder(e, "3", store3.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q1, q2, q3)) + + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm)) + + series := []labels.Labels{labels.FromStrings("a", "1", "b", "2")} + extLset := labels.FromStrings("ext1", "value1", "replica", "1") + extLset2 := labels.FromStrings("ext1", "value1", "replica", "2") + extLset3 := labels.FromStrings("ext1", "value2", "replica", "3") + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + t.Cleanup(cancel) + + now := time.Now() + id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc) + testutil.Ok(t, err) + id2, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset2, 0, metadata.NoneFunc) + testutil.Ok(t, err) + id3, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset3, 0, metadata.NoneFunc) + testutil.Ok(t, err) + id4, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0, metadata.NoneFunc) + testutil.Ok(t, err) + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, + e2ethanos.NewS3Config(bucket, m.Endpoint("https"), m.Dir()), "test-feed") + testutil.Ok(t, err) + + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id2.String()), id2.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id3.String()), id3.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id4.String()), id4.String())) + + // Wait for store to sync blocks. + testutil.Ok(t, store1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) + testutil.Ok(t, store2.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) + testutil.Ok(t, store3.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) + + t.Run("Series() limits", func(t *testing.T) { + + testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { + _, err := simpleInstantQuery(t, + ctx, + q1.Endpoint("http"), + func() string { return testQuery }, + time.Now, + promclient.QueryOptions{Deduplicate: true}, 0) + if err != nil { + if strings.Contains(err.Error(), "expanded matching posting: get postings: bytes limit exceeded while fetching postings: limit 1 violated") { + return nil + } + return err + } + return fmt.Errorf("expected an error") + })) + + testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { + _, err := simpleInstantQuery(t, + ctx, + q2.Endpoint("http"), + func() string { return testQuery }, + time.Now, + promclient.QueryOptions{Deduplicate: true}, 0) + if err != nil { + if strings.Contains(err.Error(), "preload series: exceeded bytes limit while fetching series: limit 100 violated") { + return nil + } + return err + } + return fmt.Errorf("expected an error") + })) + + testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { + _, err := simpleInstantQuery(t, + ctx, + q3.Endpoint("http"), + func() string { return testQuery }, + time.Now, + promclient.QueryOptions{Deduplicate: true}, 0) + if err != nil { + if strings.Contains(err.Error(), "load chunks: bytes limit exceeded while fetching chunks: limit 196627 violated") { + return nil + } + return err + } + return fmt.Errorf("expected an error") + })) + }) +}