Skip to content

Commit

Permalink
Store: fix block dedup
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Sep 4, 2023
1 parent e397769 commit 079539a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 39 deletions.
61 changes: 35 additions & 26 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tenancy"
Expand Down Expand Up @@ -1254,9 +1253,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill
}

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels)

func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
Expand Down Expand Up @@ -1377,27 +1374,42 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID)
}

labelsToRemove := make(map[string]struct{})
for _, replicaLabel := range req.WithoutReplicaLabels {
labelsToRemove[replicaLabel] = struct{}{}
var resp respSet
if s.labelNamesSet.HasAny(req.WithoutReplicaLabels) {
labelsToRemove := make(map[string]struct{})
for _, replicaLabel := range req.WithoutReplicaLabels {
labelsToRemove[replicaLabel] = struct{}{}
}
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
labelsToRemove,
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
)
}

// TODO: dont use test client here
part := newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
storetestutil.TestClient{ExtLset: []labels.Labels{blk.extLset}},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
labelsToRemove,
)

mtx.Lock()
respSets = append(respSets, part)
respSets = append(respSets, resp)
mtx.Unlock()

return nil
Expand Down Expand Up @@ -1514,10 +1526,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
}
}

if err != nil {
return err
}
return srv.Flush()
return err
}

func chunksSize(chks []storepb.AggrChunk) (size int) {
Expand Down
31 changes: 18 additions & 13 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ func newAsyncRespSet(
seriesCtx,
span,
frameTimeout,
st,
st.String(),
st.LabelSets(),
closeSeries,
cl,
shardMatcher,
Expand Down Expand Up @@ -639,12 +640,14 @@ type eagerRespSet struct {
ctx context.Context

closeSeries context.CancelFunc
st Client
frameTimeout time.Duration

shardMatcher *storepb.ShardMatcher
removeLabels map[string]struct{}
storeLabels map[string]struct{}

storeName string
storeLabels map[string]struct{}
storeLabelSets []labels.Labels

// Internal bookkeeping.
bufferedResponses []*storepb.SeriesResponse
Expand All @@ -656,7 +659,8 @@ func newEagerRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
st Client,
storeName string,
storeLabelSets []labels.Labels,
closeSeries context.CancelFunc,
cl storepb.Store_SeriesClient,
shardMatcher *storepb.ShardMatcher,
Expand All @@ -666,7 +670,6 @@ func newEagerRespSet(
) respSet {
ret := &eagerRespSet{
span: span,
st: st,
closeSeries: closeSeries,
cl: cl,
frameTimeout: frameTimeout,
Expand All @@ -675,9 +678,11 @@ func newEagerRespSet(
wg: &sync.WaitGroup{},
shardMatcher: shardMatcher,
removeLabels: removeLabels,
storeName: storeName,
storeLabelSets: storeLabelSets,
}
ret.storeLabels = make(map[string]struct{})
for _, ls := range st.LabelSets() {
for _, ls := range storeLabelSets {
for _, l := range ls {
ret.storeLabels[l.Name] = struct{}{}
}
Expand All @@ -686,7 +691,7 @@ func newEagerRespSet(
ret.wg.Add(1)

// Start a goroutine and immediately buffer everything.
go func(st Client, l *eagerRespSet) {
go func(l *eagerRespSet) {
seriesStats := &storepb.SeriesStatsCounter{}
bytesProcessed := 0

Expand Down Expand Up @@ -715,7 +720,7 @@ func newEagerRespSet(

select {
case <-l.ctx.Done():
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String())
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName)
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err))
l.span.SetTag("err", err.Error())
return false
Expand All @@ -731,9 +736,9 @@ func newEagerRespSet(
// Most likely the per-Recv timeout has been reached.
// There's a small race between canceling and the Recv()
// but this is most likely true.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st.String())
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName)
} else {
rerr = errors.Wrapf(err, "receive series from %s", st.String())
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.span.SetTag("err", rerr.Error())
Expand Down Expand Up @@ -773,7 +778,7 @@ func newEagerRespSet(
sortWithoutLabels(l.bufferedResponses, l.removeLabels)
}

}(st, ret)
}(ret)

return ret
}
Expand Down Expand Up @@ -873,11 +878,11 @@ func (l *eagerRespSet) Empty() bool {
}

func (l *eagerRespSet) StoreID() string {
return l.st.String()
return l.storeName
}

func (l *eagerRespSet) Labelset() string {
return labelpb.PromLabelSetsToString(l.st.LabelSets())
return labelpb.PromLabelSetsToString(l.storeLabelSets)
}

func (l *eagerRespSet) StoreLabels() map[string]struct{} {
Expand Down

0 comments on commit 079539a

Please sign in to comment.