diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 7f7fae30847..d0a4dff53bd 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1031,53 +1031,6 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada return nil } -func (cg *Group) removeOverlappingBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string) error { - if len(toCompact) == 0 { - return nil - } - kept := toCompact[0] - for _, m := range toCompact { - if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime { - level.Warn(cg.logger).Log("msg", "found overlapping block in plan that are not the first", - "first", kept.String(), "block", m.String()) - kept = m - } else if (m.MinTime < kept.MinTime && kept.MinTime < m.MaxTime) || - (m.MinTime < kept.MaxTime && kept.MaxTime < m.MaxTime) { - err := errors.Errorf("found partially overlapping block: %s vs %s", m.String(), kept.String()) - if cg.enableVerticalCompaction { - level.Error(cg.logger).Log("msg", "best effort to vertical compact", "err", err) - return nil - } else { - return halt(err) - } - } - } - for _, m := range toCompact { - if m.ULID.Compare(kept.ULID) == 0 || m.Thanos.Source == metadata.ReceiveSource { - level.Info(cg.logger).Log("msg", "keep this overlapping block", "block", m.String(), - "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) - continue - } - cg.overlappingBlocks.Inc() - if err := DeleteBlockNow(ctx, cg.logger, cg.bkt, m, dir); err != nil { - return retry(err) - } - } - return retry(errors.Errorf("found overlapping blocks in plan. Only kept %s", kept.String())) -} - -func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error { - level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), - "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) - if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { - return errors.Wrapf(err, "delete overlapping block %s", m.String()) - } - if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { - return errors.Wrapf(err, "remove old block dir %s", m.String()) - } - return nil -} - // RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error. func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error { ie, ok := errors.Cause(issue347Err).(Issue347Error) @@ -1171,12 +1124,9 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin := groupCompactionBegin if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil { - level.Error(cg.logger).Log("msg", fmt.Sprintf("failed to run pre compaction callback for plan: %v", toCompact), "err", err) - // instead of halting, we attempt to remove overlapped blocks and only keep the longest one. - if newErr := cg.removeOverlappingBlocks(ctx, toCompact, dir); newErr != nil { - return false, ulid.ULID{}, newErr - } + return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact)) } + toCompact = FilterNilBlocks(toCompact) level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact)) begin = time.Now() @@ -1407,7 +1357,7 @@ func NewBucketCompactor( planner, comp, DefaultBlockDeletableChecker{}, - DefaultCompactionLifecycleCallback{}, + NewOverlappingCompactionLifecycleCallback(), compactDir, bkt, concurrency, diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go new file mode 100644 index 00000000000..f4101792c14 --- /dev/null +++ b/pkg/compact/overlapping.go @@ -0,0 +1,92 @@ +package compact + +import ( + "context" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "os" + "path/filepath" +) + +type OverlappingCompactionLifecycleCallback struct { + overlappingBlocks prometheus.Counter + metaDir string +} + +func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback { + return &OverlappingCompactionLifecycleCallback{} +} + +func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error { + if len(toCompact) == 0 { + return nil + } + longest := toCompact[0] + for _, m := range toCompact { + if m.MinTime < longest.MinTime && m.MaxTime > longest.MaxTime { + level.Warn(logger).Log("msg", "found longest overlapping block in plan that are not the first", + "first", longest.String(), "block", m.String()) + longest = m + } else if (m.MinTime < longest.MinTime && longest.MinTime < m.MaxTime) || + (m.MinTime < longest.MaxTime && longest.MaxTime < m.MaxTime) { + err := errors.Errorf("found partially overlapping block: %s vs %s", m.String(), longest.String()) + if cg.enableVerticalCompaction { + level.Error(logger).Log("msg", "best effort to vertical compact", "err", err) + return nil + } else { + return halt(err) + } + } + } + for i, m := range toCompact { + if m.ULID.Compare(longest.ULID) == 0 || m.Thanos.Source == metadata.ReceiveSource { + level.Info(logger).Log("msg", "keep this overlapping block", "block", m.String(), + "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) + continue + } + cg.overlappingBlocks.Inc() + if err := DeleteBlockNow(ctx, logger, cg.bkt, m, c.metaDir); err != nil { + return retry(err) + } else { + // mark the block as nil so it won't be compacted + toCompact[i] = nil + } + } + return nil +} + +func (c *OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { + return nil +} + +func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { + return tsdb.DefaultBlockPopulator{}, nil +} + +func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) { + for _, b := range blocks { + if b != nil { + res = append(res, b) + } + } + return res +} + +func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error { + level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(), + "level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels()) + if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil { + return errors.Wrapf(err, "delete overlapping block %s", m.String()) + } + if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil { + return errors.Wrapf(err, "remove old block dir %s", m.String()) + } + return nil +} diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go new file mode 100644 index 00000000000..78f248156c7 --- /dev/null +++ b/pkg/compact/overlapping_test.go @@ -0,0 +1,13 @@ +package compact + +import ( + "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block/metadata" + "testing" +) + +func TestFilterNilCompact(t *testing.T) { + blocks := []*metadata.Meta{nil, nil} + filtered := FilterNilBlocks(blocks) + testutil.Equals(t, 0, len(filtered)) +}