Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics for compaction and downsampling process #4801

Merged
merged 75 commits into from
Nov 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
6fb9752
fixed template; export metadata
metonymic-smokey Oct 16, 2021
f403fcc
added some error handling
metonymic-smokey Oct 16, 2021
02d7037
remove planned blocks from original
metonymic-smokey Oct 16, 2021
ea83250
moved to pkg/compact
metonymic-smokey Oct 17, 2021
ff2e36a
draft: export metrics to Prom
metonymic-smokey Oct 18, 2021
5961552
update metrics after entire plan sim; change func name
metonymic-smokey Oct 18, 2021
a2b2b97
added method to delete metas from group
metonymic-smokey Oct 18, 2021
8f13acc
preallocate meta slice
metonymic-smokey Oct 18, 2021
f52e056
changed func signature; started adding metrics to Group
metonymic-smokey Oct 18, 2021
ad639c1
added termination condition
metonymic-smokey Oct 18, 2021
845d497
metrics struct added
metonymic-smokey Oct 18, 2021
cc4efe4
draft: updating progress metric
metonymic-smokey Oct 19, 2021
322340b
added feature flag for compact
metonymic-smokey Oct 19, 2021
a4642f6
use tsdb planner as default
metonymic-smokey Oct 20, 2021
3092479
initialised maps with size; changed metrics update map
metonymic-smokey Oct 20, 2021
7f91580
removed extra vars; changed prom init location
metonymic-smokey Oct 20, 2021
5fd74c6
added labels and downsample resolution to metadata
metonymic-smokey Oct 20, 2021
6f0b74a
added termination condition; reverted planner type
metonymic-smokey Oct 21, 2021
ec3e767
Merge branch 'thanos-io:main' into metrics
metonymic-smokey Oct 21, 2021
a3dfae1
draft: downsampling metrics overview
metonymic-smokey Oct 22, 2021
772041a
Merge branch 'metrics' of https://github.com/metonymic-smokey/thanos …
metonymic-smokey Oct 22, 2021
ad4c704
removed extra bkt; processing individual blocks
metonymic-smokey Oct 22, 2021
3c7b8ac
Merge branch 'thanos-io:main' into metrics
metonymic-smokey Oct 22, 2021
e6739d7
changed downsample sim logic; changed plan metric names
metonymic-smokey Oct 22, 2021
b4a662b
re-used interface; refactoring
metonymic-smokey Oct 23, 2021
cdeb0d7
draft: added Prom metrics
metonymic-smokey Oct 23, 2021
7a53935
re-used planner; added flag; fixed interface
metonymic-smokey Oct 23, 2021
3b90846
increment exporters; renamed funcs/structs
metonymic-smokey Oct 24, 2021
da04db8
check if downsampling is disabled
metonymic-smokey Oct 24, 2021
8922b33
iterate through groups instead of meta
metonymic-smokey Oct 24, 2021
81a5066
run both simulations repeatedly
metonymic-smokey Oct 25, 2021
63ab795
made naming more consistent
metonymic-smokey Oct 25, 2021
235528d
added unit test for planning simulation
metonymic-smokey Oct 25, 2021
dc301ab
Merge branch 'thanos-io:main' into metrics
metonymic-smokey Oct 26, 2021
1810ba4
removed extra comments
metonymic-smokey Oct 26, 2021
a355886
fixed flag bug
metonymic-smokey Oct 26, 2021
40fa1fc
added docs for new compact flag
metonymic-smokey Oct 26, 2021
13bec69
lint fix
metonymic-smokey Oct 26, 2021
cec1c58
added sync meta to repeat loop
metonymic-smokey Oct 27, 2021
d8a1ab8
draft: downsampling unit test
metonymic-smokey Oct 28, 2021
8b23ef3
Merge branch 'thanos-io:main' into compact-metrics
metonymic-smokey Oct 28, 2021
fc71280
new group for downsample; re-used context
metonymic-smokey Oct 28, 2021
0460638
Merge branch 'compact-metrics' of https://github.com/metonymic-smokey…
metonymic-smokey Oct 28, 2021
5ea12ef
added test names and comments for downsample unit test
metonymic-smokey Oct 28, 2021
3629986
fixed copy error - changed to deep copy
metonymic-smokey Oct 28, 2021
fa40551
changed metric names
metonymic-smokey Oct 29, 2021
825a721
changed to regular flag; recreate simulator obj every run
metonymic-smokey Oct 29, 2021
6b9b326
added explanatory comments
metonymic-smokey Oct 29, 2021
adba129
removed extra var
metonymic-smokey Oct 29, 2021
87e9667
Merge branch 'thanos-io:main' into compact-metrics
metonymic-smokey Oct 29, 2021
4d7da02
re-register simulators
metonymic-smokey Oct 30, 2021
a0ea1d4
fixed names and comments
metonymic-smokey Oct 30, 2021
5c5d7b6
fix simulator obj registration
metonymic-smokey Oct 30, 2021
d788f99
added some more test cases; table test for plan simulate
metonymic-smokey Nov 2, 2021
daea946
added comments for some public methods
metonymic-smokey Nov 2, 2021
60bbe9e
lint fixes
metonymic-smokey Nov 2, 2021
9c3d11c
fixed names, logs and comments
metonymic-smokey Nov 2, 2021
afd48bf
refactored test
metonymic-smokey Nov 2, 2021
54b412f
Merge branch 'thanos-io:main' into compact-metrics
metonymic-smokey Nov 2, 2021
8f1fff7
added test cases with two groups
metonymic-smokey Nov 3, 2021
ff3ea04
added test case with non consecutive blocks
metonymic-smokey Nov 3, 2021
fa5f03f
fixed some nits
metonymic-smokey Nov 3, 2021
3c5cbce
fixed two_groups_test
metonymic-smokey Nov 3, 2021
35c4ff7
updated docs
metonymic-smokey Nov 3, 2021
639b964
draft: refactor compact progress test
metonymic-smokey Nov 5, 2021
c0caf42
draft: refactor downsample test
metonymic-smokey Nov 5, 2021
a58b63f
lint fixes and code clean up
metonymic-smokey Nov 5, 2021
8ec72e6
draft: refactored compact progress test
metonymic-smokey Nov 5, 2021
e10c845
changed func signature
metonymic-smokey Nov 5, 2021
3d0d375
draft: refactored downsample unit test to use helper func
metonymic-smokey Nov 5, 2021
6999f7f
improved test cases; CircleCI fix
metonymic-smokey Nov 6, 2021
37848f1
use reset instead of unregister; minor nits
metonymic-smokey Nov 6, 2021
e366232
improved downsample test case
metonymic-smokey Nov 6, 2021
50d1be4
E2E test fix
metonymic-smokey Nov 6, 2021
02bb607
added CHANGELOG
metonymic-smokey Nov 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4710](https://github.com/thanos-io/thanos/pull/4710) Store: add metric to capture timestamp of the last loaded block.
- [#4736](https://github.com/thanos-io/thanos/pull/4736) S3: Add capability to use custom AWS STS Endpoint.
- [#4764](https://github.com/thanos-io/thanos/pull/4764) Compactor: add `block-viewer.global.sync-block-timeout` flag to set the timeout of synchronization block metas.
- [#4801](https://github.com/thanos-io/thanos/pull/4801) Compactor: added Prometheus metrics for tracking the progress of compaction and downsampling.

### Fixed

Expand Down
47 changes: 46 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,9 @@ func runCompact(
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(conf.hashFunc),
)
tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
planner := compact.WithLargeTotalIndexSizeFilter(
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
tsdbPlanner,
bkt,
int64(conf.maxBlockIndexSize),
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
Expand Down Expand Up @@ -458,6 +459,47 @@ func runCompact(
return cleanPartialMarked()
}

if conf.compactionProgressMetrics {
g.Add(func() error {
ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner)
var ds *compact.DownsampleProgressCalculator
if !conf.disableDownsampling {
ds = compact.NewDownsampleProgressCalculator(reg)
}

return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {

if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrapf(err, "could not sync metas")
}

metas := sy.Metas()
groups, err := grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata")
}

if err = ps.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate compaction progress")
}

if !conf.disableDownsampling {
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
groups, err = grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata into downsample groups")
}
if err := ds.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate downsampling progress")
}
}

return nil
})
}, func(err error) {
cancel()
})
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

