Skip to content

Commit

Permalink
store & compact: For components that operates on blocks - expose the …
Browse files Browse the repository at this point in the history
…UI on /loaded-blocks (#2357)

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Apr 1, 2020
1 parent e4090ad commit 8f492a9
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 139 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
- [#2304](https://github.com/thanos-io/thanos/pull/2304) Store: Added `max_item_size` config option to memcached-based index cache. This should be set to the max item size configured in memcached (`-I` flag) in order to not waste network round-trips to cache items larger than the limit configured in memcached.
- [#2297](https://github.com/thanos-io/thanos/pull/2297) Store Gateway: Add `--experimental.enable-index-cache-postings-compression` flag to enable reencoding and compressing postings before storing them into cache. Compressed postings take about 10% of the original size.
- [#2357](https://github.com/thanos-io/thanos/pull/2357) Compactor and Store Gateway now have serve BucketUI on `:<http-port>/loaded` and shows exactly the blocks that are currently seen by compactor and store gateway. Compactor also serves different BucketUI on `:<http-port>/global` that shows the status of object storage without any filters.

### Changed

Expand Down
25 changes: 16 additions & 9 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,10 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
httpserver.WithGracePeriod(time.Duration(*httpGracePeriod)),
)

flagsMap := map[string]string{
"web.external-prefix": *webExternalPrefix,
"web.prefix-header": *webPrefixHeaderName,
}

router := route.New()

bucketUI := ui.NewBucketUI(logger, *label, flagsMap)
bucketUI.Register(router.WithPrefix(*webExternalPrefix), extpromhttp.NewInstrumentationMiddleware(reg))
bucketUI := ui.NewBucketUI(logger, *label, *webExternalPrefix, *webPrefixHeaderName)
bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
srv.Handle("/", router)

if *interval < 5*time.Minute {
Expand All @@ -373,17 +368,29 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
return errors.Wrap(err, "bucket client")
}

// TODO(bwplotka): Allow Bucket UI to visualisate the state of block as well.
// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
fetcher.UpdateOnChange(bucketUI.Set)

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
statusProber.Ready()
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
return bucketUI.RunRefreshLoop(ctx, fetcher, *interval, *timeout)
return runutil.Repeat(*interval, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, *timeout)
defer iterCancel()

_, _, err := fetcher.Fetch(iterCtx)
if err != nil {
return err
}
return nil
})
})
}, func(error) {
cancel()
})
Expand Down
62 changes: 40 additions & 22 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String()
flagsMap := map[string]string{
"web.external-prefix": *webExternalPrefix,
"web.prefix-header": *webPrefixHeaderName,
}

label := cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").String()

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand Down Expand Up @@ -179,7 +174,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
selectorRelabelConf,
*waitInterval,
*label,
flagsMap,
*webExternalPrefix,
*webPrefixHeaderName,
)
}
}
Expand All @@ -194,21 +190,17 @@ func runCompact(
objStoreConfig *extflag.PathOrContent,
consistencyDelay time.Duration,
deleteDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
generateMissingIndexCacheFiles bool,
haltOnError, acceptMalformedIndex, wait, generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component component.Component,
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
maxCompactionLevel, blockSyncConcurrency int,
concurrency int,
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
label string,
flagsMap map[string]string,
externalPrefix, prefixHeader string,
) error {
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -306,13 +298,12 @@ func runCompact(
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
metaFetcherFilters := []block.MetadataFilter{
compactFetcher := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}
compactFetcher := baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_", reg), metaFetcherFilters, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})
enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
Expand Down Expand Up @@ -457,14 +448,41 @@ func runCompact(
})

if wait {
router := route.New()
bucketUI := ui.NewBucketUI(logger, label, flagsMap)
bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
srv.Handle("/", router)
r := route.New()

ins := extpromhttp.NewInstrumentationMiddleware(reg)
compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader)
compactorView.Register(r, ins)
compactFetcher.UpdateOnChange(compactorView.Set)

global := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/global"), prefixHeader)
global.Register(r, ins)

// Separate fetcher for global view.
// TODO(bwplotka): Allow Bucket UI to visualize the state of the block as well.
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil)
f.UpdateOnChange(global.Set)

srv.Handle("/", r)

g.Add(func() error {
// TODO(bwplotka): Allow Bucket UI to visualisate the state of the block as well.
return bucketUI.RunRefreshLoop(ctx, baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), metaFetcherFilters, nil), waitInterval, time.Minute)
iterCtx, iterCancel := context.WithTimeout(ctx, waitInterval)
_, _, _ = f.Fetch(iterCtx)
iterCancel()

// For /global state make sure to fetch periodically.
return runutil.Repeat(time.Minute, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, waitInterval)
defer iterCancel()

_, _, err := f.Fetch(iterCtx)
if err != nil {
return err
}
return nil
})
})
}, func(error) {
cancel()
})
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,9 @@ func runQuery(
router = router.WithPrefix(webRoutePrefix)
}

