From 2b9fa84e194dac866a75cd21f031b2c9eabdf41d Mon Sep 17 00:00:00 2001
From: Bartlomiej Plotka <bwplotka@gmail.com>
Date: Mon, 6 Apr 2020 13:16:12 +0100
Subject: [PATCH] compact: Made MarkForDeletion less strict; added more
 debuggability to block deletion logic; made meta sync explicit.

Also:

* Changed order: BestEffortCleanAbortedPartialUploads now runs before DeleteMarkedBlocks.
* Increment the markedForDeletion counter only when the deletion mark file was actually uploaded.
* Fixed logging issues.
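
For reviewers, a minimal sketch of the new explicit meta-sync flow, assembled
from the compact.go hunks below (simplified; surrounding code and metrics are
as in the patch):

    if err := sy.SyncMetas(ctx); err != nil {
        return errors.Wrap(err, "sync before retention")
    }
    if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarkedForDeletion); err != nil {
        return errors.Wrap(err, "retention failed")
    }
    // The last sync is still valid, so partial-upload cleanup and deletion of
    // marked blocks reuse it instead of re-fetching metas.
    compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
    if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
        return errors.Wrap(err, "error cleaning blocks")
    }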

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
---
 cmd/thanos/compact.go         | 88 +++++++++++++++++++++++------------
 cmd/thanos/downsample.go      | 21 +++++----
 cmd/thanos/main_test.go       |  8 +++-
 pkg/block/block.go            |  9 ++--
 pkg/block/block_test.go       | 82 +++++++++++++++++---------------
 pkg/block/fetcher.go          |  8 ++--
 pkg/compact/blocks_cleaner.go |  3 +-
 pkg/compact/clean.go          | 17 ++++---
 pkg/compact/clean_test.go     |  6 ++-
 pkg/compact/compact.go        | 53 +++++++++++++--------
 pkg/compact/retention.go      | 20 ++++----
 pkg/compact/retention_test.go |  6 ++-
 pkg/verifier/safe_delete.go   |  7 +--
 13 files changed, 198 insertions(+), 130 deletions(-)

diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 0b1c22769f6..5efc3f6f48e 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -16,6 +16,7 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/oklog/run"
+	"github.com/oklog/ulid"
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -113,7 +114,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
 	waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified.").
 		Default("5m").Duration()
 
-	generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
+	generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "DEPRECATED flag. Will be removed in the next release. If enabled, on startup the compactor runs a one-off job that scans all blocks to find blocks with a missing index cache file. It generates and uploads those if needed.").
 		Hidden().Default("false").Bool()
 
 	disableDownsampling := cmd.Flag("downsampling.disable", "Disables downsampling. This is not recommended "+
@@ -290,29 +291,48 @@ func runCompact(
 	}()
 
 	// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
-	// The delay of  deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
-	ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
+	// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
+	// This is to make sure that the compactor will not accidentally perform compactions with a gap instead.
+	ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2)
 	duplicateBlocksFilter := block.NewDeduplicateFilter()
 
 	baseMetaFetcher, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
 	if err != nil {
 		return errors.Wrap(err, "create meta fetcher")
 	}
-	compactFetcher := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
-		block.NewLabelShardedMetaFilter(relabelConfig),
-		block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
-		ignoreDeletionMarkFilter,
-		duplicateBlocksFilter,
-	}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})
+
 	enableVerticalCompaction := false
 	if len(dedupReplicaLabels) > 0 {
 		enableVerticalCompaction = true
 		level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
 	}
 
-	sy, err := compact.NewSyncer(logger, reg, bkt, compactFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
-	if err != nil {
-		return errors.Wrap(err, "create syncer")
+	compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader)
+	var sy *compact.Syncer
+	{
+		// Make sure all compactor meta syncs are done through Syncer.SyncMetas for readability.
+		cf := baseMetaFetcher.NewMetaFetcher(
+			extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
+				block.NewLabelShardedMetaFilter(relabelConfig),
+				block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
+				ignoreDeletionMarkFilter,
+				duplicateBlocksFilter,
+			}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)},
+		)
+		cf.UpdateOnChange(compactorView.Set)
+		sy, err = compact.NewSyncer(
+			logger,
+			reg,
+			bkt,
+			cf,
+			duplicateBlocksFilter,
+			ignoreDeletionMarkFilter,
+			blocksMarkedForDeletion,
+			blockSyncConcurrency,
+			acceptMalformedIndex, enableVerticalCompaction)
+		if err != nil {
+			return errors.Wrap(err, "create syncer")
+		}
 	}
 
 	levels, err := compactions.levels(maxCompactionLevel)
@@ -371,39 +391,52 @@ func runCompact(
 			// We run two passes of this to ensure that the 1h downsampling is generated
 			// for 5m downsamplings created in the first run.
 			level.Info(logger).Log("msg", "start first pass of downsampling")
-
-			if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
+			if err := sy.SyncMetas(ctx); err != nil {
+				return errors.Wrap(err, "sync before first pass of downsampling")
+			}
+			if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
 				return errors.Wrap(err, "first pass of downsampling failed")
 			}
 
 			level.Info(logger).Log("msg", "start second pass of downsampling")
-
-			if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
+			if err := sy.SyncMetas(ctx); err != nil {
+				return errors.Wrap(err, "sync before second pass of downsampling")
+			}
+			if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
 				return errors.Wrap(err, "second pass of downsampling failed")
 			}
 			level.Info(logger).Log("msg", "downsampling iterations done")
 		} else {
-			level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
+			level.Info(logger).Log("msg", "downsampling was explicitly disabled")
 		}
+		// TODO(bwplotka): Find a way to avoid syncing if no operation was done.
 
-		if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, compactFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
+		if err := sy.SyncMetas(ctx); err != nil {
+			return errors.Wrap(err, "sync before first pass of downsampling")
+		}
+
+		if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarkedForDeletion); err != nil {
 			return errors.Wrap(err, fmt.Sprintf("retention failed"))
 		}
 
+		// No need to resync before cleaning partial uploads and deleting marked blocks; the last sync is still valid.
+		compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
 		if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
 			return errors.Wrap(err, "error cleaning blocks")
 		}
-
-		compact.BestEffortCleanAbortedPartialUploads(ctx, logger, compactFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
 		return nil
 	}
 
 	g.Add(func() error {
 		defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
 
-		// Generate index file.
+		// Generate index files.
+		// TODO(bwplotka): Remove this in next release.
 		if generateMissingIndexCacheFiles {
-			if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, compactFetcher, indexCacheDir); err != nil {
+			if err := sy.SyncMetas(ctx); err != nil {
+				return err
+			}
+			if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, sy.Metas(), indexCacheDir); err != nil {
 				return err
 			}
 		}
@@ -451,16 +484,14 @@ func runCompact(
 		r := route.New()
 
 		ins := extpromhttp.NewInstrumentationMiddleware(reg)
-		compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader)
 		compactorView.Register(r, ins)
-		compactFetcher.UpdateOnChange(compactorView.Set)
 
 		global := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/global"), prefixHeader)
 		global.Register(r, ins)
 
 		// Separate fetcher for global view.
 		// TODO(bwplotka): Allow Bucket UI to visualize the state of the block as well.
-		f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil)
+		f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil, "component", "globalBucketUI")
 		f.UpdateOnChange(global.Set)
 
 		srv.Handle("/", r)
@@ -494,7 +525,7 @@ func runCompact(
 }
 
 // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
-func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
+func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, metas map[ulid.ULID]*metadata.Meta, dir string) error {
 	genIndex := promauto.With(reg).NewCounter(prometheus.CounterOpts{
 		Name: metricIndexGenerateName,
 		Help: metricIndexGenerateHelp,
@@ -515,11 +546,6 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prom
 
 	level.Info(logger).Log("msg", "start index cache processing")
 
-	metas, _, err := fetcher.Fetch(ctx)
-	if err != nil {
-		return errors.Wrap(err, "fetch metas")
-	}
-
 	for _, meta := range metas {
 		// New version of compactor pushes index cache along with data block.
 		// Skip uncompacted blocks.
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index c98335fd0cf..13e1aa8a685 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -102,14 +102,20 @@ func RunDownsample(
 			statusProber.Ready()
 
 			level.Info(logger).Log("msg", "start first pass of downsampling")
-
-			if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
+			metas, _, err := metaFetcher.Fetch(ctx)
+			if err != nil {
+				return errors.Wrap(err, "sync before first pass of downsampling")
+			}
+			if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
 				return errors.Wrap(err, "downsampling failed")
 			}
 
 			level.Info(logger).Log("msg", "start second pass of downsampling")
-
-			if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
+			metas, _, err = metaFetcher.Fetch(ctx)
+			if err != nil {
+				return errors.Wrap(err, "sync before second pass of downsampling")
+			}
+			if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
 				return errors.Wrap(err, "downsampling failed")
 			}
 
@@ -144,7 +150,7 @@ func downsampleBucket(
 	logger log.Logger,
 	metrics *DownsampleMetrics,
 	bkt objstore.Bucket,
-	fetcher block.MetadataFetcher,
+	metas map[ulid.ULID]*metadata.Meta,
 	dir string,
 ) error {
 	if err := os.RemoveAll(dir); err != nil {
@@ -160,11 +166,6 @@ func downsampleBucket(
 		}
 	}()
 
-	metas, _, err := fetcher.Fetch(ctx)
-	if err != nil {
-		return errors.Wrap(err, "downsampling meta fetch")
-	}
-
 	// mapping from a hash over all source IDs to blocks. We don't need to downsample a block
 	// if a downsampled version with the same hash already exists.
 	sources5m := map[ulid.ULID]struct{}{}
diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go
index ff0aa2863b7..ffd6598bd81 100644
--- a/cmd/thanos/main_test.go
+++ b/cmd/thanos/main_test.go
@@ -79,7 +79,9 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
 	metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
 	testutil.Ok(t, err)
 
-	testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
+	metas, _, err := metaFetcher.Fetch(ctx)
+	testutil.Ok(t, err)
+	testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metas, dir))
 
 	genIndexExp.Inc()
 	testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)
@@ -119,7 +121,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
 	metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
 	testutil.Ok(t, err)
 
-	testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
+	metas, _, err := metaFetcher.Fetch(ctx)
+	testutil.Ok(t, err)
+	testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir))
 	testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
 
 	_, err = os.Stat(dir)
diff --git a/pkg/block/block.go b/pkg/block/block.go
index ddc877fb57c..2bc7f86aaea 100644
--- a/pkg/block/block.go
+++ b/pkg/block/block.go
@@ -21,6 +21,7 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/objstore"
@@ -129,14 +130,15 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er
 }
 
 // MarkForDeletion creates a file which stores information about when the block was marked for deletion.
-func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error {
+func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error {
 	deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
 	deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile)
 	if err != nil {
 		return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile)
 	}
 	if deletionMarkExists {
-		return errors.Errorf("file %s already exists in bucket", deletionMarkFile)
+		level.Warn(logger).Log("msg", "requested to mark for deletion, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", deletionMarkFile))
+		return nil
 	}
 
 	deletionMark, err := json.Marshal(metadata.DeletionMark{
@@ -151,7 +153,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket
 	if err := bkt.Upload(ctx, deletionMarkFile, bytes.NewBuffer(deletionMark)); err != nil {
 		return errors.Wrapf(err, "upload file %s to bucket", deletionMarkFile)
 	}
-
+	markedForDeletion.Inc()
 	level.Info(logger).Log("msg", "block has been marked for deletion", "block", id)
 	return nil
 }
@@ -168,6 +170,7 @@ func Delete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid
 	if err != nil {
 		return errors.Wrapf(err, "stat %s", metaFile)
 	}
+
 	if ok {
 		if err := bkt.Delete(ctx, metaFile); err != nil {
 			return errors.Wrapf(err, "delete %s", metaFile)
diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go
index 503d09db75d..2c35197e7c9 100644
--- a/pkg/block/block_test.go
+++ b/pkg/block/block_test.go
@@ -7,7 +7,6 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
-	"fmt"
 	"io/ioutil"
 	"os"
 	"path"
@@ -17,6 +16,9 @@ import (
 
 	"github.com/fortytw2/leaktest"
 	"github.com/go-kit/kit/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/objstore"
@@ -232,44 +234,50 @@ func TestMarkForDeletion(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
 
-	bkt := objstore.NewInMemBucket()
-	{
-		blockWithoutDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
-			{{Name: "a", Value: "1"}},
-			{{Name: "a", Value: "2"}},
-			{{Name: "a", Value: "3"}},
-			{{Name: "a", Value: "4"}},
-			{{Name: "b", Value: "1"}},
-		}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
-		testutil.Ok(t, err)
-		testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithoutDeletionMark.String())))
+	for _, tcase := range []struct {
+		name      string
+		preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)
 
-		testutil.Ok(t, MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithoutDeletionMark))
-		exists, err := bkt.Exists(ctx, path.Join(blockWithoutDeletionMark.String(), metadata.DeletionMarkFilename))
-		testutil.Ok(t, err)
-		testutil.Equals(t, true, exists)
-	}
-	{
-		blockWithDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
-			{{Name: "a", Value: "1"}},
-			{{Name: "a", Value: "2"}},
-			{{Name: "a", Value: "3"}},
-			{{Name: "a", Value: "4"}},
-			{{Name: "b", Value: "1"}},
-		}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
-		testutil.Ok(t, err)
-		testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithDeletionMark.String())))
+		blocksMarked int
+	}{
+		{
+			name:         "block marked for deletion",
+			preUpload:    func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
+			blocksMarked: 1,
+		},
+		{
+			name: "block with deletion mark already, expected log and no metric increment",
+			preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
+				deletionMark, err := json.Marshal(metadata.DeletionMark{
+					ID:           id,
+					DeletionTime: time.Now().Unix(),
+					Version:      metadata.DeletionMarkVersion1,
+				})
+				testutil.Ok(t, err)
+				testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark)))
+			},
+			blocksMarked: 0,
+		},
+	} {
+		t.Run(tcase.name, func(t *testing.T) {
+			bkt := objstore.NewInMemBucket()
+			id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
+				{{Name: "a", Value: "1"}},
+				{{Name: "a", Value: "2"}},
+				{{Name: "a", Value: "3"}},
+				{{Name: "a", Value: "4"}},
+				{{Name: "b", Value: "1"}},
+			}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
+			testutil.Ok(t, err)
 
-		deletionMark, err := json.Marshal(metadata.DeletionMark{
-			ID:           blockWithDeletionMark,
-			DeletionTime: time.Now().Unix(),
-			Version:      metadata.DeletionMarkVersion1,
-		})
-		testutil.Ok(t, err)
-		testutil.Ok(t, bkt.Upload(ctx, path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark)))
+			tcase.preUpload(t, id, bkt)
 
-		err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithDeletionMark)
-		testutil.NotOk(t, err)
-		testutil.Equals(t, fmt.Sprintf("file %s already exists in bucket", path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename)), err.Error())
+			testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String())))
+
+			c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
+			err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, c)
+			testutil.Ok(t, err)
+			testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
+		})
 	}
 }
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index 0a8ca111966..7f21d4d8016 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -189,8 +189,8 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
 }
 
 // NewMetaFetcher transforms BaseFetcher into actually usable *MetaFetcher.
-func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) *MetaFetcher {
-	return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers}
+func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier, logTags ...interface{}) *MetaFetcher {
+	return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers, logger: log.With(f.logger, logTags...)}
 }
 
 var (
@@ -448,7 +448,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter
 		return metas, resp.partial, errors.Wrap(resp.metaErrs, "incomplete view")
 	}
 
-	level.Debug(f.logger).Log("msg", "successfully fetched block metadata", "duration", time.Since(start).String(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial))
+	level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial))
 	return metas, resp.partial, nil
 }
 
@@ -460,6 +460,8 @@ type MetaFetcher struct {
 	modifiers []MetadataModifier
 
 	listener func([]metadata.Meta, error)
+
+	logger log.Logger
 }
 
 // Fetch returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket.
diff --git a/pkg/compact/blocks_cleaner.go b/pkg/compact/blocks_cleaner.go
index 820346a1bf9..73815051185 100644
--- a/pkg/compact/blocks_cleaner.go
+++ b/pkg/compact/blocks_cleaner.go
@@ -37,7 +37,8 @@ func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMark
 	}
 }
 
-// DeleteMarkedBlocks uses ignoreDeletionMarkFilter to delete the blocks that are marked for deletion.
+// DeleteMarkedBlocks uses ignoreDeletionMarkFilter to gather the blocks that are marked for deletion and deletes
+// those that are older than the given deleteDelay.
 func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error {
 	level.Info(s.logger).Log("msg", "started cleaning of blocks marked for deletion")
 
diff --git a/pkg/compact/clean.go b/pkg/compact/clean.go
index 14fdfc4d7cb..53cfb169083 100644
--- a/pkg/compact/clean.go
+++ b/pkg/compact/clean.go
@@ -21,12 +21,15 @@ const (
 	PartialUploadThresholdAge = 2 * 24 * time.Hour
 )
 
-func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter, blocksMarkedForDeletion prometheus.Counter) {
+func BestEffortCleanAbortedPartialUploads(
+	ctx context.Context,
+	logger log.Logger,
+	partial map[ulid.ULID]error,
+	bkt objstore.Bucket,
+	deleteAttempts prometheus.Counter,
+	blocksMarkedForDeletion prometheus.Counter,
+) {
 	level.Info(logger).Log("msg", "started cleaning of aborted partial uploads")
-	_, partial, err := fetcher.Fetch(ctx)
-	if err != nil {
-		level.Warn(logger).Log("msg", "failed to fetch metadata for cleaning of aborted partial uploads; skipping", "err", err)
-	}
 
 	// Delete partial blocks that are older than partialUploadThresholdAge.
 	// TODO(bwplotka): This is can cause data loss if blocks are:
@@ -41,11 +44,11 @@ func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger
 		}
 
 		deleteAttempts.Inc()
-		if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil {
+		level.Info(logger).Log("msg", "found partially uploaded block; marking for deletion", "block", id)
+		if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
 			level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err)
 			return
 		}
-		blocksMarkedForDeletion.Inc()
 		level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge)
 	}
 	level.Info(logger).Log("msg", "cleaning of aborted partial uploads done")
diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go
index 7d80b032015..1321c888ba6 100644
--- a/pkg/compact/clean_test.go
+++ b/pkg/compact/clean_test.go
@@ -60,7 +60,11 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
 
 	deleteAttempts := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
 	blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
-	BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, deleteAttempts, blocksMarkedForDeletion)
+
+	_, partial, err := metaFetcher.Fetch(ctx)
+	testutil.Ok(t, err)
+
+	BestEffortCleanAbortedPartialUploads(ctx, logger, partial, bkt, deleteAttempts, blocksMarkedForDeletion)
 	testutil.Equals(t, 1.0, promtest.ToFloat64(deleteAttempts))
 
 	exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"))
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 24d2c86ce1e..134e193d172 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -46,6 +46,7 @@ type Syncer struct {
 	fetcher                  block.MetadataFetcher
 	mtx                      sync.Mutex
 	blocks                   map[ulid.ULID]*metadata.Meta
+	partial                  map[ulid.ULID]error
 	blockSyncConcurrency     int
 	metrics                  *syncerMetrics
 	acceptMalformedIndex     bool
@@ -153,19 +154,36 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {
 	}
 }
 
+// SyncMetas synchronises local state of block metas with what we have in the bucket.
 func (s *Syncer) SyncMetas(ctx context.Context) error {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
-	metas, _, err := s.fetcher.Fetch(ctx)
+	metas, partial, err := s.fetcher.Fetch(ctx)
 	if err != nil {
 		return retry(err)
 	}
 	s.blocks = metas
-
+	s.partial = partial
 	return nil
 }
 
+// Partial returns the partial blocks found during the last sync.
+func (s *Syncer) Partial() map[ulid.ULID]error {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	return s.partial
+}
+
+// Metas returns the block metas loaded during the last sync.
+func (s *Syncer) Metas() map[ulid.ULID]*metadata.Meta {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	return s.blocks
+}
+
 // GroupKey returns a unique identifier for the group the block belongs to. It considers
 // the downsampling resolution and the block's labels.
 func GroupKey(meta metadata.Thanos) string {
@@ -228,16 +246,18 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {
 
 	begin := time.Now()
 
-	duplicateIDs := s.duplicateBlocksFilter.DuplicateIDs()
+	// The ignore-deletion-mark filter runs before the deduplicate filter.
 	deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks()
+	duplicateIDs := s.duplicateBlocksFilter.DuplicateIDs()
 
 	// GarbageIDs contains the duplicateIDs, since these blocks can be replaced with other blocks.
-	// We also remove ids present in deletionMarkMap since these blocks are already marked for deletion.
+	// We also remove ids present in deletionMarkMap since these blocks are already marked for deletion.
 	garbageIDs := []ulid.ULID{}
 	for _, id := range duplicateIDs {
-		if _, exists := deletionMarkMap[id]; !exists {
-			garbageIDs = append(garbageIDs, id)
+		if _, exists := deletionMarkMap[id]; exists {
+			continue
 		}
+		garbageIDs = append(garbageIDs, id)
 	}
 
 	for _, id := range garbageIDs {
@@ -249,14 +269,12 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {
 		delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
 
 		level.Info(s.logger).Log("msg", "marking outdated block for deletion", "block", id)
-
-		err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id)
+		err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, s.metrics.blocksMarkedForDeletion)
 		cancel()
 		if err != nil {
 			s.metrics.garbageCollectionFailures.Inc()
-			return retry(errors.Wrapf(err, "delete block %s from bucket", id))
+			return retry(errors.Wrapf(err, "mark block %s for deletion", id))
 		}
-		s.metrics.blocksMarkedForDeletion.Inc()
 
 		// Immediately update our in-memory state so no further call to SyncMetas is needed
 		// after running garbage collection.
@@ -568,10 +586,9 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
 	defer cancel()
 
 	// TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this).
-	if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id); err != nil {
+	if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, blocksMarkedForDeletion); err != nil {
 		return errors.Wrapf(err, "deleting old block %s failed. You need to delete this block manually", ie.id)
 	}
-	blocksMarkedForDeletion.Inc()
 	return nil
 }
 
@@ -702,7 +719,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	if overlappingBlocks {
 		cg.verticalCompactions.Inc()
 	}
-	level.Info(cg.logger).Log("msg", "compacted blocks",
+	level.Info(cg.logger).Log("msg", "compacted blocks", "new", compID,
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin), "overlapping_blocks", overlappingBlocks)
 
 	bdir := filepath.Join(dir, compID.String())
@@ -773,10 +790,9 @@ func (cg *Group) deleteBlock(b string) error {
 	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
 	defer cancel()
 	level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id)
-	if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id); err != nil {
+	if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, cg.blocksMarkedForDeletion); err != nil {
 		return errors.Wrapf(err, "delete block %s from bucket", id)
 	}
-	cg.blocksMarkedForDeletion.Inc()
 	return nil
 }
 
@@ -869,29 +885,26 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 		}
 
 		level.Info(c.logger).Log("msg", "start sync of metas")
-
 		if err := c.sy.SyncMetas(ctx); err != nil {
 			return errors.Wrap(err, "sync")
 		}
 
 		level.Info(c.logger).Log("msg", "start of GC")
-
 		// Blocks that were compacted are garbage collected after each Compaction.
 		// However if compactor crashes we need to resolve those on startup.
 		if err := c.sy.GarbageCollect(ctx); err != nil {
 			return errors.Wrap(err, "garbage")
 		}
 
-		level.Info(c.logger).Log("msg", "start of compaction")
-
 		groups, err := c.sy.Groups()
 		if err != nil {
 			return errors.Wrap(err, "build compaction groups")
 		}
 
+		level.Info(c.logger).Log("msg", "start of compactions")
+
 		// Send all groups found during this pass to the compaction workers.
 		var groupErrs terrors.MultiError
-
 	groupLoop:
 		for _, g := range groups {
 			select {
diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go
index eb07507d9d7..8d1ba7d5fb7 100644
--- a/pkg/compact/retention.go
+++ b/pkg/compact/retention.go
@@ -9,21 +9,25 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/objstore"
 )
 
 // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime.
 // A value of 0 disables the retention for its resolution.
-func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher block.MetadataFetcher, retentionByResolution map[ResolutionLevel]time.Duration, blocksMarkedForDeletion prometheus.Counter) error {
+func ApplyRetentionPolicyByResolution(
+	ctx context.Context,
+	logger log.Logger,
+	bkt objstore.Bucket,
+	metas map[ulid.ULID]*metadata.Meta,
+	retentionByResolution map[ResolutionLevel]time.Duration,
+	blocksMarkedForDeletion prometheus.Counter,
+) error {
 	level.Info(logger).Log("msg", "start optional retention")
-	metas, _, err := fetcher.Fetch(ctx)
-	if err != nil {
-		return errors.Wrap(err, "fetch metas")
-	}
-
 	for id, m := range metas {
 		retentionDuration := retentionByResolution[ResolutionLevel(m.Thanos.Downsample.Resolution)]
 		if retentionDuration.Seconds() == 0 {
@@ -33,13 +37,11 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk
 		maxTime := time.Unix(m.MaxTime/1000, 0)
 		if time.Now().After(maxTime.Add(retentionDuration)) {
 			level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
-			if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil {
+			if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
 				return errors.Wrap(err, "delete block")
 			}
-			blocksMarkedForDeletion.Inc()
 		}
 	}
-
 	level.Info(logger).Log("msg", "optional retention apply done")
 	return nil
 }
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index ca27429890e..8d3cee397d8 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -248,7 +248,11 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
 			testutil.Ok(t, err)
 
 			blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
-			if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, tt.retentionByResolution, blocksMarkedForDeletion); (err != nil) != tt.wantErr {
+
+			metas, _, err := metaFetcher.Fetch(ctx)
+			testutil.Ok(t, err)
+
+			if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metas, tt.retentionByResolution, blocksMarkedForDeletion); (err != nil) != tt.wantErr {
 				t.Errorf("ApplyRetentionPolicyByResolution() error = %v, wantErr %v", err, tt.wantErr)
 			}
 
diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go
index b124317e290..5e5e5e53161 100644
--- a/pkg/verifier/safe_delete.go
+++ b/pkg/verifier/safe_delete.go
@@ -80,11 +80,9 @@ func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objs
 	}
 
 	level.Info(logger).Log("msg", "Marking block as deleted", "id", id.String())
-	if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil {
+	if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
 		return errors.Wrap(err, "marking delete from source")
 	}
-	blocksMarkedForDeletion.Inc()
-
 	return nil
 }
 
@@ -119,10 +117,9 @@ func BackupAndDeleteDownloaded(ctx context.Context, logger log.Logger, bdir stri
 	}
 
 	level.Info(logger).Log("msg", "Marking block as deleted", "id", id.String())
-	if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil {
+	if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
 		return errors.Wrap(err, "marking delete from source")
 	}
-	blocksMarkedForDeletion.Inc()
 	return nil
 }