Expand Down Expand Up @@ -590,9 +632,12 @@ type compactConfig struct {
enableVerticalCompaction bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
compactionProgressMetrics bool
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("progress-metrics", "Enables the progress metrics, indicating the progress of compaction and downsampling").Default("true").BoolVar(&cc.compactionProgressMetrics)

cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").BoolVar(&cc.haltOnError)
cmd.Flag("debug.accept-malformed-index",
Expand Down
2 changes: 2 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ Flags:
Path to YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--progress-metrics Enables the progress metrics, indicating the
progress of compaction and downsampling
--retention.resolution-1h=0d
How long to retain samples of resolution 2 (1
hour) in bucket. Setting this to 0d will retain
Expand Down
197 changes: 197 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,19 @@ func (cg *Group) Key() string {
return cg.key
}

func (cg *Group) deleteFromGroup(target map[ulid.ULID]struct{}) {
cg.mtx.Lock()
defer cg.mtx.Unlock()
var newGroupMeta []*metadata.Meta
for _, meta := range cg.metasByMinTime {
if _, found := target[meta.BlockMeta.ULID]; !found {
newGroupMeta = append(newGroupMeta, meta)
}
}

cg.metasByMinTime = newGroupMeta
}

// AppendMeta the block with the given meta to the group.
func (cg *Group) AppendMeta(meta *metadata.Meta) error {
cg.mtx.Lock()
Expand Down Expand Up @@ -470,6 +483,190 @@ func (cg *Group) Resolution() int64 {
return cg.resolution
}

// CompactProgressMetrics contains Prometheus metrics related to compaction progress.
type CompactProgressMetrics struct {
NumberOfCompactionRuns *prometheus.GaugeVec
NumberOfCompactionBlocks *prometheus.GaugeVec
}

// ProgressCalculator calculates the progress of the compaction process for a given slice of Groups.
type ProgressCalculator interface {
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
ProgressCalculate(ctx context.Context, groups []*Group) error
}

// CompactionProgressCalculator contains a planner and ProgressMetrics, which are updated during the compaction simulation process.
type CompactionProgressCalculator struct {
planner Planner
*CompactProgressMetrics
}

// NewCompactProgressCalculator creates a new CompactionProgressCalculator.
func NewCompactionProgressCalculator(reg prometheus.Registerer, planner *tsdbBasedPlanner) *CompactionProgressCalculator {
return &CompactionProgressCalculator{
planner: planner,
CompactProgressMetrics: &CompactProgressMetrics{
NumberOfCompactionRuns: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_compact_todo_compactions",
Help: "number of compactions to be done",
}, []string{"group"}),
NumberOfCompactionBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_compact_todo_compaction_blocks",
Help: "number of blocks planned to be compacted",
}, []string{"group"}),
},
}
}