flagsMap := map[string]string{
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
"web.external-prefix": webExternalPrefix,
"web.prefix-header": webPrefixHeaderName,
}

ins := extpromhttp.NewInstrumentationMiddleware(reg)
ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router, ins)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, reg, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution)

Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,15 +572,10 @@ func runRule(
}
})

flagsMap := map[string]string{
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
"web.external-prefix": webExternalPrefix,
"web.prefix-header": webPrefixHeaderName,
}

ins := extpromhttp.NewInstrumentationMiddleware(reg)

ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), flagsMap).Register(router, ins)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewAPI(logger, reg, ruleMgr)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)
Expand Down
31 changes: 21 additions & 10 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main

import (
"context"
"path"
"time"

"github.com/go-kit/kit/log"
Expand All @@ -13,11 +14,13 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand All @@ -27,6 +30,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
"gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -95,6 +99,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
"Default is 24h, half of the default value for --delete-delay on compactor.").
Default("24h"))

webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String()

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -133,6 +140,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
*enablePostingsCompression,
time.Duration(*consistencyDelay),
time.Duration(*ignoreDeletionMarksDelay),
*webExternalPrefix,
*webPrefixHeaderName,
)
}
}
Expand All @@ -148,26 +157,20 @@ func runStore(
dataDir string,
grpcBindAddr string,
grpcGracePeriod time.Duration,
grpcCert string,
grpcKey string,
grpcClientCA string,
httpBindAddr string,
grpcCert, grpcKey, grpcClientCA, httpBindAddr string,
httpGracePeriod time.Duration,
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount uint64,
maxConcurrency int,
component component.Component,
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
disableIndexHeader bool,
enablePostingsCompression bool,
advertiseCompatibilityLabel, disableIndexHeader, enablePostingsCompression bool,
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -328,6 +331,14 @@ func runStore(
s.Shutdown(err)
})
}
// Add bucket UI for loaded blocks.
{
r := route.New()
compactorView := ui.NewBucketUI(logger, "", path.Join(externalPrefix, "/loaded"), prefixHeader)
compactorView.Register(r, extpromhttp.NewInstrumentationMiddleware(reg))
metaFetcher.UpdateOnChange(compactorView.Set)
srv.Handle("/", r)
}

level.Info(logger).Log("msg", "starting store node")
return nil
Expand Down
30 changes: 23 additions & 7 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "modified",
Help: "Number of blocks that their metadata modified",
Help: "Number of blocks whose metadata changed",
},
[]string{"modified"},
[]string{replicaRemovedMeta},
Expand All @@ -126,6 +126,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {

type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
UpdateOnChange(func([]metadata.Meta, error))
}

type MetadataFilter interface {
Expand Down Expand Up @@ -184,11 +185,11 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
if err != nil {
return nil, err
}
return b.WithFilters(reg, filters, modifiers), nil
return b.NewMetaFetcher(reg, filters, modifiers), nil
}

// WithFilters transforms BaseFetcher into actually usable MetadataFetcher.
func (f *BaseFetcher) WithFilters(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) *MetaFetcher {
// NewMetaFetcher transforms BaseFetcher into actually usable *MetaFetcher.
func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) *MetaFetcher {
return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers}
}

Expand Down Expand Up @@ -457,14 +458,29 @@ type MetaFetcher struct {

filters []MetadataFilter
modifiers []MetadataModifier

listener func([]metadata.Meta, error)
}

// Fetch returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
// It's caller responsibility to not change the returned metadata files. Maps can be modified.
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) {
return f.wrapped.fetch(ctx, f.metrics, f.filters, f.modifiers)
metas, partial, err = f.wrapped.fetch(ctx, f.metrics, f.filters, f.modifiers)
if f.listener != nil {
blocks := make([]metadata.Meta, 0, len(metas))
for _, meta := range metas {
blocks = append(blocks, *meta)
}
f.listener(blocks, err)
}
return metas, partial, err
}

// UpdateOnChange allows to add listener that will be update on every change.
func (f *MetaFetcher) UpdateOnChange(listener func([]metadata.Meta, error)) {
f.listener = listener
}

var _ MetadataFilter = &TimePartitionMetaFilter{}
Expand Down Expand Up @@ -558,7 +574,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
}), metasByResolution[res], metas, res, synced)
}), metasByResolution[res], metas, synced)
}(res)
}

Expand All @@ -567,7 +583,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
return nil
}

func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced *extprom.TxGaugeVec) {
func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, bkt, dir, r)
testutil.Ok(t, err)

fetcher := baseFetcher.WithFilters(r, []MetadataFilter{
fetcher := baseFetcher.NewMetaFetcher(r, []MetadataFilter{
&ulidFilter{ulidToDelete: &ulidToDelete},
}, nil)

Expand Down
Loading

0 comments on commit 8f492a9

Please sign in to comment.