Skip to content

Commit

Permalink
Merge pull request #6317 from fpetkovski/resort-dataset-on-internal-d…
Browse files Browse the repository at this point in the history
…edup

Resort store response set on internal label dedup
  • Loading branch information
fpetkovski authored Aug 10, 2023
2 parents 4633976 + 8c511ac commit 84567ec
Show file tree
Hide file tree
Showing 17 changed files with 629 additions and 85 deletions.
15 changes: 15 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,21 @@ func runReceive(
grpcserver.WithTLSConfig(tlsCfg),
)

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

dbs.UpdateLabelNames(ctx)

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

g.Add(
func() error {
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress)
Expand Down
58 changes: 49 additions & 9 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
)
Expand Down Expand Up @@ -112,8 +113,9 @@ func runSidecar(
mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
labelNamesSet: stringset.AllStrings(),
}

confContentYaml, err := conf.objStore.Content()
Expand Down Expand Up @@ -237,6 +239,19 @@ func runSidecar(
}, func(error) {
cancel()
})

g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

m.UpdateLabelNames(context.Background())

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})
}
{
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -249,7 +264,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -411,15 +426,16 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l
type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string
limitMinTime thanosmodel.TimeOrDurationValue

client *promclient.Client

labelNamesSet stringset.Set
}

func (s *promMetadata) UpdateLabels(ctx context.Context) error {
Expand Down Expand Up @@ -447,6 +463,30 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
s.maxt = maxt
}

func (s *promMetadata) UpdateLabelNames(ctx context.Context) {
mint, _ := s.Timestamps()
labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli())
if err != nil {
s.mtx.Lock()
defer s.mtx.Unlock()

s.labelNamesSet = stringset.AllStrings()
return
}

filter := stringset.NewFromStrings(labelNames...)
s.mtx.Lock()
s.labelNamesSet = filter
s.mtx.Unlock()
}

func (s *promMetadata) LabelNamesSet() stringset.Set {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.labelNamesSet
}

func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,24 @@ func runStore(
s.Shutdown(err)
})
}

{
ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

bs.UpdateLabelNames()

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

}
// Add bucket UI for loaded blocks.
{
ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,14 @@ require (

require (
github.com/onsi/gomega v1.27.10
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb
go.opentelemetry.io/contrib/propagators/autoprop v0.38.0
go4.org/intern v0.0.0-20220617035311-6925f38cc365
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
)

require (
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.13.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/digitalocean/godo v1.99.0 h1:gUHO7n9bDaZFWvbzOum4bXE0/09ZuYA9yA8idQHX57E=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
Expand Down Expand Up @@ -846,6 +848,8 @@ github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHi
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.15 h1:Y7xOFbD+3jaPw+VN7lkakNJ/pa+ZSQVFp1ONtJaBxns=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
Expand Down
46 changes: 25 additions & 21 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,40 +97,31 @@ func NewMultiTSDB(

type localClient struct {
storepb.StoreClient
labelSetFunc func() []labelpb.ZLabelSet
timeRangeFunc func() (int64, int64)
tsdbOpts *tsdb.Options
store *store.TSDBStore
}

func NewLocalClient(
c storepb.StoreClient,
labelSetFunc func() []labelpb.ZLabelSet,
timeRangeFunc func() (int64, int64),
tsdbOpts *tsdb.Options,
) store.Client {
func newLocalClient(c storepb.StoreClient, store *store.TSDBStore) *localClient {
return &localClient{
StoreClient: c,
labelSetFunc: labelSetFunc,
timeRangeFunc: timeRangeFunc,
tsdbOpts: tsdbOpts,
StoreClient: c,
store: store,
}
}

func (l *localClient) LabelSets() []labels.Labels {
return labelpb.ZLabelSetsToPromLabelSets(l.labelSetFunc()...)
return labelpb.ZLabelSetsToPromLabelSets(l.store.LabelSet()...)
}

func (l *localClient) TimeRange() (mint int64, maxt int64) {
return l.timeRangeFunc()
return l.store.TimeRange()
}

func (l *localClient) TSDBInfos() []infopb.TSDBInfo {
labelsets := l.labelSetFunc()
labelsets := l.store.LabelSet()
if len(labelsets) == 0 {
return []infopb.TSDBInfo{}
}

mint, maxt := l.timeRangeFunc()
mint, maxt := l.store.TimeRange()
return []infopb.TSDBInfo{
{
Labels: labelsets[0],
Expand All @@ -141,7 +132,7 @@ func (l *localClient) TSDBInfos() []infopb.TSDBInfo {
}

func (l *localClient) String() string {
mint, maxt := l.timeRangeFunc()
mint, maxt := l.store.TimeRange()
return fmt.Sprintf(
"LabelSets: %v MinTime: %d MaxTime: %d",
labelpb.PromLabelSetsToString(l.LabelSets()), mint, maxt,
Expand Down Expand Up @@ -186,7 +177,7 @@ func (t *tenant) store() *store.TSDBStore {
return t.storeTSDB
}

func (t *tenant) client(logger log.Logger, tsdbOpts *tsdb.Options) store.Client {
func (t *tenant) client(logger log.Logger) store.Client {
t.mtx.RLock()
defer t.mtx.RUnlock()

Expand All @@ -196,7 +187,7 @@ func (t *tenant) client(logger log.Logger, tsdbOpts *tsdb.Options) store.Client
}

client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore), 0)
return NewLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange, tsdbOpts)
return newLocalClient(client, tsdbStore)
}

func (t *tenant) exemplars() *exemplars.TSDB {
Expand Down Expand Up @@ -495,7 +486,7 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
client := tenant.client(t.logger, t.tsdbOpts)
client := tenant.client(t.logger)
if client != nil {
res = append(res, client)
}
Expand Down Expand Up @@ -876,6 +867,19 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab
return initialLset, nil
}

func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) {
t.mtx.RLock()
defer t.mtx.RUnlock()

for _, tenant := range t.tenants {
db := tenant.storeTSDB
if db == nil {
continue
}
db.UpdateLabelNames(ctx)
}
}

// extendLabels extends external labels of the initial label set.
// If an external label shares same name with a label in the initial label set,
// use the label in the initial label set and inform user about it.
Expand Down
Loading

0 comments on commit 84567ec

Please sign in to comment.