Skip to content

Commit

Permalink
store & compact: For components that operates on blocks - expose the …
Browse files Browse the repository at this point in the history
…UI on /loaded-blocks

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Apr 1, 2020
1 parent 0aa9954 commit 0745b47
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 132 deletions.
25 changes: 16 additions & 9 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,10 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
httpserver.WithGracePeriod(time.Duration(*httpGracePeriod)),
)

flagsMap := map[string]string{
"web.external-prefix": *webExternalPrefix,
"web.prefix-header": *webPrefixHeaderName,
}

router := route.New()

bucketUI := ui.NewBucketUI(logger, *label, flagsMap)
bucketUI.Register(router.WithPrefix(*webExternalPrefix), extpromhttp.NewInstrumentationMiddleware(reg))
bucketUI := ui.NewBucketUI(logger, *label, *webExternalPrefix, *webPrefixHeaderName)
bucketUI.Register(router, *webExternalPrefix, extpromhttp.NewInstrumentationMiddleware(reg))
srv.Handle("/", router)

if *interval < 5*time.Minute {
Expand All @@ -373,17 +368,29 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
return errors.Wrap(err, "bucket client")
}

// TODO(bwplotka): Allow Bucket UI to visualisate the state of block as well.
// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
fetcher.UpdateOnChange(bucketUI.Set)

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
statusProber.Ready()
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
return bucketUI.RunRefreshLoop(ctx, fetcher, *interval, *timeout)
return runutil.Repeat(*interval, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, *timeout)
defer iterCancel()

_, _, err := fetcher.Fetch(iterCtx)
if err != nil {
return err
}
return nil
})
})
}, func(error) {
cancel()
})
Expand Down
56 changes: 34 additions & 22 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String()
flagsMap := map[string]string{
"web.external-prefix": *webExternalPrefix,
"web.prefix-header": *webPrefixHeaderName,
}

label := cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").String()

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand Down Expand Up @@ -179,7 +174,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
selectorRelabelConf,
*waitInterval,
*label,
flagsMap,
*webExternalPrefix,
*webPrefixHeaderName,
)
}
}
Expand All @@ -194,21 +190,17 @@ func runCompact(
objStoreConfig *extflag.PathOrContent,
consistencyDelay time.Duration,
deleteDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
generateMissingIndexCacheFiles bool,
haltOnError, acceptMalformedIndex, wait, generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component component.Component,
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
maxCompactionLevel, blockSyncConcurrency int,
concurrency int,
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
label string,
flagsMap map[string]string,
externalPrefix, prefixHeader string,
) error {
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -306,13 +298,12 @@ func runCompact(
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
metaFetcherFilters := []block.MetadataFilter{
compactFetcher := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}
compactFetcher := baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_", reg), metaFetcherFilters, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})
enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
Expand Down Expand Up @@ -457,14 +448,35 @@ func runCompact(
})

if wait {
router := route.New()
bucketUI := ui.NewBucketUI(logger, label, flagsMap)
bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
srv.Handle("/", router)
r := route.New()
compactorView := ui.NewBucketUI(logger, label, externalPrefix, prefixHeader)
compactorView.Register(r, "loaded-blocks", extpromhttp.NewInstrumentationMiddleware(reg))
compactFetcher.UpdateOnChange(compactorView.Set)

global := ui.NewBucketUI(logger, label, externalPrefix, prefixHeader)
global.Register(r, "global", extpromhttp.NewInstrumentationMiddleware(reg))

// Separate fetcher for global view.
// TODO(bwplotka): Allow Bucket UI to visualize the state of the block as well.
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil)
f.UpdateOnChange(compactorView.Set)

srv.Handle("/", r)

g.Add(func() error {
// TODO(bwplotka): Allow Bucket UI to visualisate the state of the block as well.
return bucketUI.RunRefreshLoop(ctx, baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), metaFetcherFilters, nil), waitInterval, time.Minute)
// For /global state make sure to fetch periodically.
return runutil.Repeat(time.Minute, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, waitInterval)
defer iterCancel()

_, _, err := f.Fetch(iterCtx)
if err != nil {
return err
}
return nil
})
})
}, func(error) {
cancel()
})
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,9 @@ func runQuery(
router = router.WithPrefix(webRoutePrefix)
}

flagsMap := map[string]string{
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
"web.external-prefix": webExternalPrefix,
"web.prefix-header": webPrefixHeaderName,
}

