diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 775210e3a1..6c38ee81a7 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -664,7 +664,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels // TODO(bwplotka): It buffers all chunks in memory and only then streams to client. // 1. Either count chunk sizes and error out too big query. // 2. Stream posting -> series -> chunk all together. -func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { +func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { matchers, err := translateMatchers(req.Matchers) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) @@ -731,6 +731,25 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.mtx.RUnlock() + defer func() { + s.metrics.seriesDataTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouched)) + s.metrics.seriesDataFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetchedSizeSum)) + s.metrics.seriesDataTouched.WithLabelValues("series").Observe(float64(stats.seriesTouched)) + s.metrics.seriesDataFetched.WithLabelValues("series").Observe(float64(stats.seriesFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("series").Observe(float64(stats.seriesTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("series").Observe(float64(stats.seriesFetchedSizeSum)) + s.metrics.seriesDataTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouched)) + s.metrics.seriesDataFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetched)) + s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum)) + s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum)) + s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) + + level.Debug(s.logger).Log("msg", "stats query processed", + "stats", fmt.Sprintf("%+v", stats), "err", err) + }() + // Concurrently get data from all blocks. { span, _ := tracing.StartSpan(srv.Context(), "bucket_store_preload_all") @@ -775,24 +794,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie stats.mergeDuration = time.Since(begin) s.metrics.seriesMergeDuration.Observe(stats.mergeDuration.Seconds()) } - - s.metrics.seriesDataTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouched)) - s.metrics.seriesDataFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetchedSizeSum)) - s.metrics.seriesDataTouched.WithLabelValues("series").Observe(float64(stats.seriesTouched)) - s.metrics.seriesDataFetched.WithLabelValues("series").Observe(float64(stats.seriesFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("series").Observe(float64(stats.seriesTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("series").Observe(float64(stats.seriesFetchedSizeSum)) - s.metrics.seriesDataTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouched)) - s.metrics.seriesDataFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetched)) - s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum)) - s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum)) - s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount)) - - level.Debug(s.logger).Log("msg", "series query processed", - "stats", fmt.Sprintf("%+v", stats)) - return nil } @@ -1239,6 +1240,13 @@ func (p *postingGroup) Postings() index.Postings { return index.EmptyPostings() } + for i, posting := range p.postings { + if posting == nil { + // This should not happen. Debug for https://github.com/improbable-eng/thanos/issues/874. + return index.ErrPostings(errors.Errorf("at least one of %d postings is nil for %s. It was never fetched.", i, p.keys[i])) + } + } + return p.aggregate(p.postings) } @@ -1327,6 +1335,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { continue } + r.stats.postingsToFetch++ ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j}) } } @@ -1342,13 +1351,13 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { }) var g run.Group - for _, p := range parts { + for _, part := range parts { ctx, cancel := context.WithCancel(r.ctx) - i, j := p.elemRng[0], p.elemRng[1] + i, j := part.elemRng[0], part.elemRng[1] - start := int64(p.start) + start := int64(part.start) // We assume index does not have any ptrs that has 0 length. - length := int64(p.end) - start + length := int64(part.end) - start // Fetch from object storage concurrently and update stats and posting list. g.Add(func() error { @@ -1697,6 +1706,7 @@ type queryStats struct { postingsTouched int postingsTouchedSizeSum int + postingsToFetch int postingsFetched int postingsFetchedSizeSum int postingsFetchCount int