// ProgressCalculate calculates the number of blocks and compaction runs in the planning process of the given groups.
func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, groups []*Group) error {
groupCompactions := make(map[string]int, len(groups))
groupBlocks := make(map[string]int, len(groups))

for len(groups) > 0 {
tmpGroups := make([]*Group, 0, len(groups))
for _, g := range groups {
if len(g.IDs()) == 1 {
continue
}
plan, err := ps.planner.Plan(ctx, g.metasByMinTime)
if err != nil {
return errors.Wrapf(err, "could not plan")
}
if len(plan) == 0 {
continue
}
groupCompactions[g.key]++

toRemove := make(map[ulid.ULID]struct{}, len(plan))
metas := make([]*tsdb.BlockMeta, 0, len(plan))
for _, p := range plan {
metas = append(metas, &p.BlockMeta)
toRemove[p.BlockMeta.ULID] = struct{}{}
}
g.deleteFromGroup(toRemove)

groupBlocks[g.key] += len(plan)

if len(g.metasByMinTime) == 0 {
continue
}

newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
return errors.Wrapf(err, "append meta")
}
tmpGroups = append(tmpGroups, g)
}

groups = tmpGroups
}

ps.CompactProgressMetrics.NumberOfCompactionRuns.Reset()
ps.CompactProgressMetrics.NumberOfCompactionBlocks.Reset()

for key, iters := range groupCompactions {
ps.CompactProgressMetrics.NumberOfCompactionRuns.WithLabelValues(key).Add(float64(iters))
ps.CompactProgressMetrics.NumberOfCompactionBlocks.WithLabelValues(key).Add(float64(groupBlocks[key]))
}

return nil
}

// DownsampleProgressMetrics contains Prometheus metrics related to downsampling progress.
type DownsampleProgressMetrics struct {
NumberOfBlocksDownsampled *prometheus.GaugeVec
}

// DownsampleProgressCalculator contains DownsampleMetrics, which are updated during the downsampling simulation process.
type DownsampleProgressCalculator struct {
*DownsampleProgressMetrics
}

// NewDownsampleProgressCalculator creates a new DownsampleProgressCalculator.
func NewDownsampleProgressCalculator(reg prometheus.Registerer) *DownsampleProgressCalculator {
return &DownsampleProgressCalculator{
DownsampleProgressMetrics: &DownsampleProgressMetrics{
NumberOfBlocksDownsampled: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_compact_todo_downsample_blocks",
Help: "number of blocks to be downsampled",
}, []string{"group"}),
},
}
}

// ProgressCalculate calculates the number of blocks to be downsampled for the given groups.
func (ds *DownsampleProgressCalculator) ProgressCalculate(ctx context.Context, groups []*Group) error {
sources5m := map[ulid.ULID]struct{}{}
sources1h := map[ulid.ULID]struct{}{}
groupBlocks := make(map[string]int, len(groups))

for _, group := range groups {
for _, m := range group.metasByMinTime {
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel0:
continue
case downsample.ResLevel1:
for _, id := range m.Compaction.Sources {
sources5m[id] = struct{}{}
}
case downsample.ResLevel2:
for _, id := range m.Compaction.Sources {
sources1h[id] = struct{}{}
}
default:
return errors.Errorf("unexpected downsampling resolution %d", m.Thanos.Downsample.Resolution)
}

}
}

for _, group := range groups {
for _, m := range group.metasByMinTime {
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel0:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources5m[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}

if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}
groupBlocks[group.key]++
case downsample.ResLevel1:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources1h[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}

if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
groupBlocks[group.key]++
}
}
}

ds.DownsampleProgressMetrics.NumberOfBlocksDownsampled.Reset()
for key, blocks := range groupBlocks {
ds.DownsampleProgressMetrics.NumberOfBlocksDownsampled.WithLabelValues(key).Add(float64(blocks))
}

return nil
}

// Planner returns blocks to compact.
type Planner interface {
// Plan returns a list of blocks that should be compacted into single one.
Expand Down
Loading