Skip to content

Commit

Permalink
Store: remove stringset
Browse files Browse the repository at this point in the history
This is the wrong approach to detect if we need to resort. It cannot
detect if we might end up with an unsorted series set if we add
extLabels.

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Sep 9, 2023
1 parent 053ca63 commit d4bc4bd
Show file tree
Hide file tree
Showing 13 changed files with 3 additions and 325 deletions.
15 changes: 0 additions & 15 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,21 +366,6 @@ func runReceive(
grpcserver.WithTLSConfig(tlsCfg),
)

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

dbs.UpdateLabelNames(ctx)

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

g.Add(
func() error {
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress)
Expand Down
15 changes: 0 additions & 15 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,21 +642,6 @@ func runRule(
)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)))

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

tsdbStore.UpdateLabelNames(ctx)

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})
}

options = append(options, grpcserver.WithServer(
Expand Down
47 changes: 3 additions & 44 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
)
Expand Down Expand Up @@ -113,9 +112,8 @@ func runSidecar(
mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
labelNamesSet: stringset.AllStrings(),
limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
}

confContentYaml, err := conf.objStore.Content()
Expand Down Expand Up @@ -239,19 +237,6 @@ func runSidecar(
}, func(error) {
cancel()
})

g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

m.UpdateLabelNames(context.Background())

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})
}
{
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -264,7 +249,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -434,8 +419,6 @@ type promMetadata struct {
limitMinTime thanosmodel.TimeOrDurationValue

client *promclient.Client

labelNamesSet stringset.Set
}

func (s *promMetadata) UpdateLabels(ctx context.Context) error {
Expand Down Expand Up @@ -463,30 +446,6 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
s.maxt = maxt
}

func (s *promMetadata) UpdateLabelNames(ctx context.Context) {
mint, _ := s.Timestamps()
labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli())
if err != nil {
s.mtx.Lock()
defer s.mtx.Unlock()

s.labelNamesSet = stringset.AllStrings()
return
}

filter := stringset.NewFromStrings(labelNames...)
s.mtx.Lock()
s.labelNamesSet = filter
s.mtx.Unlock()
}

func (s *promMetadata) LabelNamesSet() stringset.Set {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.labelNamesSet
}

func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
17 changes: 0 additions & 17 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,23 +496,6 @@ func runStore(
})
}

{
ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

bs.UpdateLabelNames()

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

}
// Add bucket UI for loaded blocks.
{
ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
Expand Down
13 changes: 0 additions & 13 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,19 +867,6 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab
return initialLset, nil
}

func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) {
t.mtx.RLock()
defer t.mtx.RUnlock()

for _, tenant := range t.tenants {
db := tenant.storeTSDB
if db == nil {
continue
}
db.UpdateLabelNames(ctx)
}
}

// extendLabels extends external labels of the initial label set.
// If an external label shares same name with a label in the initial label set,
// use the label in the initial label set and inform user about it.
Expand Down
2 changes: 0 additions & 2 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
Expand Down Expand Up @@ -819,7 +818,6 @@ func TestPrometheusStore_Acceptance(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return extLset },
func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) },
func() stringset.Set { return stringset.AllStrings() },
func() string { return version })
testutil.Ok(tt, err)

Expand Down
37 changes: 0 additions & 37 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -366,9 +365,6 @@ type BucketStore struct {

enableChunkHashCalculation bool

bmtx sync.Mutex
labelNamesSet stringset.Set

blockEstimatedMaxSeriesFunc BlockEstimator
blockEstimatedMaxChunkFunc BlockEstimator
}
Expand Down Expand Up @@ -515,7 +511,6 @@ func NewBucketStore(
enableSeriesResponseHints: enableSeriesResponseHints,
enableChunkHashCalculation: enableChunkHashCalculation,
seriesBatchSize: SeriesBatchSize,
labelNamesSet: stringset.AllStrings(),
}

for _, option := range options {
Expand Down Expand Up @@ -1696,38 +1691,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}, nil
}

func (s *BucketStore) UpdateLabelNames() {
s.mtx.RLock()
defer s.mtx.RUnlock()

newSet := stringset.New()
for _, b := range s.blocks {
labelNames, err := b.indexHeaderReader.LabelNames()
if err != nil {
level.Warn(s.logger).Log("msg", "error getting label names", "block", b.meta.ULID, "err", err.Error())
s.updateLabelNamesSet(stringset.AllStrings())
return
}
for _, l := range labelNames {
newSet.Insert(l)
}
}
s.updateLabelNamesSet(newSet)
}

func (s *BucketStore) updateLabelNamesSet(newSet stringset.Set) {
s.bmtx.Lock()
s.labelNamesSet = newSet
s.bmtx.Unlock()
}

func (b *BucketStore) LabelNamesSet() stringset.Set {
b.bmtx.Lock()
defer b.bmtx.Unlock()

return b.labelNamesSet
}

func (b *bucketBlock) FilterExtLabelsMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, bool) {
// We filter external labels from matchers so we won't try to match series on them.
var result []*labels.Matcher
Expand Down
24 changes: 0 additions & 24 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,30 +779,6 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
})
}

func TestBucketStore_LabelNamesSet_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

s.store.UpdateLabelNames()
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

filter := s.store.LabelNamesSet()
for _, n := range []string{"a", "b", "c"} {
testutil.Assert(t, filter.Has(n), "expected filter to have %s", n)
}
testutil.Equals(t, 3, filter.Count())
})
}

func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) {
cases := map[string]struct {
maxSeriesLimit uint64
Expand Down
5 changes: 0 additions & 5 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
Expand Down Expand Up @@ -1634,7 +1633,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
chunksLimiterFactory: NewChunksLimiterFactory(0),
seriesLimiterFactory: NewSeriesLimiterFactory(0),
bytesLimiterFactory: NewBytesLimiterFactory(0),
labelNamesSet: stringset.AllStrings(),
}

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
Expand Down Expand Up @@ -3419,9 +3417,6 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) {

testutil.Ok(t, bucketStore.SyncBlocks(context.Background()))

// make sure to have updated inner label names
bucketStore.UpdateLabelNames()

srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, bucketStore.Series(&storepb.SeriesRequest{
WithoutReplicaLabels: []string{"replica"},
Expand Down
4 changes: 0 additions & 4 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -54,7 +53,6 @@ type PrometheusStore struct {
buffers sync.Pool
component component.StoreAPI
externalLabelsFn func() labels.Labels
labelNamesSet func() stringset.Set

promVersion func() string
timestamps func() (mint int64, maxt int64)
Expand All @@ -81,7 +79,6 @@ func NewPrometheusStore(
component component.StoreAPI,
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
labelNamesSet func() stringset.Set,
promVersion func() string,
) (*PrometheusStore, error) {
if logger == nil {
Expand All @@ -95,7 +92,6 @@ func NewPrometheusStore(
externalLabelsFn: externalLabelsFn,
promVersion: promVersion,
timestamps: timestamps,
labelNamesSet: labelNamesSet,
remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES},
buffers: sync.Pool{New: func() interface{} {
b := make([]byte, 0, initialBufSize)
Expand Down
Loading

0 comments on commit d4bc4bd

Please sign in to comment.