diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ed0cc25e1de..ea70223b7aa 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1027,6 +1027,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie req.MaxTime = s.limitMaxTime(req.MaxTime) var ( + bytesLimiter BytesLimiter ctx = srv.Context() stats = &queryStats{} res []storepb.SeriesSet @@ -1036,9 +1037,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie reqBlockMatchers []*labels.Matcher chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) ) + if s.bytesLimiterFactory != nil { + bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) + } + if req.Hints != nil { reqHints := &hintspb.SeriesRequestHints{} if err := types.UnmarshalAny(req.Hints, reqHints); err != nil { @@ -2155,12 +2159,14 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End) }) - for _, part := range parts { - start := int64(part.Start) - length := int64(part.End) - start + if bytesLimiter != nil { + for _, part := range parts { + start := int64(part.Start) + length := int64(part.End) - start - if err := bytesLimiter.Reserve(uint64(length)); err != nil { - return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings") + if err := bytesLimiter.Reserve(uint64(length)); err != nil { + return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings") + } } } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 7e4cbda660a..c34f08b1985 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -152,7 +152,7 @@ func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterF } } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -198,6 +198,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m WithIndexCache(s.cache), WithFilterConfig(filterConf), WithRegistry(reg), + WithBytesLimiterFactory(bytesLimiterFactory), ) testutil.Ok(t, err) defer func() { testutil.Ok(t, store.Close()) }() @@ -486,7 +487,7 @@ func TestBucketStore_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) if ok := t.Run("no index cache", func(t *testing.T) { s.cache.SwapWith(noopCache{}) @@ -539,7 +540,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{ MaxItemSize: 1e5, @@ -565,7 +566,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { // The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks. expectedChunks := uint64(2 * 2) - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{ + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{ MinTime: minTimeDuration, MaxTime: filterMaxTime, }) @@ -644,7 +645,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) testutil.Ok(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ @@ -679,7 +680,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) s.cache.SwapWith(noopCache{}) mint, maxt := s.store.TimeRange() @@ -779,7 +780,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) s.cache.SwapWith(noopCache{}) mint, maxt := s.store.TimeRange() diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a4693ea1866..3ab60b96b1c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1010,7 +1010,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { } // Success with no refetches. - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, nil)) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil))) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), @@ -1020,7 +1020,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { // Success with 2 refetches. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, nil)) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil))) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), @@ -1030,7 +1030,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { // Success with refetch on first element. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, nil)) + testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil))) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), }, r.loadedSeries) @@ -1044,7 +1044,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get()))) // Fail, but no recursion at least. - testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, nil)) + testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil))) } func TestBucketIndexReader_ExpandedPostings(t *testing.T) { diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 391d4e0d0ff..2f10e730127 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -604,7 +604,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) { } return err } - return nil + return fmt.Errorf("expected an error") })) testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { @@ -620,7 +620,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) { } return err } - return nil + return fmt.Errorf("expected an error") })) testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { @@ -636,7 +636,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) { } return err } - return nil + return fmt.Errorf("expected an error") })) }) }