diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 8d37f42c231..5b699d264dc 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -366,21 +366,6 @@ func runReceive( grpcserver.WithTLSConfig(tlsCfg), ) - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - dbs.UpdateLabelNames(ctx) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) - g.Add( func() error { level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index ddbb468de64..63f32a6beaa 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -642,21 +642,6 @@ func runRule( ) storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits) options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger))) - - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - tsdbStore.UpdateLabelNames(ctx) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) } options = append(options, grpcserver.WithServer( diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 968a09ee9cb..27cf759b2ab 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -48,7 +48,6 @@ import ( "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" ) @@ -113,9 +112,8 @@ func runSidecar( mint: conf.limitMinTime.PrometheusTimestamp(), maxt: math.MaxInt64, - limitMinTime: conf.limitMinTime, - client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), - labelNamesSet: stringset.AllStrings(), + limitMinTime: conf.limitMinTime, + client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), } confContentYaml, err := conf.objStore.Content() @@ -239,19 +237,6 @@ func runSidecar( }, func(error) { cancel() }) - - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - m.UpdateLabelNames(context.Background()) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) } { ctx, cancel := context.WithCancel(context.Background()) @@ -264,7 +249,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -434,8 +419,6 @@ type promMetadata struct { limitMinTime thanosmodel.TimeOrDurationValue client *promclient.Client - - labelNamesSet stringset.Set } func (s *promMetadata) UpdateLabels(ctx context.Context) error { @@ -463,30 +446,6 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) { s.maxt = maxt } -func (s *promMetadata) UpdateLabelNames(ctx context.Context) { - mint, _ := s.Timestamps() - labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli()) - if err != nil { - s.mtx.Lock() - defer s.mtx.Unlock() - - s.labelNamesSet = stringset.AllStrings() - return - } - - filter := stringset.NewFromStrings(labelNames...) - s.mtx.Lock() - s.labelNamesSet = filter - s.mtx.Unlock() -} - -func (s *promMetadata) LabelNamesSet() stringset.Set { - s.mtx.Lock() - defer s.mtx.Unlock() - - return s.labelNamesSet -} - func (s *promMetadata) Labels() labels.Labels { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 9ddfbd89b76..4dcb9a7bcb7 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -496,23 +496,6 @@ func runStore( }) } - { - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - bs.UpdateLabelNames() - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) - - } // Add bucket UI for loaded blocks. { ins := extpromhttp.NewInstrumentationMiddleware(reg, nil) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 4dadb97343d..509e9f35351 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -867,19 +867,6 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab return initialLset, nil } -func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) { - t.mtx.RLock() - defer t.mtx.RUnlock() - - for _, tenant := range t.tenants { - db := tenant.storeTSDB - if db == nil { - continue - } - db.UpdateLabelNames(ctx) - } -} - // extendLabels extends external labels of the initial label set. // If an external label shares same name with a label in the initial label set, // use the label in the initial label set and inform user about it. diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index d0a1bcfaf43..52298f40d1a 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -28,7 +28,6 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -819,7 +818,6 @@ func TestPrometheusStore_Acceptance(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return extLset }, func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) }, - func() stringset.Set { return stringset.AllStrings() }, func() string { return version }) testutil.Ok(tt, err) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e010f756002..8b12e13e4f0 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -59,7 +59,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" @@ -366,9 +365,6 @@ type BucketStore struct { enableChunkHashCalculation bool - bmtx sync.Mutex - labelNamesSet stringset.Set - blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator } @@ -515,7 +511,6 @@ func NewBucketStore( enableSeriesResponseHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, seriesBatchSize: SeriesBatchSize, - labelNamesSet: stringset.AllStrings(), } for _, option := range options { @@ -1696,38 +1691,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq }, nil } -func (s *BucketStore) UpdateLabelNames() { - s.mtx.RLock() - defer s.mtx.RUnlock() - - newSet := stringset.New() - for _, b := range s.blocks { - labelNames, err := b.indexHeaderReader.LabelNames() - if err != nil { - level.Warn(s.logger).Log("msg", "error getting label names", "block", b.meta.ULID, "err", err.Error()) - s.updateLabelNamesSet(stringset.AllStrings()) - return - } - for _, l := range labelNames { - newSet.Insert(l) - } - } - s.updateLabelNamesSet(newSet) -} - -func (s *BucketStore) updateLabelNamesSet(newSet stringset.Set) { - s.bmtx.Lock() - s.labelNamesSet = newSet - s.bmtx.Unlock() -} - -func (b *BucketStore) LabelNamesSet() stringset.Set { - b.bmtx.Lock() - defer b.bmtx.Unlock() - - return b.labelNamesSet -} - func (b *bucketBlock) FilterExtLabelsMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, bool) { // We filter external labels from matchers so we won't try to match series on them. var result []*labels.Matcher diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 7262dee1554..ebd1ffa7095 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -779,30 +779,6 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { }) } -func TestBucketStore_LabelNamesSet_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { - dir := t.TempDir() - - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) - s.cache.SwapWith(noopCache{}) - - mint, maxt := s.store.TimeRange() - testutil.Equals(t, s.minTime, mint) - testutil.Equals(t, s.maxTime, maxt) - - s.store.UpdateLabelNames() - for _, b := range s.store.blocks { - waitTimeout(t, &b.pendingReaders, 5*time.Second) - } - - filter := s.store.LabelNamesSet() - for _, n := range []string{"a", "b", "c"} { - testutil.Assert(t, filter.Has(n), "expected filter to have %s", n) - } - testutil.Equals(t, 3, filter.Count()) - }) -} - func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) { cases := map[string]struct { maxSeriesLimit uint64 diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index f28798d8e15..bd73e5d08d0 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -60,7 +60,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -1634,7 +1633,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { chunksLimiterFactory: NewChunksLimiterFactory(0), seriesLimiterFactory: NewSeriesLimiterFactory(0), bytesLimiterFactory: NewBytesLimiterFactory(0), - labelNamesSet: stringset.AllStrings(), } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) { @@ -3419,9 +3417,6 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) - // make sure to have updated inner label names - bucketStore.UpdateLabelNames() - srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, bucketStore.Series(&storepb.SeriesRequest{ WithoutReplicaLabels: []string{"replica"}, diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 308a0741cea..fd6d4c01952 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -42,7 +42,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -54,7 +53,6 @@ type PrometheusStore struct { buffers sync.Pool component component.StoreAPI externalLabelsFn func() labels.Labels - labelNamesSet func() stringset.Set promVersion func() string timestamps func() (mint int64, maxt int64) @@ -81,7 +79,6 @@ func NewPrometheusStore( component component.StoreAPI, externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), - labelNamesSet func() stringset.Set, promVersion func() string, ) (*PrometheusStore, error) { if logger == nil { @@ -95,7 +92,6 @@ func NewPrometheusStore( externalLabelsFn: externalLabelsFn, promVersion: promVersion, timestamps: timestamps, - labelNamesSet: labelNamesSet, remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 82965672c72..d0597b6e9cb 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -26,7 +26,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -72,7 +71,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }, - func() stringset.Set { return stringset.AllStrings() }, nil, ) // MaxTime does not matter. testutil.Ok(t, err) @@ -234,7 +232,6 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, - func() stringset.Set { return stringset.AllStrings() }, nil, ) testutil.Ok(t, err) @@ -417,7 +414,6 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -481,7 +477,6 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -512,7 +507,6 @@ func TestPrometheusStore_Info(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 123, 456 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) @@ -592,7 +586,6 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 37badaf1dc8..b5182f30081 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -13,7 +13,6 @@ import ( "sync" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -26,7 +25,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" ) const RemoteReadFrameLimit = 1048576 @@ -46,9 +44,6 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - lmx sync.RWMutex - labelNamesSet stringset.Set - extLset labels.Labels mtx sync.RWMutex } @@ -77,7 +72,6 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI component: component, extLset: extLset, maxBytesPerFrame: RemoteReadFrameLimit, - labelNamesSet: stringset.AllStrings(), buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b @@ -376,38 +370,3 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: values}, nil } - -func (s *TSDBStore) UpdateLabelNames(ctx context.Context) { - newSet := stringset.New() - q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64) - if err != nil { - level.Warn(s.logger).Log("msg", "error creating tsdb querier", "err", err.Error()) - s.setLabelNamesSet(stringset.AllStrings()) - return - } - defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") - - res, _, err := q.LabelNames() - if err != nil { - level.Warn(s.logger).Log("msg", "error getting label names", "err", err.Error()) - s.setLabelNamesSet(stringset.AllStrings()) - return - } - for _, l := range res { - newSet.Insert(l) - } - s.setLabelNamesSet(newSet) -} - -func (s *TSDBStore) setLabelNamesSet(newSet stringset.Set) { - s.lmx.Lock() - s.labelNamesSet = newSet - s.lmx.Unlock() -} - -func (b *TSDBStore) LabelNamesSet() stringset.Set { - b.lmx.RLock() - defer b.lmx.RUnlock() - - return b.labelNamesSet -} diff --git a/pkg/stringset/set.go b/pkg/stringset/set.go deleted file mode 100644 index 080071570fe..00000000000 --- a/pkg/stringset/set.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package stringset - -import ( - cuckoo "github.com/seiflotfy/cuckoofilter" -) - -type Set interface { - Has(string) bool - HasAny([]string) bool - // Count returns the number of elements in the set. - // A value of -1 indicates infinite size and can be returned by a - // set representing all possible string values. - Count() int -} - -type fixedSet struct { - cuckoo *cuckoo.Filter -} - -func (f fixedSet) HasAny(strings []string) bool { - for _, s := range strings { - if f.Has(s) { - return true - } - } - return false -} - -func NewFromStrings(items ...string) Set { - f := cuckoo.NewFilter(uint(len(items))) - for _, label := range items { - f.InsertUnique([]byte(label)) - } - - return &fixedSet{cuckoo: f} -} - -func (f fixedSet) Has(s string) bool { - return f.cuckoo.Lookup([]byte(s)) -} - -func (f fixedSet) Count() int { - return int(f.cuckoo.Count()) -} - -type mutableSet struct { - cuckoo *cuckoo.ScalableCuckooFilter -} - -type MutableSet interface { - Set - Insert(string) -} - -func New() MutableSet { - return &mutableSet{ - cuckoo: cuckoo.NewScalableCuckooFilter(), - } -} - -func (e mutableSet) Insert(s string) { - e.cuckoo.InsertUnique([]byte(s)) -} - -func (e mutableSet) Has(s string) bool { - return e.cuckoo.Lookup([]byte(s)) -} - -func (e mutableSet) HasAny(strings []string) bool { - for _, s := range strings { - if e.Has(s) { - return true - } - } - return false -} - -func (e mutableSet) Count() int { - return int(e.cuckoo.Count()) -} - -type allStringsSet struct{} - -func (e allStringsSet) HasAny(_ []string) bool { - return true -} - -func AllStrings() *allStringsSet { - return &allStringsSet{} -} - -func (e allStringsSet) Has(_ string) bool { - return true -} - -func (e allStringsSet) Count() int { - return -1 -}