Skip to content

Commit

Permalink
fix tools bucket replicate: fix ignoring of deleted blocks
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Chodur <[email protected]>
  • Loading branch information
FUSAKLA committed Feb 15, 2022
1 parent 2898724 commit 9d1654b
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 37 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5059](https://github.com/thanos-io/thanos/pull/5059) Compactor: Adding minimum retention flag validation for downsampling retention.
- [#4667](https://github.com/thanos-io/thanos/pull/4667) Add a pure aws-sdk auth for s3 storage.
- [#5111](https://github.com/thanos-io/thanos/pull/5111) Add matcher support to Query Rules endpoint.
- [#5117](https://github.com/thanos-io/thanos/pull/5117) Bucket replicate: Added flag `--ignore-marked-for-deletion` to avoid replication of blocks with the deletion mark.
- [#5162](https://github.com/thanos-io/thanos/pull/5162) Bucket replicate: Added flag `--ignore-marked-for-deletion` to avoid replication of blocks with the deletion mark.
- [#5148](https://github.com/thanos-io/thanos/pull/5148) Receive: Add tenant tag for tracing spans.

## Changed
Expand Down
33 changes: 23 additions & 10 deletions pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package replicate

import (
"context"
"github.com/thanos-io/thanos/pkg/objstore"
"math/rand"
"strconv"
"strings"
Expand Down Expand Up @@ -167,15 +168,7 @@ func RunReplicate(
}, []string{"result"})
replicationRunDuration.WithLabelValues(labelSuccess)
replicationRunDuration.WithLabelValues(labelError)

fetcher, err := thanosblock.NewMetaFetcher(
logger,
32,
fromBkt,
"",
reg,
[]thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(*minTime, *maxTime)},
)
fetcher, err := newMetaFetcher(logger, fromBkt, reg, *minTime, *maxTime, 32, ignoreMarkedForDeletion)
if err != nil {
return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt)
}
Expand All @@ -186,7 +179,6 @@ func RunReplicate(
resolutions,
compactions,
blockIDs,
ignoreMarkedForDeletion,
).Filter
metrics := newReplicationMetrics(reg)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -243,3 +235,24 @@ func RunReplicate(

return nil
}

// newMetaFetcher constructs a thanosblock.MetaFetcher over the replication
// source bucket. Fetched metas are always restricted to the
// [minTime, maxTime] window; when ignoreMarkedForDeletion is true, blocks
// that carry a deletion mark are additionally filtered out so they are
// never considered for replication.
func newMetaFetcher(logger log.Logger, fromBkt objstore.InstrumentedBucket, reg prometheus.Registerer, minTime, maxTime thanosmodel.TimeOrDurationValue, concurrency int, ignoreMarkedForDeletion bool) (*thanosblock.MetaFetcher, error) {
	metaFilters := []thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(minTime, maxTime)}
	if ignoreMarkedForDeletion {
		// Delay of 0: a block is dropped as soon as its deletion mark exists.
		metaFilters = append(metaFilters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency))
	}
	return thanosblock.NewMetaFetcher(logger, concurrency, fromBkt, "", reg, metaFilters)
}
36 changes: 13 additions & 23 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ import (

// BlockFilter is block filter that filters out compacted and unselected blocks.
type BlockFilter struct {
logger log.Logger
labelSelector labels.Selector
resolutionLevels map[compact.ResolutionLevel]struct{}
compactionLevels map[int]struct{}
blockIDs []ulid.ULID
ignoreMarkedForDeletion bool
logger log.Logger
labelSelector labels.Selector
resolutionLevels map[compact.ResolutionLevel]struct{}
compactionLevels map[int]struct{}
blockIDs []ulid.ULID
}

// NewBlockFilter returns block filter.
Expand All @@ -44,7 +43,6 @@ func NewBlockFilter(
resolutionLevels []compact.ResolutionLevel,
compactionLevels []int,
blockIDs []ulid.ULID,
ignoreMarkedForDeletion bool,
) *BlockFilter {
allowedResolutions := make(map[compact.ResolutionLevel]struct{})
for _, resolutionLevel := range resolutionLevels {
Expand All @@ -56,20 +54,16 @@ func NewBlockFilter(
}

return &BlockFilter{
labelSelector: labelSelector,
logger: logger,
resolutionLevels: allowedResolutions,
compactionLevels: allowedCompactions,
blockIDs: blockIDs,
ignoreMarkedForDeletion: ignoreMarkedForDeletion,
labelSelector: labelSelector,
logger: logger,
resolutionLevels: allowedResolutions,
compactionLevels: allowedCompactions,
blockIDs: blockIDs,
}
}

// Filter return true if block is non-compacted and matches selector.
func (bf *BlockFilter) Filter(b *metadata.Meta, markedForDeletion bool) bool {
if bf.ignoreMarkedForDeletion && markedForDeletion {
return false
}
func (bf *BlockFilter) Filter(b *metadata.Meta) bool {
if len(b.Thanos.Labels) == 0 {
level.Error(bf.logger).Log("msg", "filtering block", "reason", "labels should not be empty")
return false
Expand Down Expand Up @@ -121,7 +115,7 @@ func (bf *BlockFilter) Filter(b *metadata.Meta, markedForDeletion bool) bool {
return true
}

type blockFilterFunc func(b *metadata.Meta, markedForDeletion bool) bool
type blockFilterFunc func(b *metadata.Meta) bool

// TODO: Add filters field.
type replicationScheme struct {
Expand Down Expand Up @@ -198,11 +192,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
}

for id, meta := range metas {
_, err := rs.fromBkt.ReaderWithExpectedErrs(rs.fromBkt.IsObjNotFoundErr).Get(ctx, path.Join(meta.ULID.String(), metadata.DeletionMarkFilename))
if err != nil && !rs.fromBkt.IsObjNotFoundErr(err) {
return errors.Wrapf(err, "failed to read deletion mark from origin bucket block %s", meta.ULID.String())
}
if rs.blockFilter(meta, !rs.fromBkt.IsObjNotFoundErr(err)) {
if rs.blockFilter(meta) {
level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
availableBlocks = append(availableBlocks, meta)
}
Expand Down
20 changes: 17 additions & 3 deletions pkg/replicate/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"encoding/json"
"github.com/thanos-io/thanos/pkg/model"
"io"
"math/rand"
"os"
Expand All @@ -20,13 +21,19 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

var (
minTime = time.Unix(0, 0)
maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
)

func testLogger(testName string) log.Logger {
return log.With(
level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowDebug()),
Expand Down Expand Up @@ -374,8 +381,15 @@ func TestReplicationSchemeAll(t *testing.T) {
selector = c.selector
}

filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs, c.ignoreMarkedForDeletion).Filter
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil)
filter := NewBlockFilter(logger, selector, []compact.ResolutionLevel{compact.ResolutionLevelRaw}, []int{1}, c.blockIDs).Filter
fetcher, err := newMetaFetcher(
logger, objstore.WithNoopInstr(originBucket),
nil,
minTimeDuration,
maxTimeDuration,
32,
c.ignoreMarkedForDeletion,
)
testutil.Ok(t, err)

r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil)
Expand Down

0 comments on commit 9d1654b

Please sign in to comment.