Skip to content

Commit

Permalink
Fix multitsdb
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Jul 28, 2023
1 parent 66dd816 commit 4fb3558
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 34 deletions.
4 changes: 2 additions & 2 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
"time"
"unicode/utf8"

"github.com/thanos-io/thanos/pkg/api/query/querypb"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/query/querypb"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/info/infopb"
Expand Down
27 changes: 25 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type MultiTSDB struct {
labels labels.Labels
bucket objstore.Bucket

lmx sync.Mutex
labelNamesSet stringset.Set

mtx *sync.RWMutex
tenants map[string]*tenant
allowOutOfOrderUpload bool
Expand Down Expand Up @@ -94,6 +97,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
labelNamesSet: stringset.AllStrings(),
}
}

Expand Down Expand Up @@ -882,9 +886,28 @@ func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) {
t.mtx.RLock()
defer t.mtx.RUnlock()

for _, t := range t.tenants {
t.storeTSDB.UpdateLabelNamesSet(ctx)
newSet := stringset.New()
for _, tenant := range t.tenants {
db := tenant.store()
if db == nil {
continue
}
names, err := db.LabelNames(ctx, &storepb.LabelNamesRequest{})
if err != nil {
level.Debug(t.logger).Log("msg", "failed to get label names", "err", err)
t.lmx.Lock()
t.labelNamesSet = stringset.AllStrings()
t.lmx.Unlock()
return
}
for _, name := range names.Names {
newSet.Insert(name)
}
}

t.lmx.Lock()
t.labelNamesSet = newSet
t.lmx.Unlock()
}

// extendLabels extends external labels of the initial label set.
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewRecoverableStoreServer(logger log.Logger, storeServer storepb.StoreServe
}

func (r *recoverableStoreServer) Series(request *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
defer r.recover(srv)
//defer r.recover(srv)
return r.StoreServer.Series(request, srv)
}

Expand Down
29 changes: 0 additions & 29 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/stringset"
)

const RemoteReadFrameLimit = 1048576
Expand All @@ -45,9 +44,6 @@ type TSDBStore struct {
buffers sync.Pool
maxBytesPerFrame int

lmx sync.RWMutex
labelNamesSet stringset.Set

extLset labels.Labels
mtx sync.RWMutex
}
Expand Down Expand Up @@ -209,12 +205,6 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
extLsetToRemove[lbl] = struct{}{}
}

if s.LabelNamesSet().HasAny(r.WithoutReplicaLabels) {
rs := &resortingServer{Store_SeriesServer: srv}
defer rs.Flush()
srv = rs
}

finalExtLset := rmLabels(s.extLset.Copy(), extLsetToRemove)
// Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
for set.Next() {
Expand Down Expand Up @@ -378,22 +368,3 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque

return &storepb.LabelValuesResponse{Values: values}, nil
}

func (s *TSDBStore) LabelNamesSet() stringset.Set {
s.lmx.RLock()
defer s.lmx.RUnlock()
return s.labelNamesSet
}

func (s *TSDBStore) UpdateLabelNamesSet(ctx context.Context) {
s.lmx.Lock()
defer s.lmx.Unlock()

labelNames, err := s.LabelNames(ctx, &storepb.LabelNamesRequest{})
if err != nil {
s.labelNamesSet = stringset.AllStrings()
return
}

s.labelNamesSet = stringset.NewFromStrings(labelNames.Names...)
}

0 comments on commit 4fb3558

Please sign in to comment.