Skip to content

Commit

Permalink
Internal: remove unnecessary parameter to NoCompactionMarkFilter (#7301)
Browse files Browse the repository at this point in the history
* Remove unnecessary parameter to NewNoCompactionMarkFilter.

Signed-off-by: Peter Štibraný <[email protected]>

* Remove unnecessary parameter to NewNoCompactionMarkFilter.

Signed-off-by: Peter Štibraný <[email protected]>

---------

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored Feb 5, 2024
1 parent 83ec531 commit 3026016
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func (c *BlocksCleaner) estimateCompactionJobsFrom(ctx context.Context, userID s
for _, f := range []block.MetadataFilter{
// We don't include ShardAwareDeduplicateFilter, because it relies on list of compaction sources, which are not present in the BucketIndex.
// We do include NoCompactionMarkFilter to avoid computing jobs from blocks that are marked for no-compaction.
NewNoCompactionMarkFilter(userBucket, true),
NewNoCompactionMarkFilter(userBucket),
} {
err := f.Filter(ctx, metas, synced)
if err != nil {
Expand Down
14 changes: 5 additions & 9 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,16 +1018,14 @@ var _ block.MetadataFilter = &NoCompactionMarkFilter{}
// NoCompactionMarkFilter is a block.Fetcher filter that finds all blocks with no-compact marker files, and optionally
// removes them from synced metas.
type NoCompactionMarkFilter struct {
bkt objstore.InstrumentedBucketReader
noCompactMarkedMap map[ulid.ULID]struct{}
removeNoCompactBlocks bool
bkt objstore.InstrumentedBucketReader
noCompactMarkedMap map[ulid.ULID]struct{}
}

// NewNoCompactionMarkFilter creates NoCompactionMarkFilter.
func NewNoCompactionMarkFilter(bkt objstore.InstrumentedBucketReader, removeNoCompactBlocks bool) *NoCompactionMarkFilter {
func NewNoCompactionMarkFilter(bkt objstore.InstrumentedBucketReader) *NoCompactionMarkFilter {
return &NoCompactionMarkFilter{
bkt: bkt,
removeNoCompactBlocks: removeNoCompactBlocks,
bkt: bkt,
}
}

Expand All @@ -1054,9 +1052,7 @@ func (f *NoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID
noCompactMarkedMap[blockID] = struct{}{}
synced.WithLabelValues(block.MarkedForNoCompactionMeta).Inc()

if f.removeNoCompactBlocks {
delete(metas, blockID)
}
delete(metas, blockID)
}

}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/bucket_compactor_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestGroupCompactE2E(t *testing.T) {
reg := prometheus.NewRegistry()

duplicateBlocksFilter := NewShardAwareDeduplicateFilter()
noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt))
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
duplicateBlocksFilter,
noCompactMarkerFilter,
Expand Down
27 changes: 3 additions & 24 deletions pkg/compactor/bucket_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,27 +175,6 @@ func TestNoCompactionMarkFilter(t *testing.T) {
block5 := ulid.MustParse("01DTVP434PA9VFXSW2JK000005") // No mark file.

for name, testFn := range map[string]func(t *testing.T, synced block.GaugeVec){
"filter with no deletion of blocks marked for no-compaction": func(t *testing.T, synced block.GaugeVec) {
metas := map[ulid.ULID]*block.Meta{
block1: blockMeta(block1.String(), 100, 200, nil),
block2: blockMeta(block2.String(), 200, 300, nil), // Has no-compaction marker.
block4: blockMeta(block4.String(), 400, 500, nil), // Invalid marker is still a marker, and block will be in NoCompactMarkedBlocks.
block5: blockMeta(block5.String(), 500, 600, nil),
}

f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), false)
require.NoError(t, f.Filter(ctx, metas, synced))

require.Contains(t, metas, block1)
require.Contains(t, metas, block2)
require.Contains(t, metas, block4)
require.Contains(t, metas, block5)

require.Len(t, f.NoCompactMarkedBlocks(), 2)
require.Contains(t, f.NoCompactMarkedBlocks(), block2, block4)

assert.Equal(t, 2.0, testutil.ToFloat64(synced.WithLabelValues(block.MarkedForNoCompactionMeta)))
},
"filter with deletion enabled": func(t *testing.T, synced block.GaugeVec) {
metas := map[ulid.ULID]*block.Meta{
block1: blockMeta(block1.String(), 100, 200, nil),
Expand All @@ -204,7 +183,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
block5: blockMeta(block5.String(), 500, 600, nil),
}

f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt))
require.NoError(t, f.Filter(ctx, metas, synced))

require.Contains(t, metas, block1)
Expand All @@ -230,7 +209,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()

f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt))
require.Error(t, f.Filter(canceledCtx, metas, synced))

require.Contains(t, metas, block1)
Expand All @@ -247,7 +226,7 @@ func TestNoCompactionMarkFilter(t *testing.T) {
block3: blockMeta(block3.String(), 300, 300, nil), // Has compaction marker with invalid version, but Filter doesn't check for that.
}

f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
f := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt))
err := f.Filter(ctx, metas, synced)
require.NoError(t, err)
require.Empty(t, metas)
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e
}),
deduplicateBlocksFilter,
// removes blocks that should not be compacted due to being marked so.
NewNoCompactionMarkFilter(userBucket, true),
NewNoCompactionMarkFilter(userBucket),
}

fetcher, err := block.NewMetaFetcher(
Expand Down
2 changes: 1 addition & 1 deletion tools/compaction-planner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func main() {

for _, f := range []block.MetadataFilter{
// No need to exclude blocks marked for deletion, as we did that above already.
compactor.NewNoCompactionMarkFilter(bucket.NewUserBucketClient(cfg.userID, bkt, nil), true),
compactor.NewNoCompactionMarkFilter(bucket.NewUserBucketClient(cfg.userID, bkt, nil)),
} {
log.Printf("Filtering using %T\n", f)
err = f.Filter(ctx, metas, synced)
Expand Down

0 comments on commit 3026016

Please sign in to comment.