diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef910c64f..418ca3b3e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed +- [#7334](https://github.com/thanos-io/thanos/pull/7334) Compactor: do not vertically compact downsampled blocks. Such cases are now marked with `no-compact-mark.json`. Fixes panic `panic: unexpected seriesToChunkEncoder lack of iterations`. + ### Removed ## [v0.35.0](https://github.com/thanos-io/thanos/tree/release-0.35) - 02.05.2024 diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index f1437efc64..43fecdebc1 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -369,13 +369,20 @@ func runCompact( conf.blockFilesConcurrency, conf.compactBlocksFetchConcurrency, ) + var planner compact.Planner + tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter) - planner := compact.WithLargeTotalIndexSizeFilter( + largeIndexFilterPlanner := compact.WithLargeTotalIndexSizeFilter( tsdbPlanner, insBkt, int64(conf.maxBlockIndexSize), compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason), ) + if enableVerticalCompaction { + planner = compact.WithVerticalCompactionDownsampleFilter(largeIndexFilterPlanner, insBkt, compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.DownsampleVerticalCompactionNoCompactReason)) + } else { + planner = largeIndexFilterPlanner + } blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) compactor, err := compact.NewBucketCompactor( logger, diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go index 83273eb343..0a351a5fab 100644 --- a/pkg/block/metadata/markers.go +++ b/pkg/block/metadata/markers.go @@ -79,6 +79,8 @@ const ( IndexSizeExceedingNoCompactReason = "index-size-exceeding" // OutOfOrderChunksNoCompactReason is a reason of to no compact block with index contains out of order chunk so that the compaction is not blocked. OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk" + // DownsampleVerticalCompactionNoCompactReason is a reason to not compact overlapping downsampled blocks as it does not make sense e.g. how to vertically compact the average. + DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction" ) // NoCompactMark marker stores reason of block being excluded from compaction if needed. diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go index 783191cacf..6d7d03eea2 100644 --- a/pkg/compact/planner.go +++ b/pkg/compact/planner.go @@ -234,6 +234,67 @@ type largeTotalIndexSizeFilter struct { var _ Planner = &largeTotalIndexSizeFilter{} +type verticalCompactionDownsampleFilter struct { + bkt objstore.Bucket + markedForNoCompact prometheus.Counter + + *largeTotalIndexSizeFilter +} + +var _ Planner = &verticalCompactionDownsampleFilter{} + +func WithVerticalCompactionDownsampleFilter(with *largeTotalIndexSizeFilter, bkt objstore.Bucket, markedForNoCompact prometheus.Counter) Planner { + return &verticalCompactionDownsampleFilter{ + markedForNoCompact: markedForNoCompact, + bkt: bkt, + largeTotalIndexSizeFilter: with, + } +} + +func (v *verticalCompactionDownsampleFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) { + noCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, 0) +PlanLoop: + for { + plan, err := v.plan(ctx, noCompactMarked, metasByMinTime) + if err != nil { + return nil, err + } + + if len(selectOverlappingMetas(plan)) == 0 { + return plan, nil + } + + // If we have downsampled blocks, we need to mark them as no compact because it's impossible to do that with vertical compaction. + // Technically, the resolution is part of the group key but do not attach ourselves to that level of detail. + var marked = false + for _, m := range plan { + if m.Thanos.Downsample.Resolution == 0 { + continue + } + if err := block.MarkForNoCompact( + ctx, + v.logger, + v.bkt, + m.ULID, + metadata.DownsampleVerticalCompactionNoCompactReason, + "verticalCompactionDownsampleFilter: Downsampled block, see https://github.com/thanos-io/thanos/issues/6775", + v.markedForNoCompact, + ); err != nil { + return nil, errors.Wrapf(err, "mark %v for no compaction", m.ULID.String()) + } + noCompactMarked[m.ULID] = &metadata.NoCompactMark{ID: m.ULID, Version: metadata.NoCompactMarkVersion1} + marked = true + } + + if marked { + continue PlanLoop + } + + return plan, nil + + } +} + // WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size. // When found, it marks block for no compaction by placing no-compact-mark.json and updating cache. // NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes. @@ -243,16 +304,19 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact} } -func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) { +func (t *largeTotalIndexSizeFilter) plan(ctx context.Context, extraNoCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { noCompactMarked := t.noCompBlocksFunc() - copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)) + copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)+len(extraNoCompactMarked)) for k, v := range noCompactMarked { copiedNoCompactMarked[k] = v } + for k, v := range extraNoCompactMarked { + copiedNoCompactMarked[k] = v + } PlanLoop: for { - plan, err := t.plan(copiedNoCompactMarked, metasByMinTime) + plan, err := t.tsdbBasedPlanner.plan(copiedNoCompactMarked, metasByMinTime) if err != nil { return nil, err } @@ -303,3 +367,7 @@ PlanLoop: return plan, nil } } + +func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) { + return t.plan(ctx, nil, metasByMinTime) +} diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 94a3f344fe..eb0ceb0054 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -998,6 +998,6 @@ func TestCompactorIssue6775(t *testing.T) { Type: client.S3, Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()), }, nil, "--compact.enable-vertical-compaction") - testutil.NotOk(t, e2e.StartAndWaitReady(c)) - testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, e2e.StartAndWaitReady(c)) + testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics())) }