ins := extpromhttp.NewInstrumentationMiddleware(reg)
ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router, ins)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, reg, stores, *webExternalPrefix, *webPrefixHeaderName).Register(router, ins)

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution)

Expand Down
30 changes: 20 additions & 10 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
"gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -95,6 +98,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
"Default is 24h, half of the default value for --delete-delay on compactor.").
Default("24h"))

webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String()

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -133,6 +139,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
*enablePostingsCompression,
time.Duration(*consistencyDelay),
time.Duration(*ignoreDeletionMarksDelay),
*webExternalPrefix,
*webPrefixHeaderName,
)
}
}
Expand All @@ -148,26 +156,20 @@ func runStore(
dataDir string,
grpcBindAddr string,
grpcGracePeriod time.Duration,
grpcCert string,
grpcKey string,
grpcClientCA string,
httpBindAddr string,
grpcCert, grpcKey, grpcClientCA, httpBindAddr string,
httpGracePeriod time.Duration,
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount uint64,
maxConcurrency int,
component component.Component,
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
disableIndexHeader bool,
enablePostingsCompression bool,
advertiseCompatibilityLabel, disableIndexHeader, enablePostingsCompression bool,
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -328,6 +330,14 @@ func runStore(
s.Shutdown(err)
})
}
// Add bucket UI for loaded blocks.
{
r := route.New()
compactorView := ui.NewBucketUI(logger, "", externalPrefix, prefixHeader)
compactorView.Register(r, "loaded-blocks", extpromhttp.NewInstrumentationMiddleware(reg))
metaFetcher.UpdateOnChange(compactorView.Set)
srv.Handle("/", r)
}

level.Info(logger).Log("msg", "starting store node")
return nil
Expand Down
30 changes: 23 additions & 7 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "modified",
Help: "Number of blocks that their metadata modified",
Help: "Number of blocks whose metadata changed",
},
[]string{"modified"},
[]string{replicaRemovedMeta},
Expand All @@ -126,6 +126,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {

type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
UpdateOnChange(func([]metadata.Meta, error))
}

type MetadataFilter interface {
Expand Down Expand Up @@ -184,11 +185,11 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
if err != nil {
return nil, err
}
return b.WithFilters(reg, filters, modifiers), nil
return b.NewMetaFetcher(reg, filters, modifiers), nil
}

// WithFilters transforms BaseFetcher into actually usable MetadataFetcher.
func (f *BaseFetcher) WithFilters(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) *MetaFetcher {
// NewMetaFetcher transforms BaseFetcher into actually usable *MetaFetcher.
func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) *MetaFetcher {
return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers}
}

Expand Down Expand Up @@ -457,14 +458,29 @@ type MetaFetcher struct {

filters []MetadataFilter
modifiers []MetadataModifier

listener func([]metadata.Meta, error)
}

// Fetch returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
// It's caller responsibility to not change the returned metadata files. Maps can be modified.
//
// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing.
func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) {
return f.wrapped.fetch(ctx, f.metrics, f.filters, f.modifiers)
metas, partial, err = f.wrapped.fetch(ctx, f.metrics, f.filters, f.modifiers)
if f.listener != nil {
blocks := make([]metadata.Meta, 0, len(metas))
for _, meta := range metas {
blocks = append(blocks, *meta)
}
f.listener(blocks, err)
}
return metas, partial, err
}

// UpdateOnChange allows to add listener that will be update on every change.
func (f *MetaFetcher) UpdateOnChange(listener func([]metadata.Meta, error)) {
f.listener = listener
}

var _ MetadataFilter = &TimePartitionMetaFilter{}
Expand Down Expand Up @@ -558,7 +574,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
}), metasByResolution[res], metas, res, synced)
}), metasByResolution[res], metas, synced)
}(res)
}

Expand All @@ -567,7 +583,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
return nil
}

func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced *extprom.TxGaugeVec) {
func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, bkt, dir, r)
testutil.Ok(t, err)

fetcher := baseFetcher.WithFilters(r, []MetadataFilter{
fetcher := baseFetcher.NewMetaFetcher(r, []MetadataFilter{
&ulidFilter{ulidToDelete: &ulidToDelete},
}, nil)

Expand Down
Loading

0 comments on commit 0745b47

Please sign in to comment.