diff --git a/CHANGELOG.md b/CHANGELOG.md index 89c1aef8933..12f1624904e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5059](https://github.com/thanos-io/thanos/pull/5059) Compactor: Adding minimum retention flag validation for downsampling retention. - [#4667](https://github.com/thanos-io/thanos/pull/4667) Add a pure aws-sdk auth for s3 storage. - [#5111](https://github.com/thanos-io/thanos/pull/5111) Add matcher support to Query Rules endpoint. -- [#5117](https://github.com/thanos-io/thanos/pull/5117) Bucket replicate: Added flag `--ignore-marked-for-deletion` to avoid replication of blocks with the deletion mark. +- [#5162](https://github.com/thanos-io/thanos/pull/5162) Bucket replicate: Added flag `--ignore-marked-for-deletion` to avoid replication of blocks with the deletion mark. - [#5148](https://github.com/thanos-io/thanos/pull/5148) Receive: Add tenant tag for tracing spans. ## Changed diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 8d5776110bc..aff95f86600 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -5,6 +5,7 @@ package replicate import ( "context" + "github.com/thanos-io/thanos/pkg/objstore" "math/rand" "strconv" "strings" @@ -167,15 +168,7 @@ func RunReplicate( }, []string{"result"}) replicationRunDuration.WithLabelValues(labelSuccess) replicationRunDuration.WithLabelValues(labelError) - - fetcher, err := thanosblock.NewMetaFetcher( - logger, - 32, - fromBkt, - "", - reg, - []thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(*minTime, *maxTime)}, - ) + fetcher, err := newMetaFetcher(logger, fromBkt, reg, *minTime, *maxTime, 32, ignoreMarkedForDeletion) if err != nil { return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt) } @@ -186,7 +179,6 @@ func RunReplicate( resolutions, compactions, blockIDs, - ignoreMarkedForDeletion, ).Filter metrics := newReplicationMetrics(reg) ctx, cancel := context.WithCancel(context.Background()) @@ -243,3 +235,24 @@ func RunReplicate( return nil } + +func newMetaFetcher(logger log.Logger, fromBkt objstore.InstrumentedBucket, reg prometheus.Registerer, minTime, maxTime thanosmodel.TimeOrDurationValue, concurrency int, ignoreMarkedForDeletion bool) (*thanosblock.MetaFetcher, error) { + filters := []thanosblock.MetadataFilter{ + thanosblock.NewTimePartitionMetaFilter(minTime, maxTime), + } + if ignoreMarkedForDeletion { + filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency)) + } + fetcher, err := thanosblock.NewMetaFetcher( + logger, + concurrency, + fromBkt, + "", + reg, + filters, + ) + if err != nil { + return nil, err + } + return fetcher, nil +} diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 05d68991add..f9b1067b8ee 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -29,12 +29,11 @@ import ( // BlockFilter is block filter that filters out compacted and unselected blocks. type BlockFilter struct { - logger log.Logger - labelSelector labels.Selector - resolutionLevels map[compact.ResolutionLevel]struct{} - compactionLevels map[int]struct{} - blockIDs []ulid.ULID - ignoreMarkedForDeletion bool + logger log.Logger + labelSelector labels.Selector + resolutionLevels map[compact.ResolutionLevel]struct{} + compactionLevels map[int]struct{} + blockIDs []ulid.ULID } // NewBlockFilter returns block filter. @@ -44,7 +43,6 @@ func NewBlockFilter( resolutionLevels []compact.ResolutionLevel, compactionLevels []int, blockIDs []ulid.ULID, - ignoreMarkedForDeletion bool, ) *BlockFilter { allowedResolutions := make(map[compact.ResolutionLevel]struct{}) for _, resolutionLevel := range resolutionLevels { @@ -56,20 +54,16 @@ func NewBlockFilter( } return &BlockFilter{ - labelSelector: labelSelector, - logger: logger, - resolutionLevels: allowedResolutions, - compactionLevels: allowedCompactions, - blockIDs: blockIDs, - ignoreMarkedForDeletion: ignoreMarkedForDeletion, + labelSelector: labelSelector, + logger: logger, + resolutionLevels: allowedResolutions, + compactionLevels: allowedCompactions, + blockIDs: blockIDs, } } // Filter return true if block is non-compacted and matches selector. -func (bf *BlockFilter) Filter(b *metadata.Meta, markedForDeletion bool) bool { - if bf.ignoreMarkedForDeletion && markedForDeletion { - return false - } +func (bf *BlockFilter) Filter(b *metadata.Meta) bool { if len(b.Thanos.Labels) == 0 { level.Error(bf.logger).Log("msg", "filtering block", "reason", "labels should not be empty") return false @@ -121,7 +115,7 @@ func (bf *BlockFilter) Filter(b *metadata.Meta, markedForDeletion bool) bool { return true } -type blockFilterFunc func(b *metadata.Meta, markedForDeletion bool) bool +type blockFilterFunc func(b *metadata.Meta) bool // TODO: Add filters field. type replicationScheme struct { @@ -198,11 +192,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error { } for id, meta := range metas { - _, err := rs.fromBkt.ReaderWithExpectedErrs(rs.fromBkt.IsObjNotFoundErr).Get(ctx, path.Join(meta.ULID.String(), metadata.DeletionMarkFilename)) - if err != nil && !rs.fromBkt.IsObjNotFoundErr(err) { - return errors.Wrapf(err, "failed to read deletion mark from origin bucket block %s", meta.ULID.String()) - } - if rs.blockFilter(meta, !rs.fromBkt.IsObjNotFoundErr(err)) { + if rs.blockFilter(meta) { level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String()) availableBlocks = append(availableBlocks, meta) } diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index 8d88f096f7d..5d588b92ccd 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "encoding/json" + "github.com/thanos-io/thanos/pkg/model" "io" "math/rand" "os" @@ -20,13 +21,19 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/testutil" ) +var ( + minTime = time.Unix(0, 0) + maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z") + minTimeDuration = model.TimeOrDurationValue{Time: &minTime} + maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime} +) + func testLogger(testName string) log.Logger { return log.With( level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowDebug()), @@ -374,8 +381,15 @@ func TestReplicationSchemeAll(t *testing.T) { selector = c.selector } - filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs, c.ignoreMarkedForDeletion).Filter - fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil) + filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs).Filter + fetcher, err := newMetaFetcher( + logger, objstore.WithNoopInstr(originBucket), + nil, + minTimeDuration, + maxTimeDuration, + 32, + c.ignoreMarkedForDeletion, + ) testutil.Ok(t, err) r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil)