Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools bucket replicate: fix ignoring of deleted blocks #5162

Merged
merged 3 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4667](https://github.com/thanos-io/thanos/pull/4667) Add a pure aws-sdk auth for s3 storage.
- [#5111](https://github.com/thanos-io/thanos/pull/5111) Add matcher support to Query Rules endpoint.
- [#5117](https://github.com/thanos-io/thanos/pull/5117) Bucket replicate: Added flag `--ignore-marked-for-deletion` to avoid replication of blocks with the deletion mark.
- [#5148](https://github.com/thanos-io/thanos/pull/5148) Receive: Add tenant tag for tracing spans.
- [#5148](https://github.com/thanos-io/thanos/pull/5148) Receive: Add tenant tag for tracing spans.

## Changed
- [#5144](https://github.com/thanos-io/thanos/pull/5144) UI: Improve graph color
Expand Down Expand Up @@ -220,11 +220,11 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Changed

-
-

### Removed

-
-

## [v0.20.0](https://github.com/thanos-io/thanos/releases/tag/v0.20.0) - 2021.04.28

Expand Down
37 changes: 27 additions & 10 deletions pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down Expand Up @@ -167,15 +168,7 @@ func RunReplicate(
}, []string{"result"})
replicationRunDuration.WithLabelValues(labelSuccess)
replicationRunDuration.WithLabelValues(labelError)

fetcher, err := thanosblock.NewMetaFetcher(
logger,
32,
fromBkt,
"",
reg,
[]thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(*minTime, *maxTime)},
)
fetcher, err := newMetaFetcher(logger, fromBkt, reg, *minTime, *maxTime, 32, ignoreMarkedForDeletion)
if err != nil {
return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt)
}
Expand All @@ -186,7 +179,6 @@ func RunReplicate(
resolutions,
compactions,
blockIDs,
ignoreMarkedForDeletion,
).Filter
metrics := newReplicationMetrics(reg)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -243,3 +235,28 @@ func RunReplicate(

return nil
}

// newMetaFetcher builds a block MetaFetcher for the source bucket, limited to
// blocks whose time range falls within [minTime, maxTime]. When
// ignoreMarkedForDeletion is true, blocks carrying a deletion mark in the
// bucket are filtered out as well (delay 0: any existing mark is honored
// immediately).
func newMetaFetcher(
	logger log.Logger,
	fromBkt objstore.InstrumentedBucket,
	reg prometheus.Registerer,
	minTime,
	maxTime thanosmodel.TimeOrDurationValue,
	concurrency int,
	ignoreMarkedForDeletion bool,
) (*thanosblock.MetaFetcher, error) {
	// At most two filters: the time partition filter is always applied,
	// the deletion-mark filter only on request.
	metaFilters := make([]thanosblock.MetadataFilter, 0, 2)
	metaFilters = append(metaFilters, thanosblock.NewTimePartitionMetaFilter(minTime, maxTime))
	if ignoreMarkedForDeletion {
		metaFilters = append(metaFilters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency))
	}
	return thanosblock.NewMetaFetcher(logger, concurrency, fromBkt, "", reg, metaFilters)
}
36 changes: 13 additions & 23 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ import (

// BlockFilter is block filter that filters out compacted and unselected blocks.
type BlockFilter struct {
logger log.Logger
labelSelector labels.Selector
resolutionLevels map[compact.ResolutionLevel]struct{}
compactionLevels map[int]struct{}
blockIDs []ulid.ULID
ignoreMarkedForDeletion bool
logger log.Logger
labelSelector labels.Selector
resolutionLevels map[compact.ResolutionLevel]struct{}
compactionLevels map[int]struct{}
blockIDs []ulid.ULID
}

// NewBlockFilter returns block filter.
Expand All @@ -44,7 +43,6 @@ func NewBlockFilter(
resolutionLevels []compact.ResolutionLevel,
compactionLevels []int,
blockIDs []ulid.ULID,
ignoreMarkedForDeletion bool,
) *BlockFilter {
allowedResolutions := make(map[compact.ResolutionLevel]struct{})
for _, resolutionLevel := range resolutionLevels {
Expand All @@ -56,20 +54,16 @@ func NewBlockFilter(
}

return &BlockFilter{
labelSelector: labelSelector,
logger: logger,
resolutionLevels: allowedResolutions,
compactionLevels: allowedCompactions,
blockIDs: blockIDs,
ignoreMarkedForDeletion: ignoreMarkedForDeletion,
labelSelector: labelSelector,
logger: logger,
resolutionLevels: allowedResolutions,
compactionLevels: allowedCompactions,
blockIDs: blockIDs,
}
}

// Filter return true if block is non-compacted and matches selector.
func (bf *BlockFilter) Filter(b *metadata.Meta, markedForDeletion bool) bool {
if bf.ignoreMarkedForDeletion && markedForDeletion {
return false
}
func (bf *BlockFilter) Filter(b *metadata.Meta) bool {
if len(b.Thanos.Labels) == 0 {
level.Error(bf.logger).Log("msg", "filtering block", "reason", "labels should not be empty")
return false
Expand Down Expand Up @@ -121,7 +115,7 @@ func (bf *BlockFilter) Filter(b *metadata.Meta, markedForDeletion bool) bool {
return true
}

type blockFilterFunc func(b *metadata.Meta, markedForDeletion bool) bool
type blockFilterFunc func(b *metadata.Meta) bool

// TODO: Add filters field.
type replicationScheme struct {
Expand Down Expand Up @@ -198,11 +192,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
}

for id, meta := range metas {
_, err := rs.fromBkt.ReaderWithExpectedErrs(rs.fromBkt.IsObjNotFoundErr).Get(ctx, path.Join(meta.ULID.String(), metadata.DeletionMarkFilename))
if err != nil && !rs.fromBkt.IsObjNotFoundErr(err) {
return errors.Wrapf(err, "failed to read deletion mark from origin bucket block %s", meta.ULID.String())
}
if rs.blockFilter(meta, !rs.fromBkt.IsObjNotFoundErr(err)) {
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
}
Expand Down
20 changes: 17 additions & 3 deletions pkg/replicate/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

// Package-wide time bounds for the replication tests: an effectively
// unbounded [minTime, maxTime] window (Unix epoch through year 9999), so the
// meta fetcher's time-partition filter keeps every test block. The parse
// error for maxTime is deliberately ignored — the layout string is a
// compile-time constant known to be valid.
var (
minTime = time.Unix(0, 0)
maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
)

func testLogger(testName string) log.Logger {
return log.With(
level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowDebug()),
Expand Down Expand Up @@ -374,8 +381,15 @@ func TestReplicationSchemeAll(t *testing.T) {
selector = c.selector
}

filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs, c.ignoreMarkedForDeletion).Filter
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil)
filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs).Filter
fetcher, err := newMetaFetcher(
logger, objstore.WithNoopInstr(originBucket),
nil,
minTimeDuration,
maxTimeDuration,
32,
c.ignoreMarkedForDeletion,
)
testutil.Ok(t, err)

r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil)
Expand Down