diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 68397d7503e..87dd541dc35 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -59,7 +59,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tenancy" @@ -1254,9 +1253,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill } // Series implements the storepb.StoreServer interface. -func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) - +func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1377,27 +1374,42 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - labelsToRemove := make(map[string]struct{}) - for _, replicaLabel := range req.WithoutReplicaLabels { - labelsToRemove[replicaLabel] = struct{}{} + var resp respSet + if s.labelNamesSet.HasAny(req.WithoutReplicaLabels) { + labelsToRemove := make(map[string]struct{}) + for _, replicaLabel := range req.WithoutReplicaLabels { + labelsToRemove[replicaLabel] = struct{}{} + } + resp = newEagerRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + labelsToRemove, + ) + } else { + resp = newLazyRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + ) } - // TODO: dont use test client here - part := newEagerRespSet( - srv.Context(), - span, - 10*time.Minute, - storetestutil.TestClient{ExtLset: []labels.Labels{blk.extLset}}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - labelsToRemove, - ) - mtx.Lock() - respSets = append(respSets, part) + respSets = append(respSets, resp) mtx.Unlock() return nil @@ -1514,10 +1526,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store } } - if err != nil { - return err - } - return srv.Flush() + return err } func chunksSize(chks []storepb.AggrChunk) (size int) { diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index d5cc9406377..fb17062f39c 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -605,7 +605,8 @@ func newAsyncRespSet( seriesCtx, span, frameTimeout, - st, + st.String(), + st.LabelSets(), closeSeries, cl, shardMatcher, @@ -639,12 +640,14 @@ type eagerRespSet struct { ctx context.Context closeSeries context.CancelFunc - st Client frameTimeout time.Duration shardMatcher *storepb.ShardMatcher removeLabels map[string]struct{} - storeLabels map[string]struct{} + + storeName string + storeLabels map[string]struct{} + storeLabelSets []labels.Labels // Internal bookkeeping. bufferedResponses []*storepb.SeriesResponse @@ -656,7 +659,8 @@ func newEagerRespSet( ctx context.Context, span opentracing.Span, frameTimeout time.Duration, - st Client, + storeName string, + storeLabelSets []labels.Labels, closeSeries context.CancelFunc, cl storepb.Store_SeriesClient, shardMatcher *storepb.ShardMatcher, @@ -666,7 +670,6 @@ func newEagerRespSet( ) respSet { ret := &eagerRespSet{ span: span, - st: st, closeSeries: closeSeries, cl: cl, frameTimeout: frameTimeout, @@ -675,9 +678,11 @@ func newEagerRespSet( wg: &sync.WaitGroup{}, shardMatcher: shardMatcher, removeLabels: removeLabels, + storeName: storeName, + storeLabelSets: storeLabelSets, } ret.storeLabels = make(map[string]struct{}) - for _, ls := range st.LabelSets() { + for _, ls := range storeLabelSets { for _, l := range ls { ret.storeLabels[l.Name] = struct{}{} } @@ -686,7 +691,7 @@ func newEagerRespSet( ret.wg.Add(1) // Start a goroutine and immediately buffer everything. - go func(st Client, l *eagerRespSet) { + go func(l *eagerRespSet) { seriesStats := &storepb.SeriesStatsCounter{} bytesProcessed := 0 @@ -715,7 +720,7 @@ func newEagerRespSet( select { case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String()) + err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName) l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) l.span.SetTag("err", err.Error()) return false @@ -731,9 +736,9 @@ func newEagerRespSet( // Most likely the per-Recv timeout has been reached. // There's a small race between canceling and the Recv() // but this is most likely true. - rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st.String()) + rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName) } else { - rerr = errors.Wrapf(err, "receive series from %s", st.String()) + rerr = errors.Wrapf(err, "receive series from %s", storeName) } l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) l.span.SetTag("err", rerr.Error()) @@ -773,7 +778,7 @@ func newEagerRespSet( sortWithoutLabels(l.bufferedResponses, l.removeLabels) } - }(st, ret) + }(ret) return ret } @@ -873,11 +878,11 @@ func (l *eagerRespSet) Empty() bool { } func (l *eagerRespSet) StoreID() string { - return l.st.String() + return l.storeName } func (l *eagerRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.st.LabelSets()) + return labelpb.PromLabelSetsToString(l.storeLabelSets) } func (l *eagerRespSet) StoreLabels() map[string]struct{} {