From 497de41b09a48ae229b839d6511ae012b7d6f8ed Mon Sep 17 00:00:00 2001
From: Bartlomiej Plotka <bwplotka@gmail.com>
Date: Mon, 6 Jan 2020 13:01:29 +0000
Subject: [PATCH] Use block.MetaFetcher in Compactor. (#1937)

Fixes: https://github.com/thanos-io/thanos/issues/1335
Fixes: https://github.com/thanos-io/thanos/issues/1919
Fixes: https://github.com/thanos-io/thanos/issues/1300

* Cleanup of blocks with missing meta.json now starts only once the block being uploaded is older than 2 days (only a mitigation).
* Blocks without meta.json are handled properly in all compactor phases (see the sketch after this list).
* Prepared for the future implementation of https://thanos.io/proposals/201901-read-write-operations-bucket.md/
* Added a metric for deletion attempts of aborted partial uploads, and delayed those deletions.
* More tests.
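
For reviewers, the wiring that all compactor phases now share, condensed from
cmd/thanos/compact.go below (a sketch for orientation only, not part of the patch):

    // One fetcher is built once and passed to compaction, downsampling,
    // retention and partial-upload cleanup alike.
    metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
        block.NewLabelShardedMetaFilter(relabelConfig).Filter, // Drop blocks based on the relabel config.
        (&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
    )
    if err != nil {
        return errors.Wrap(err, "create meta fetcher")
    }

    // Fetch returns one consistent view per iteration: metas for complete
    // blocks, plus the IDs of partial blocks (those still missing meta.json).
    metas, partial, err := metaFetcher.Fetch(ctx)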


Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
---
 cmd/thanos/compact.go                         | 117 ++++----
 cmd/thanos/downsample.go                      |  31 +-
 cmd/thanos/main_test.go                       |  10 +-
 docs/components/compact.md                    |   2 +-
 pkg/block/fetcher.go                          |   1 -
 pkg/compact/clean.go                          |  49 ++++
 pkg/compact/clean_test.go                     |  72 +++++
 pkg/compact/compact.go                        | 270 +++---------------
 pkg/compact/compact_e2e_test.go               | 155 +---------
 pkg/compact/compact_test.go                   |  42 ---
 .../downsample/streamed_block_writer.go       |   2 +-
 pkg/compact/retention.go                      |  22 +-
 pkg/compact/retention_test.go                 |   7 +-
 13 files changed, 276 insertions(+), 504 deletions(-)
 create mode 100644 pkg/compact/clean.go
 create mode 100644 pkg/compact/clean_test.go
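
Side note: both consistencyDelayMetaFilter and BestEffortCleanAbortedPartialUploads
derive a block's age from the creation timestamp embedded in its ULID. A minimal
standalone sketch of that arithmetic (illustration only; the constant is
redeclared here so the sketch compiles outside pkg/compact):

    package main

    import (
        "fmt"
        "time"

        "github.com/oklog/ulid"
    )

    // Mirrors compact.PartialUploadThresholdAge from pkg/compact/clean.go.
    const partialUploadThresholdAge = 2 * 24 * time.Hour

    func main() {
        // A freshly "created" block ID; nil entropy suffices for illustration
        // (the tests in this patch call ulid.New with nil entropy as well).
        id := ulid.MustNew(ulid.Now(), nil)

        // ULIDs embed a millisecond timestamp, so age is a plain subtraction.
        ageMs := ulid.Now() - id.Time()

        // The two checks used in this patch: "too fresh to process" (30m
        // consistency delay) and "old enough to delete as aborted" (48h).
        tooFresh := ageMs < uint64(30*time.Minute/time.Millisecond)
        oldEnough := ageMs > uint64(partialUploadThresholdAge/time.Millisecond)

        fmt.Println("age(ms):", ageMs, "tooFresh:", tooFresh, "oldEnough:", oldEnough)
    }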

diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 8d9c4e9222..0dc8c4d58b 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -13,6 +13,7 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/oklog/run"
+	"github.com/oklog/ulid"
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -23,6 +24,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/component"
 	"github.com/thanos-io/thanos/pkg/extflag"
+	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/client"
 	"github.com/thanos-io/thanos/pkg/prober"
@@ -31,6 +33,11 @@ import (
 	"gopkg.in/alecthomas/kingpin.v2"
 )
 
+const (
+	metricIndexGenerateName = "thanos_compact_generated_index_total"
+	metricIndexGenerateHelp = "Total number of generated indexes."
+)
+
 var (
 	compactions = compactionSet{
 		1 * time.Hour,
@@ -85,7 +92,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
 
 	objStoreConfig := regCommonObjStoreFlags(cmd, "", true)
 
-	consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)).
+	consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)).
 		Default("30m"))
 
 	retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d"))
@@ -162,21 +169,28 @@ func runCompact(
 ) error {
 	halted := prometheus.NewGauge(prometheus.GaugeOpts{
 		Name: "thanos_compactor_halted",
-		Help: "Set to 1 if the compactor halted due to an unexpected error",
+		Help: "Set to 1 if the compactor halted due to an unexpected error.",
 	})
+	halted.Set(0)
 	retried := prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "thanos_compactor_retries_total",
-		Help: "Total number of retries after retriable compactor error",
+		Help: "Total number of retries after retriable compactor error.",
 	})
 	iterations := prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "thanos_compactor_iterations_total",
-		Help: "Total number of iterations that were executed successfully",
+		Help: "Total number of iterations that were executed successfully.",
 	})
-	halted.Set(0)
-
-	reg.MustRegister(halted)
-	reg.MustRegister(retried)
-	reg.MustRegister(iterations)
+	consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Name: "thanos_consistency_delay_seconds",
+		Help: "Configured consistency delay in seconds.",
+	}, func() float64 {
+		return consistencyDelay.Seconds()
+	})
+	partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
+		Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
+	})
+	reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts)
 
 	downsampleMetrics := newDownsampleMetrics(reg)
 
@@ -225,8 +239,15 @@ func runCompact(
 		}
 	}()
 
-	sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
-		blockSyncConcurrency, acceptMalformedIndex, false, relabelConfig)
+	metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
+		block.NewLabelShardedMetaFilter(relabelConfig).Filter,
+		(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
+	)
+	if err != nil {
+		return errors.Wrap(err, "create meta fetcher")
+	}
+
+	sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false)
 	if err != nil {
 		return errors.Wrap(err, "create syncer")
 	}
@@ -276,26 +297,24 @@ func runCompact(
 		level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
 	}
 
-	f := func() error {
+	compactMainFn := func() error {
 		if err := compactor.Compact(ctx); err != nil {
 			return errors.Wrap(err, "compaction failed")
 		}
-		level.Info(logger).Log("msg", "compaction iterations done")
 
-		// TODO(bplotka): Remove "disableDownsampling" once https://github.com/thanos-io/thanos/issues/297 is fixed.
 		if !disableDownsampling {
 			// After all compactions are done, work down the downsampling backlog.
 			// We run two passes of this to ensure that the 1h downsampling is generated
 			// for 5m downsamplings created in the first run.
 			level.Info(logger).Log("msg", "start first pass of downsampling")
 
-			if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, downsamplingDir); err != nil {
+			if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
 				return errors.Wrap(err, "first pass of downsampling failed")
 			}
 
 			level.Info(logger).Log("msg", "start second pass of downsampling")
 
-			if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, downsamplingDir); err != nil {
+			if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
 				return errors.Wrap(err, "second pass of downsampling failed")
 			}
 			level.Info(logger).Log("msg", "downsampling iterations done")
@@ -303,9 +322,11 @@ func runCompact(
 			level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
 		}
 
-		if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, retentionByResolution); err != nil {
+		if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution); err != nil {
 			return errors.Wrap(err, fmt.Sprintf("retention failed"))
 		}
+
+		compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts)
 		return nil
 	}
 
@@ -314,18 +335,18 @@ func runCompact(
 
 		// Generate index file.
 		if generateMissingIndexCacheFiles {
-			if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, indexCacheDir); err != nil {
+			if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, indexCacheDir); err != nil {
 				return err
 			}
 		}
 
 		if !wait {
-			return f()
+			return compactMainFn()
 		}
 
 		// --wait=true is specified.
 		return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
-			err := f()
+			err := compactMainFn()
 			if err == nil {
 				iterations.Inc()
 				return nil
@@ -363,13 +384,27 @@ func runCompact(
 	return nil
 }
 
-const (
-	metricIndexGenerateName = "thanos_compact_generated_index_total"
-	metricIndexGenerateHelp = "Total number of generated indexes."
-)
+type consistencyDelayMetaFilter struct {
+	logger           log.Logger
+	consistencyDelay time.Duration
+}
+
+func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
+	for id, meta := range metas {
+		if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
+			meta.Thanos.Source != metadata.BucketRepairSource &&
+			meta.Thanos.Source != metadata.CompactorSource &&
+			meta.Thanos.Source != metadata.CompactorRepairSource {
+
+			level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
+			synced.WithLabelValues(block.TooFreshMeta).Inc()
+			delete(metas, id)
+		}
+	}
+}
 
 // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
-func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
+func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
 	genIndex := prometheus.NewCounter(prometheus.CounterOpts{
 		Name: metricIndexGenerateName,
 		Help: metricIndexGenerateHelp,
@@ -391,38 +426,18 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prom
 
 	level.Info(logger).Log("msg", "start index cache processing")
 
-	var metas []*metadata.Meta
-
-	if err := bkt.Iter(ctx, "", func(name string) error {
-		id, ok := block.IsBlockDir(name)
-		if !ok {
-			return nil
-		}
-
-		meta, err := block.DownloadMeta(ctx, logger, bkt, id)
-		if err != nil {
-			// Probably not finished block, skip it.
-			if bkt.IsObjNotFoundErr(errors.Cause(err)) {
-				level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String())
-				return nil
-			}
-			return errors.Wrap(err, "download metadata")
-		}
+	metas, _, err := fetcher.Fetch(ctx)
+	if err != nil {
+		return errors.Wrap(err, "fetch metas")
+	}
 
+	for _, meta := range metas {
 		// New version of compactor pushes index cache along with data block.
 		// Skip uncompacted blocks.
 		if meta.Compaction.Level == 1 {
-			return nil
+			continue
 		}
 
-		metas = append(metas, &meta)
-
-		return nil
-	}); err != nil {
-		return errors.Wrap(err, "retrieve bucket block metas")
-	}
-
-	for _, meta := range metas {
 		if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
 			return err
 		}
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index 5c053d7e6e..00de9e57cb 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -21,6 +21,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/component"
 	"github.com/thanos-io/thanos/pkg/extflag"
+	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/client"
 	"github.com/thanos-io/thanos/pkg/prober"
@@ -88,6 +89,11 @@ func runDownsample(
 		return err
 	}
 
+	metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
+	if err != nil {
+		return errors.Wrap(err, "create meta fetcher")
+	}
+
 	// Ensure we close up everything properly.
 	defer func() {
 		if err != nil {
@@ -107,13 +113,13 @@ func runDownsample(
 
 			level.Info(logger).Log("msg", "start first pass of downsampling")
 
-			if err := downsampleBucket(ctx, logger, metrics, bkt, dataDir); err != nil {
+			if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
 				return errors.Wrap(err, "downsampling failed")
 			}
 
 			level.Info(logger).Log("msg", "start second pass of downsampling")
 
-			if err := downsampleBucket(ctx, logger, metrics, bkt, dataDir); err != nil {
+			if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
 				return errors.Wrap(err, "downsampling failed")
 			}
 
@@ -148,6 +154,7 @@ func downsampleBucket(
 	logger log.Logger,
 	metrics *DownsampleMetrics,
 	bkt objstore.Bucket,
+	fetcher block.MetadataFetcher,
 	dir string,
 ) error {
 	if err := os.RemoveAll(dir); err != nil {
@@ -163,25 +170,9 @@ func downsampleBucket(
 		}
 	}()
 
-	var metas []*metadata.Meta
-
-	err := bkt.Iter(ctx, "", func(name string) error {
-		id, ok := block.IsBlockDir(name)
-		if !ok {
-			return nil
-		}
-
-		m, err := block.DownloadMeta(ctx, logger, bkt, id)
-		if err != nil {
-			return errors.Wrap(err, "download metadata")
-		}
-
-		metas = append(metas, &m)
-
-		return nil
-	})
+	metas, _, err := fetcher.Fetch(ctx)
 	if err != nil {
-		return errors.Wrap(err, "retrieve bucket block metas")
+		return errors.Wrap(err, "downsampling meta fetch")
 	}
 
 	// mapping from a hash over all source IDs to blocks. We don't need to downsample a block
diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go
index ea4f7063e1..f6aedb48eb 100644
--- a/cmd/thanos/main_test.go
+++ b/cmd/thanos/main_test.go
@@ -75,7 +75,10 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
 	})
 	expReg.MustRegister(genIndexExp)
 
-	testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, dir))
+	metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
 
 	genIndexExp.Inc()
 	testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)
@@ -112,7 +115,10 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
 
 	metrics := newDownsampleMetrics(prometheus.NewRegistry())
 	testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
-	testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
+	metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
 	testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
 
 	_, err = os.Stat(dir)
diff --git a/docs/components/compact.md b/docs/components/compact.md
index 070d82b7b0..440ad9f1c9 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -103,7 +103,7 @@ Flags:
       --consistency-delay=30m  Minimum age of fresh (non-compacted) blocks
                                before they are being processed. Malformed blocks
                                older than the maximum of consistency-delay and
-                               30m0s will be removed.
+                               48h0m0s will be removed.
       --retention.resolution-raw=0d
                                How long to retain raw samples in bucket. 0d -
                                disables this retention
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index 7e2d5a5423..3fc658e3ba 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -223,7 +223,6 @@ func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
 			level.Warn(s.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err)
 		}
 	}
-
 	return m, nil
 }
 
diff --git a/pkg/compact/clean.go b/pkg/compact/clean.go
new file mode 100644
index 0000000000..7fa085eebc
--- /dev/null
+++ b/pkg/compact/clean.go
@@ -0,0 +1,49 @@
+package compact
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/oklog/ulid"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/objstore"
+)
+
+const (
+	// PartialUploadThresholdAge is the duration after which a partial block is assumed aborted and ready to be cleaned.
+	// Keep it long, as it is based on the block creation time, not the upload start time.
+	PartialUploadThresholdAge = 2 * 24 * time.Hour
+)
+
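+// BestEffortCleanAbortedPartialUploads deletes partial blocks (those without meta.json) that are older than PartialUploadThresholdAge. Errors are logged, never returned.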
+func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter) {
+	level.Info(logger).Log("msg", "started cleaning of aborted partial uploads")
+	_, partial, err := fetcher.Fetch(ctx)
+	if err != nil {
+		level.Warn(logger).Log("msg", "failed to fetch metadata for cleaning of aborted partial uploads; skipping", "err", err)
+	}
+
+	// Delete partial blocks that are older than PartialUploadThresholdAge.
+	// TODO(bwplotka): This can cause data loss if a block:
+	// * is being uploaded for longer than PartialUploadThresholdAge, or
+	// * has its upload started only once it is already older than PartialUploadThresholdAge.
+	// Keep PartialUploadThresholdAge long for now to make both cases unlikely.
+	// Mitigate this by adding ModifiedTime to bkt and checking that instead of the ULID (block creation time).
+	for id := range partial {
+		if ulid.Now()-id.Time() <= uint64(PartialUploadThresholdAge/time.Millisecond) {
+			// Minimum delay has not expired, ignore for now.
+			continue
+		}
+
+		deleteAttempts.Inc()
+		if err := block.Delete(ctx, logger, bkt, id); err != nil {
+			level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err)
+			return
+		}
+		level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge)
+	}
+	level.Info(logger).Log("msg", "cleaning of aborted partial uploads done")
+}
diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go
new file mode 100644
index 0000000000..5332da7637
--- /dev/null
+++ b/pkg/compact/clean_test.go
@@ -0,0 +1,72 @@
+package compact
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"path"
+	"testing"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/oklog/ulid"
+	"github.com/prometheus/client_golang/prometheus"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/objstore/inmem"
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	bkt := inmem.NewBucket()
+	logger := log.NewNopLogger()
+
+	metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
+	testutil.Ok(t, err)
+
+	// 1. No meta, old block, should be removed.
+	shouldDeleteID, err := ulid.New(uint64(time.Now().Add(-PartialUploadThresholdAge-1*time.Hour).Unix()*1000), nil)
+	testutil.Ok(t, err)
+
+	var fakeChunk bytes.Buffer
+	fakeChunk.Write([]byte{0, 1, 2, 3})
+	testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"), &fakeChunk))
+
+	// 2. Old block with meta, so it should be kept.
+	shouldIgnoreID1, err := ulid.New(uint64(time.Now().Add(-PartialUploadThresholdAge-2*time.Hour).Unix()*1000), nil)
+	testutil.Ok(t, err)
+	var meta metadata.Meta
+	meta.Version = 1
+	meta.ULID = shouldIgnoreID1
+
+	var buf bytes.Buffer
+	testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
+	testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID1.String(), metadata.MetaFilename), &buf))
+	testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID1.String(), "chunks", "000001"), &fakeChunk))
+
+	// 3. No meta, newer block that should be kept.
+	shouldIgnoreID2, err := ulid.New(uint64(time.Now().Add(-2*time.Hour).Unix()*1000), nil)
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID2.String(), "chunks", "000001"), &fakeChunk))
+
+	deleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{})
+	BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, deleteAttempts)
+	testutil.Equals(t, 1.0, promtest.ToFloat64(deleteAttempts))
+
+	exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"))
+	testutil.Ok(t, err)
+	testutil.Equals(t, false, exists)
+
+	exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID1.String(), "chunks", "000001"))
+	testutil.Ok(t, err)
+	testutil.Equals(t, true, exists)
+
+	exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID2.String(), "chunks", "000001"))
+	testutil.Ok(t, err)
+	testutil.Equals(t, true, exists)
+}
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index b853ff5be1..5b40bca70d 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
-	"path"
 	"path/filepath"
 	"sort"
 	"sync"
@@ -17,7 +16,6 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/tsdb"
 	terrors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/thanos-io/thanos/pkg/block"
@@ -32,33 +30,24 @@ const (
 	ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0)
 	ResolutionLevel5m  = ResolutionLevel(downsample.ResLevel1)
 	ResolutionLevel1h  = ResolutionLevel(downsample.ResLevel2)
-
-	MinimumAgeForRemoval = time.Duration(30 * time.Minute)
 )
 
-var blockTooFreshSentinelError = errors.New("Block too fresh")
-
-// Syncer syncronizes block metas from a bucket into a local directory.
+// Syncer synchronizes block metas from a bucket into a local directory.
 // It sorts them into compaction groups based on equal label sets.
 type Syncer struct {
 	logger                   log.Logger
 	reg                      prometheus.Registerer
 	bkt                      objstore.Bucket
-	consistencyDelay         time.Duration
+	fetcher                  block.MetadataFetcher
 	mtx                      sync.Mutex
 	blocks                   map[ulid.ULID]*metadata.Meta
-	blocksMtx                sync.Mutex
 	blockSyncConcurrency     int
 	metrics                  *syncerMetrics
 	acceptMalformedIndex     bool
 	enableVerticalCompaction bool
-	relabelConfig            []*relabel.Config
 }
 
 type syncerMetrics struct {
-	syncMetas                 prometheus.Counter
-	syncMetaFailures          prometheus.Counter
-	syncMetaDuration          prometheus.Histogram
 	garbageCollectedBlocks    prometheus.Counter
 	garbageCollections        prometheus.Counter
 	garbageCollectionFailures prometheus.Counter
@@ -73,20 +62,6 @@ type syncerMetrics struct {
 func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
 	var m syncerMetrics
 
-	m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "thanos_compact_sync_meta_total",
-		Help: "Total number of sync meta operations.",
-	})
-	m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "thanos_compact_sync_meta_failures_total",
-		Help: "Total number of failed sync meta operations.",
-	})
-	m.syncMetaDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
-		Name:    "thanos_compact_sync_meta_duration_seconds",
-		Help:    "Time it took to sync meta files.",
-		Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
-	})
-
 	m.garbageCollectedBlocks = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "thanos_compact_garbage_collected_blocks_total",
 		Help: "Total number of deleted blocks by compactor.",
@@ -128,9 +103,6 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
 
 	if reg != nil {
 		reg.MustRegister(
-			m.syncMetas,
-			m.syncMetaFailures,
-			m.syncMetaDuration,
 			m.garbageCollectedBlocks,
 			m.garbageCollections,
 			m.garbageCollectionFailures,
@@ -145,22 +117,21 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
 	return &m
 }
 
-// NewSyncer returns a new Syncer for the given Bucket and directory.
+// NewSyncer returns a new Syncer for the given Bucket and MetadataFetcher.
 // Blocks must be at least as old as the sync delay for being considered.
-func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool, relabelConfig []*relabel.Config) (*Syncer, error) {
+func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	return &Syncer{
 		logger:               logger,
 		reg:                  reg,
-		consistencyDelay:     consistencyDelay,
-		blocks:               map[ulid.ULID]*metadata.Meta{},
 		bkt:                  bkt,
+		fetcher:              fetcher,
+		blocks:               map[ulid.ULID]*metadata.Meta{},
 		metrics:              newSyncerMetrics(reg),
 		blockSyncConcurrency: blockSyncConcurrency,
 		acceptMalformedIndex: acceptMalformedIndex,
-		relabelConfig:        relabelConfig,
 		// The syncer offers an option to enable vertical compaction, even if it's
 		// not currently used by Thanos, because the compactor is also used by Cortex
 		// which needs vertical compaction.
@@ -168,24 +139,6 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
 	}, nil
 }
 
-// SyncMetas synchronizes all meta files from blocks in the bucket into
-// the memory.  It removes any partial blocks older than the max of
-// consistencyDelay and MinimumAgeForRemoval from the bucket.
-func (c *Syncer) SyncMetas(ctx context.Context) error {
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
-
-	begin := time.Now()
-
-	err := c.syncMetas(ctx)
-	if err != nil {
-		c.metrics.syncMetaFailures.Inc()
-	}
-	c.metrics.syncMetas.Inc()
-	c.metrics.syncMetaDuration.Observe(time.Since(begin).Seconds())
-	return err
-}
-
 // UntilNextDownsampling calculates how long it will take until the next downsampling operation.
 // Returns an error if there will be no downsampling.
 func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {
@@ -202,155 +155,20 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {
 	}
 }
 
-func (c *Syncer) syncMetas(ctx context.Context) error {
-	var wg sync.WaitGroup
-	defer wg.Wait()
-
-	metaIDsChan := make(chan ulid.ULID)
-	errChan := make(chan error, c.blockSyncConcurrency)
-
-	workCtx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	for i := 0; i < c.blockSyncConcurrency; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-
-			for id := range metaIDsChan {
-				// Check if we already have this block cached locally.
-				c.blocksMtx.Lock()
-				_, seen := c.blocks[id]
-				c.blocksMtx.Unlock()
-				if seen {
-					continue
-				}
-
-				meta, err := c.downloadMeta(workCtx, id)
-				if err == blockTooFreshSentinelError {
-					continue
-				}
-
-				if err != nil {
-					if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored {
-						continue
-					}
-					errChan <- err
-					return
-				}
-
-				// Check for block labels by relabeling.
-				// If output is empty, the block will be dropped.
-				lset := labels.FromMap(meta.Thanos.Labels)
-				processedLabels := relabel.Process(lset, c.relabelConfig...)
-				if processedLabels == nil {
-					level.Debug(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id)
-					continue
-				}
-
-				c.blocksMtx.Lock()
-				c.blocks[id] = meta
-				c.blocksMtx.Unlock()
-			}
-		}()
-	}
-
-	// Read back all block metas so we can detect deleted blocks.
-	remote := map[ulid.ULID]struct{}{}
-
-	err := c.bkt.Iter(ctx, "", func(name string) error {
-		id, ok := block.IsBlockDir(name)
-		if !ok {
-			return nil
-		}
-
-		remote[id] = struct{}{}
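+// SyncMetas synchronizes the local state of block metas with what we have in the bucket.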
+func (s *Syncer) SyncMetas(ctx context.Context) error {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
 
-		select {
-		case <-ctx.Done():
-		case metaIDsChan <- id:
-		}
-
-		return nil
-	})
-	close(metaIDsChan)
+	metas, _, err := s.fetcher.Fetch(ctx)
 	if err != nil {
-		return retry(errors.Wrap(err, "retrieve bucket block metas"))
-	}
-
-	wg.Wait()
-	close(errChan)
-
-	if err := <-errChan; err != nil {
 		return retry(err)
 	}
-
-	// Delete all local block dirs that no longer exist in the bucket.
-	for id := range c.blocks {
-		if _, ok := remote[id]; !ok {
-			delete(c.blocks, id)
-		}
-	}
+	s.blocks = metas
 
 	return nil
 }
 
-func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) {
-	level.Debug(c.logger).Log("msg", "download meta", "block", id)
-
-	meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
-	if err != nil {
-		if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) {
-			level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
-			return nil, blockTooFreshSentinelError
-		}
-		return nil, errors.Wrapf(err, "downloading meta.json for %s", id)
-	}
-
-	// ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to
-	// avoid races when a block is only partially uploaded. This relates to all blocks, excluding:
-	// - repair created blocks
-	// - compactor created blocks
-	// NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks.
-	// TODO(bplotka): https://github.com/thanos-io/thanos/issues/377.
-	if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) &&
-		meta.Thanos.Source != metadata.BucketRepairSource &&
-		meta.Thanos.Source != metadata.CompactorSource &&
-		meta.Thanos.Source != metadata.CompactorRepairSource {
-
-		level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
-		return nil, blockTooFreshSentinelError
-	}
-
-	return &meta, nil
-}
-
-// removeIfMalformed removes a block from the bucket if that block does not have a meta file.  It ignores blocks that
-// are younger than MinimumAgeForRemoval.
-func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) {
-	metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename))
-	if err != nil {
-		level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err)
-		return false
-	}
-	if metaExists {
-		// Meta exists, block is not malformed.
-		return false
-	}
-
-	if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) {
-		// Minimum delay has not expired, ignore for now.
-		return true
-	}
-
-	if err := block.Delete(ctx, c.logger, c.bkt, id); err != nil {
-		level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err)
-		return false
-	}
-	level.Info(c.logger).Log("msg", "deleted malformed block", "block", id)
-
-	return true
-}
-
 // GroupKey returns a unique identifier for the group the block belongs to. It considers
 // the downsampling resolution and the block's labels.
 func GroupKey(meta metadata.Thanos) string {
@@ -363,27 +180,27 @@ func groupKey(res int64, lbls labels.Labels) string {
 
 // Groups returns the compaction groups for all blocks currently known to the syncer.
 // It creates all groups from the scratch on every call.
-func (c *Syncer) Groups() (res []*Group, err error) {
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
+func (s *Syncer) Groups() (res []*Group, err error) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
 
 	groups := map[string]*Group{}
-	for _, m := range c.blocks {
+	for _, m := range s.blocks {
 		g, ok := groups[GroupKey(m.Thanos)]
 		if !ok {
 			g, err = newGroup(
-				log.With(c.logger, "compactionGroup", GroupKey(m.Thanos)),
-				c.bkt,
+				log.With(s.logger, "compactionGroup", GroupKey(m.Thanos)),
+				s.bkt,
 				labels.FromMap(m.Thanos.Labels),
 				m.Thanos.Downsample.Resolution,
-				c.acceptMalformedIndex,
-				c.enableVerticalCompaction,
-				c.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)),
-				c.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)),
-				c.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)),
-				c.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)),
-				c.metrics.verticalCompactions.WithLabelValues(GroupKey(m.Thanos)),
-				c.metrics.garbageCollectedBlocks,
+				s.acceptMalformedIndex,
+				s.enableVerticalCompaction,
+				s.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)),
+				s.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)),
+				s.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)),
+				s.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)),
+				s.metrics.verticalCompactions.WithLabelValues(GroupKey(m.Thanos)),
+				s.metrics.garbageCollectedBlocks,
 			)
 			if err != nil {
 				return nil, errors.Wrap(err, "create compaction group")
@@ -403,9 +220,9 @@ func (c *Syncer) Groups() (res []*Group, err error) {
 
 // GarbageCollect deletes blocks from the bucket if their data is available as part of a
 // block with a higher compaction level.
-func (c *Syncer) GarbageCollect(ctx context.Context) error {
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
+func (s *Syncer) GarbageCollect(ctx context.Context) error {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
 
 	begin := time.Now()
 
@@ -413,12 +230,12 @@ func (c *Syncer) GarbageCollect(ctx context.Context) error {
 	for _, res := range []int64{
 		downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2,
 	} {
-		err := c.garbageCollect(ctx, res)
+		err := s.garbageCollect(ctx, res)
 		if err != nil {
-			c.metrics.garbageCollectionFailures.Inc()
+			s.metrics.garbageCollectionFailures.Inc()
 		}
-		c.metrics.garbageCollections.Inc()
-		c.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds())
+		s.metrics.garbageCollections.Inc()
+		s.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds())
 
 		if err != nil {
 			return errors.Wrapf(err, "garbage collect resolution %d", res)
@@ -427,13 +244,12 @@ func (c *Syncer) GarbageCollect(ctx context.Context) error {
 	return nil
 }
 
-func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) {
+func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) {
 	// Map each block to its highest priority parent. Initial blocks have themselves
 	// in their source section, i.e. are their own parent.
 	parents := map[ulid.ULID]ulid.ULID{}
 
-	for id, meta := range c.blocks {
-
+	for id, meta := range s.blocks {
 		// Skip any block that has a different resolution.
 		if meta.Thanos.Downsample.Resolution != resolution {
 			continue
@@ -447,7 +263,7 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) {
 				parents[sid] = id
 				continue
 			}
-			pmeta, ok := c.blocks[pid]
+			pmeta, ok := s.blocks[pid]
 			if !ok {
 				return nil, errors.Errorf("previous parent block %s not found", pid)
 			}
@@ -473,7 +289,7 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) {
 		topParents[pid] = struct{}{}
 	}
 
-	for id, meta := range c.blocks {
+	for id, meta := range s.blocks {
 		// Skip any block that has a different resolution.
 		if meta.Thanos.Downsample.Resolution != resolution {
 			continue
@@ -487,8 +303,8 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) {
 	return ids, nil
 }
 
-func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
-	garbageIds, err := c.GarbageBlocks(resolution)
+func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
+	garbageIds, err := s.GarbageBlocks(resolution)
 	if err != nil {
 		return err
 	}
@@ -501,9 +317,9 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
 		// Spawn a new context so we always delete a block in full on shutdown.
 		delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
 
-		level.Info(c.logger).Log("msg", "deleting outdated block", "block", id)
+		level.Info(s.logger).Log("msg", "deleting outdated block", "block", id)
 
-		err := block.Delete(delCtx, c.logger, c.bkt, id)
+		err := block.Delete(delCtx, s.logger, s.bkt, id)
 		cancel()
 		if err != nil {
 			return retry(errors.Wrapf(err, "delete block %s from bucket", id))
@@ -511,8 +327,8 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
 
 		// Immediately update our in-memory state so no further call to SyncMetas is needed
 		// after running garbage collection.
-		delete(c.blocks, id)
-		c.metrics.garbageCollectedBlocks.Inc()
+		delete(s.blocks, id)
+		s.metrics.garbageCollectedBlocks.Inc()
 	}
 	return nil
 }
@@ -1161,5 +977,6 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 			break
 		}
 	}
+	level.Info(c.logger).Log("msg", "compaction iterations done")
 	return nil
 }
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 9f35f24bff..9a7411b2e9 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -20,7 +20,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/thanos-io/thanos/pkg/block"
@@ -28,57 +27,8 @@ import (
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/objtesting"
 	"github.com/thanos-io/thanos/pkg/testutil"
-	"gopkg.in/yaml.v2"
 )
 
-func TestSyncer_SyncMetas_e2e(t *testing.T) {
-	objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
-		ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
-		defer cancel()
-
-		relabelConfig := make([]*relabel.Config, 0)
-		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
-		testutil.Ok(t, err)
-
-		// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
-		// 10 are in the bucket.
-		// After the first synchronization the first 5 should be dropped and the
-		// last 5 be loaded from the bucket.
-		var ids []ulid.ULID
-		var metas []*metadata.Meta
-
-		for i := 0; i < 15; i++ {
-			id, err := ulid.New(uint64(i), nil)
-			testutil.Ok(t, err)
-
-			var meta metadata.Meta
-			meta.Version = 1
-			meta.ULID = id
-
-			if i < 10 {
-				sy.blocks[id] = &meta
-			}
-			ids = append(ids, id)
-			metas = append(metas, &meta)
-		}
-		for _, m := range metas[5:] {
-			var buf bytes.Buffer
-			testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
-			testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
-		}
-
-		groups, err := sy.Groups()
-		testutil.Ok(t, err)
-		testutil.Equals(t, ids[:10], groups[0].IDs())
-
-		testutil.Ok(t, sy.SyncMetas(ctx))
-
-		groups, err = sy.Groups()
-		testutil.Ok(t, err)
-		testutil.Equals(t, ids[5:], groups[0].IDs())
-	})
-}
-
 func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 	objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
 		ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
@@ -89,8 +39,6 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 		var metas []*metadata.Meta
 		var ids []ulid.ULID
 
-		relabelConfig := make([]*relabel.Config, 0)
-
 		for i := 0; i < 10; i++ {
 			var m metadata.Meta
 
@@ -125,7 +73,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 		m3.Thanos.Downsample.Resolution = 0
 
 		var m4 metadata.Meta
-		m4.Version = 14
+		m4.Version = 1
 		m4.ULID = ulid.MustNew(400, nil)
 		m4.Compaction.Level = 2
 		m4.Compaction.Sources = ids[9:] // covers the last block but is a different resolution. Must not trigger deletion.
@@ -139,11 +87,14 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 			testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
 		}
 
-		// Do one initial synchronization with the bucket.
-		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
+		metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
 		testutil.Ok(t, err)
-		testutil.Ok(t, sy.SyncMetas(ctx))
 
+		sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 1, false, false)
+		testutil.Ok(t, err)
+
+		// Do one initial synchronization with the bucket.
+		testutil.Ok(t, sy.SyncMetas(ctx))
 		testutil.Ok(t, sy.GarbageCollect(ctx))
 
 		var rem []ulid.ULID
@@ -209,7 +160,10 @@ func TestGroup_Compact_e2e(t *testing.T) {
 
 		reg := prometheus.NewRegistry()
 
-		sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, false, nil)
+		metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
+		testutil.Ok(t, err)
+
+		sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 5, false, false)
 		testutil.Ok(t, err)
 
 		comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
@@ -220,8 +174,6 @@ func TestGroup_Compact_e2e(t *testing.T) {
 
 		// Compaction on empty should not fail.
 		testutil.Ok(t, bComp.Compact(ctx))
-		testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.syncMetas))
-		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.syncMetaFailures))
 		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks))
 		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
 		testutil.Equals(t, 0, MetricCount(sy.metrics.compactions))
@@ -310,8 +262,6 @@ func TestGroup_Compact_e2e(t *testing.T) {
 		})
 
 		testutil.Ok(t, bComp.Compact(ctx))
-		testutil.Equals(t, 3.0, promtest.ToFloat64(sy.metrics.syncMetas))
-		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.syncMetaFailures))
 		testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks))
 		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
 		testutil.Equals(t, 4, MetricCount(sy.metrics.compactions))
@@ -495,86 +445,3 @@ func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels,
 
 	return uid, nil
 }
-
-func TestSyncer_SyncMetasFilter_e2e(t *testing.T) {
-	var err error
-
-	relabelContentYaml := `
-    - action: drop
-      regex: "A"
-      source_labels:
-      - cluster
-    `
-	var relabelConfig []*relabel.Config
-	err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig)
-	testutil.Ok(t, err)
-
-	extLsets := []labels.Labels{{{Name: "cluster", Value: "A"}}, {{Name: "cluster", Value: "B"}}}
-
-	objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
-		ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
-		defer cancel()
-
-		sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig)
-		testutil.Ok(t, err)
-
-		var ids []ulid.ULID
-		var metas []*metadata.Meta
-
-		for i := 0; i < 16; i++ {
-			id, err := ulid.New(uint64(i), nil)
-			testutil.Ok(t, err)
-
-			var meta metadata.Meta
-			meta.Version = 1
-			meta.ULID = id
-			meta.Thanos = metadata.Thanos{
-				Labels: extLsets[i%2].Map(),
-			}
-
-			ids = append(ids, id)
-			metas = append(metas, &meta)
-		}
-		for _, m := range metas[:10] {
-			var buf bytes.Buffer
-			testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
-			testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
-		}
-
-		testutil.Ok(t, sy.SyncMetas(ctx))
-
-		groups, err := sy.Groups()
-		testutil.Ok(t, err)
-		var evenIds []ulid.ULID
-		for i := 0; i < 10; i++ {
-			if i%2 != 0 {
-				evenIds = append(evenIds, ids[i])
-			}
-		}
-		testutil.Equals(t, evenIds, groups[0].IDs())
-
-		// Upload last 6 blocks.
-		for _, m := range metas[10:] {
-			var buf bytes.Buffer
-			testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
-			testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
-		}
-
-		// Delete first 4 blocks.
-		for _, m := range metas[:4] {
-			testutil.Ok(t, block.Delete(ctx, log.NewNopLogger(), bkt, m.ULID))
-		}
-
-		testutil.Ok(t, sy.SyncMetas(ctx))
-
-		groups, err = sy.Groups()
-		testutil.Ok(t, err)
-		evenIds = make([]ulid.ULID, 0)
-		for i := 4; i < 16; i++ {
-			if i%2 != 0 {
-				evenIds = append(evenIds, ids[i])
-			}
-		}
-		testutil.Equals(t, evenIds, groups[0].IDs())
-	})
-}
diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go
index 3624d27fda..f7364f1147 100644
--- a/pkg/compact/compact_test.go
+++ b/pkg/compact/compact_test.go
@@ -1,19 +1,12 @@
 package compact
 
 import (
-	"bytes"
-	"context"
-	"path"
 	"testing"
-	"time"
 
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 
-	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
-	"github.com/prometheus/prometheus/pkg/relabel"
 	terrors "github.com/prometheus/prometheus/tsdb/errors"
-	"github.com/thanos-io/thanos/pkg/objstore/inmem"
 	"github.com/thanos-io/thanos/pkg/testutil"
 )
 
@@ -73,41 +66,6 @@ func TestRetryError(t *testing.T) {
 	testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error")
 }
 
-func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-
-	bkt := inmem.NewBucket()
-	relabelConfig := make([]*relabel.Config, 0)
-	sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, false, relabelConfig)
-	testutil.Ok(t, err)
-
-	// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta.  Compactor should delete it.
-	shouldDeleteId, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil)
-	testutil.Ok(t, err)
-
-	var fakeChunk bytes.Buffer
-	fakeChunk.Write([]byte{0, 1, 2, 3})
-	testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"), &fakeChunk))
-
-	// Generate 1 block which is older than consistencyDelay but younger than MinimumAgeForRemoval, and which has chunk
-	// data but no meta.  Compactor should ignore it.
-	shouldIgnoreId, err := ulid.New(uint64(time.Now().Unix()*1000), nil)
-	testutil.Ok(t, err)
-
-	testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"), &fakeChunk))
-
-	testutil.Ok(t, sy.SyncMetas(ctx))
-
-	exists, err := bkt.Exists(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"))
-	testutil.Ok(t, err)
-	testutil.Equals(t, false, exists)
-
-	exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"))
-	testutil.Ok(t, err)
-	testutil.Equals(t, true, exists)
-}
-
 func TestGroupKey(t *testing.T) {
 	for _, tcase := range []struct {
 		input    metadata.Thanos
diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go
index 0bc84e11c4..531ee6efa9 100644
--- a/pkg/compact/downsample/streamed_block_writer.go
+++ b/pkg/compact/downsample/streamed_block_writer.go
@@ -128,7 +128,7 @@ func NewStreamedBlockWriter(
 	}, nil
 }
 
-// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to
+// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to the index writer, and adds label sets to
 // labelsValues sets and memPostings to be written on the finalize state in the end of downsampling process.
 func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Meta) error {
 	if w.finalized || w.ignoreFinalize {
diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go
index 9021677f2b..aba46963ec 100644
--- a/pkg/compact/retention.go
+++ b/pkg/compact/retention.go
@@ -13,21 +13,17 @@ import (
 
 // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime.
 // A value of 0 disables the retention for its resolution.
-func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, retentionByResolution map[ResolutionLevel]time.Duration) error {
+func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher block.MetadataFetcher, retentionByResolution map[ResolutionLevel]time.Duration) error {
 	level.Info(logger).Log("msg", "start optional retention")
-	if err := bkt.Iter(ctx, "", func(name string) error {
-		id, ok := block.IsBlockDir(name)
-		if !ok {
-			return nil
-		}
-		m, err := block.DownloadMeta(ctx, logger, bkt, id)
-		if err != nil {
-			return errors.Wrap(err, "download metadata")
-		}
+	metas, _, err := fetcher.Fetch(ctx)
+	if err != nil {
+		return errors.Wrap(err, "fetch metas")
+	}
 
+	for id, m := range metas {
 		retentionDuration := retentionByResolution[ResolutionLevel(m.Thanos.Downsample.Resolution)]
 		if retentionDuration.Seconds() == 0 {
-			return nil
+			continue
 		}
 
 		maxTime := time.Unix(m.MaxTime/1000, 0)
@@ -37,10 +33,6 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk
 				return errors.Wrap(err, "delete block")
 			}
 		}
-
-		return nil
-	}); err != nil {
-		return errors.Wrap(err, "retention")
 	}
 
 	level.Info(logger).Log("msg", "optional retention apply done")
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index c35fde3064..b34847faf3 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/oklog/ulid"
 	"github.com/prometheus/prometheus/tsdb"
+	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact"
 	"github.com/thanos-io/thanos/pkg/objstore"
@@ -236,7 +237,11 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
 			for _, b := range tt.blocks {
 				uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution))
 			}
-			if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, tt.retentionByResolution); (err != nil) != tt.wantErr {
+
+			metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil)
+			testutil.Ok(t, err)
+
+			if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, tt.retentionByResolution); (err != nil) != tt.wantErr {
 				t.Errorf("ApplyRetentionPolicyByResolution() error = %v, wantErr %v", err, tt.wantErr)
 			}