diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 9bbbc6c5a0d..a92bcf39b61 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -13,6 +13,8 @@ import ( "time" "unicode/utf8" + "github.com/thanos-io/thanos/pkg/api/query/querypb" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -20,8 +22,6 @@ import ( "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc" - "github.com/thanos-io/thanos/pkg/api/query/querypb" - "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/info/infopb" diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index b2a2a9af0e9..b5e7e6b9e8f 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -58,6 +58,9 @@ type MultiTSDB struct { labels labels.Labels bucket objstore.Bucket + lmx sync.Mutex + labelNamesSet stringset.Set + mtx *sync.RWMutex tenants map[string]*tenant allowOutOfOrderUpload bool @@ -94,6 +97,7 @@ func NewMultiTSDB( bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, + labelNamesSet: stringset.AllStrings(), } } @@ -882,9 +886,28 @@ func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) { t.mtx.RLock() defer t.mtx.RUnlock() - for _, t := range t.tenants { - t.storeTSDB.UpdateLabelNamesSet(ctx) + newSet := stringset.New() + for _, tenant := range t.tenants { + db := tenant.store() + if db == nil { + continue + } + names, err := db.LabelNames(ctx, &storepb.LabelNamesRequest{}) + if err != nil { + level.Debug(t.logger).Log("msg", "failed to get label names", "err", err) + t.lmx.Lock() + t.labelNamesSet = stringset.AllStrings() + t.lmx.Unlock() + return + } + for _, name := range names.Names { + newSet.Insert(name) + } } + + t.lmx.Lock() + t.labelNamesSet = newSet + t.lmx.Unlock() } // extendLabels extends external labels of the initial label set. diff --git a/pkg/store/recover.go b/pkg/store/recover.go index c453c3bec3c..ffe7e401369 100644 --- a/pkg/store/recover.go +++ b/pkg/store/recover.go @@ -24,7 +24,7 @@ func NewRecoverableStoreServer(logger log.Logger, storeServer storepb.StoreServe } func (r *recoverableStoreServer) Series(request *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - defer r.recover(srv) + //defer r.recover(srv) return r.StoreServer.Series(request, srv) } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index c62551a9cac..5bbb469e98e 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -25,7 +25,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" ) const RemoteReadFrameLimit = 1048576 @@ -45,9 +44,6 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - lmx sync.RWMutex - labelNamesSet stringset.Set - extLset labels.Labels mtx sync.RWMutex } @@ -209,12 +205,6 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer extLsetToRemove[lbl] = struct{}{} } - if s.LabelNamesSet().HasAny(r.WithoutReplicaLabels) { - rs := &resortingServer{Store_SeriesServer: srv} - defer rs.Flush() - srv = rs - } - finalExtLset := rmLabels(s.extLset.Copy(), extLsetToRemove) // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for set.Next() { @@ -378,22 +368,3 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: values}, nil } - -func (s *TSDBStore) LabelNamesSet() stringset.Set { - s.lmx.RLock() - defer s.lmx.RUnlock() - return s.labelNamesSet -} - -func (s *TSDBStore) UpdateLabelNamesSet(ctx context.Context) { - s.lmx.Lock() - defer s.lmx.Unlock() - - labelNames, err := s.LabelNames(ctx, &storepb.LabelNamesRequest{}) - if err != nil { - s.labelNamesSet = stringset.AllStrings() - return - } - - s.labelNamesSet = stringset.NewFromStrings(labelNames.Names...) -}