Skip to content

Commit

Permalink
e2e: hard fail on no error
Browse files Browse the repository at this point in the history
We always expect an error here.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Oct 20, 2022
1 parent d50ec5a commit 29b449e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 25 deletions.
32 changes: 21 additions & 11 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.MaxTime = s.limitMaxTime(req.MaxTime)

var (
bytesLimiter BytesLimiter
ctx = srv.Context()
stats = &queryStats{}
res []storepb.SeriesSet
Expand All @@ -1036,9 +1037,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks"))
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))
)

if s.bytesLimiterFactory != nil {
bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))
}

if req.Hints != nil {
reqHints := &hintspb.SeriesRequestHints{}
if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
Expand Down Expand Up @@ -2155,12 +2159,14 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End)
})

for _, part := range parts {
start := int64(part.Start)
length := int64(part.End) - start
if bytesLimiter != nil {
for _, part := range parts {
start := int64(part.Start)
length := int64(part.End) - start

if err := bytesLimiter.Reserve(uint64(length)); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings")
if err := bytesLimiter.Reserve(uint64(length)); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings")
}
}
}

Expand Down Expand Up @@ -2343,8 +2349,10 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter) error {
begin := time.Now()

if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
return errors.Wrap(err, "exceeded bytes limit while fetching series")
if bytesLimiter != nil {
if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
return errors.Wrap(err, "exceeded bytes limit while fetching series")
}
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
Expand Down Expand Up @@ -2607,9 +2615,11 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize
})

for _, p := range parts {
if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
if bytesLimiter != nil {
for _, p := range parts {
if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
}
}
}

Expand Down
15 changes: 8 additions & 7 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterF
}
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand Down Expand Up @@ -198,6 +198,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
WithIndexCache(s.cache),
WithFilterConfig(filterConf),
WithRegistry(reg),
WithBytesLimiterFactory(bytesLimiterFactory),
)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, store.Close()) }()
Expand Down Expand Up @@ -486,7 +487,7 @@ func TestBucketStore_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)

if ok := t.Run("no index cache", func(t *testing.T) {
s.cache.SwapWith(noopCache{})
Expand Down Expand Up @@ -539,7 +540,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
Expand All @@ -565,7 +566,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
Expand Down Expand Up @@ -644,7 +645,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Expand Down Expand Up @@ -679,7 +680,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
Expand Down Expand Up @@ -779,7 +780,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
}

// Success with no refetches.
testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, nil))
testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil)))
testutil.Equals(t, map[storage.SeriesRef][]byte{
2: []byte("aaaaaaaaaa"),
13: []byte("bbbbbbbbbb"),
Expand All @@ -1020,7 +1020,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {

// Success with 2 refetches.
r.loadedSeries = map[storage.SeriesRef][]byte{}
testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, nil))
testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil)))
testutil.Equals(t, map[storage.SeriesRef][]byte{
2: []byte("aaaaaaaaaa"),
13: []byte("bbbbbbbbbb"),
Expand All @@ -1030,7 +1030,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {

// Success with refetch on first element.
r.loadedSeries = map[storage.SeriesRef][]byte{}
testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, nil))
testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil)))
testutil.Equals(t, map[storage.SeriesRef][]byte{
2: []byte("aaaaaaaaaa"),
}, r.loadedSeries)
Expand All @@ -1044,7 +1044,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))

// Fail, but no recursion at least.
testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, nil))
testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil)))
}

func TestBucketIndexReader_ExpandedPostings(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/store_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) {
}
return err
}
return nil
return fmt.Errorf("expected an error")
}))

testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error {
Expand All @@ -620,7 +620,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) {
}
return err
}
return nil
return fmt.Errorf("expected an error")
}))

testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error {
Expand All @@ -636,7 +636,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) {
}
return err
}
return nil
return fmt.Errorf("expected an error")
}))
})
}

0 comments on commit 29b449e

Please sign in to comment.