From 1e3f8f29e619304b1c0c82d6bf5cc3994bf9b16c Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 6 Apr 2020 13:16:12 +0100 Subject: [PATCH] compact: Made MarkForDeletion less strict; Added more debugability to block deletion logic, made meta sync explicit. Also: * Changed order: Now BestEffortCleanAbortedPartialUploads is before DeleteMarkedBlocks. * Increment markedForDeletion counter only when we actually uploaded block. * Fixed logging issues. Signed-off-by: Bartlomiej Plotka --- cmd/thanos/compact.go | 88 +++++++++++++++++++++++------------ cmd/thanos/downsample.go | 21 +++++---- cmd/thanos/main_test.go | 8 +++- pkg/block/block.go | 9 ++-- pkg/block/block_test.go | 82 +++++++++++++++++--------------- pkg/block/fetcher.go | 8 ++-- pkg/compact/blocks_cleaner.go | 3 +- pkg/compact/clean.go | 17 ++++--- pkg/compact/clean_test.go | 6 ++- pkg/compact/compact.go | 51 ++++++++++++-------- pkg/compact/retention.go | 20 ++++---- pkg/compact/retention_test.go | 6 ++- pkg/verifier/safe_delete.go | 7 +-- 13 files changed, 197 insertions(+), 129 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 0b1c22769f..1bad45f773 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -16,6 +16,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" + "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -113,7 +114,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified."). Default("5m").Duration() - generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload."). + generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "DEPRECATED flag. Will be removed in next release. If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload."). Hidden().Default("false").Bool() disableDownsampling := cmd.Flag("downsampling.disable", "Disables downsampling. This is not recommended "+ @@ -290,29 +291,48 @@ func runCompact( }() // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. - // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second) + // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. + // This is to make sure compactor will not accidentally perform compactions with gap instead. 
+ ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2) duplicateBlocksFilter := block.NewDeduplicateFilter() baseMetaFetcher, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") } - compactFetcher := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ - block.NewLabelShardedMetaFilter(relabelConfig), - block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)), - ignoreDeletionMarkFilter, - duplicateBlocksFilter, - }, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)}) + enableVerticalCompaction := false if len(dedupReplicaLabels) > 0 { enableVerticalCompaction = true level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ",")) } - sy, err := compact.NewSyncer(logger, reg, bkt, compactFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction) - if err != nil { - return errors.Wrap(err, "create syncer") + compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader) + var sy *compact.Syncer + { + // Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability. + cf := baseMetaFetcher.NewMetaFetcher( + extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ + block.NewLabelShardedMetaFilter(relabelConfig), + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)), + ignoreDeletionMarkFilter, + duplicateBlocksFilter, + }, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)}, + ) + cf.UpdateOnChange(compactorView.Set) + sy, err = compact.NewSyncer( + logger, + reg, + bkt, + cf, + duplicateBlocksFilter, + ignoreDeletionMarkFilter, + blocksMarkedForDeletion, + blockSyncConcurrency, + acceptMalformedIndex, enableVerticalCompaction) + if err != nil { + return errors.Wrap(err, "create syncer") + } } levels, err := compactions.levels(maxCompactionLevel) @@ -371,39 +391,52 @@ func runCompact( // We run two passes of this to ensure that the 1h downsampling is generated // for 5m downsamplings created in the first run. 
level.Info(logger).Log("msg", "start first pass of downsampling") - - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil { + if err := sy.SyncMetas(ctx); err != nil { + return errors.Wrap(err, "sync before first pass of downsampling") + } + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil { return errors.Wrap(err, "first pass of downsampling failed") } level.Info(logger).Log("msg", "start second pass of downsampling") - - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil { + if err := sy.SyncMetas(ctx); err != nil { + return errors.Wrap(err, "sync before second pass of downsampling") + } + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil { return errors.Wrap(err, "second pass of downsampling failed") } level.Info(logger).Log("msg", "downsampling iterations done") } else { - level.Warn(logger).Log("msg", "downsampling was explicitly disabled") + level.Info(logger).Log("msg", "downsampling was explicitly disabled") } - if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, compactFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil { + // TODO(bwplotka): Find a way to avoid syncing if no op was done. + if err := sy.SyncMetas(ctx); err != nil { + return errors.Wrap(err, "sync before first pass of downsampling") + } + + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarkedForDeletion); err != nil { return errors.Wrap(err, fmt.Sprintf("retention failed")) } + // No need to resync before partial uploads and delete marked blocks. Last sync should be valid. + compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion) if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil { return errors.Wrap(err, "error cleaning blocks") } - - compact.BestEffortCleanAbortedPartialUploads(ctx, logger, compactFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion) return nil } g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - // Generate index file. + // Generate index files. + // TODO(bwplotka): Remove this in next release. if generateMissingIndexCacheFiles { - if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, compactFetcher, indexCacheDir); err != nil { + if err := sy.SyncMetas(ctx); err != nil { + return err + } + if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, sy.Metas(), indexCacheDir); err != nil { return err } } @@ -451,16 +484,14 @@ func runCompact( r := route.New() ins := extpromhttp.NewInstrumentationMiddleware(reg) - compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader) compactorView.Register(r, ins) - compactFetcher.UpdateOnChange(compactorView.Set) global := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/global"), prefixHeader) global.Register(r, ins) // Separate fetcher for global view. // TODO(bwplotka): Allow Bucket UI to visualize the state of the block as well. 
- f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil) + f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil, "component", "globalBucketUI") f.UpdateOnChange(global.Set) srv.Handle("/", r) @@ -494,7 +525,7 @@ func runCompact( } // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. -func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error { +func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, metas map[ulid.ULID]*metadata.Meta, dir string) error { genIndex := promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: metricIndexGenerateName, Help: metricIndexGenerateHelp, @@ -515,11 +546,6 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prom level.Info(logger).Log("msg", "start index cache processing") - metas, _, err := fetcher.Fetch(ctx) - if err != nil { - return errors.Wrap(err, "fetch metas") - } - for _, meta := range metas { // New version of compactor pushes index cache along with data block. // Skip uncompacted blocks. diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index c98335fd0c..13e1aa8a68 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -102,14 +102,20 @@ func RunDownsample( statusProber.Ready() level.Info(logger).Log("msg", "start first pass of downsampling") - - if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil { + metas, _, err := metaFetcher.Fetch(ctx) + if err != nil { + return errors.Wrap(err, "sync before first pass of downsampling") + } + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil { return errors.Wrap(err, "downsampling failed") } level.Info(logger).Log("msg", "start second pass of downsampling") - - if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil { + metas, _, err = metaFetcher.Fetch(ctx) + if err != nil { + return errors.Wrap(err, "sync before second pass of downsampling") + } + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil { return errors.Wrap(err, "downsampling failed") } @@ -144,7 +150,7 @@ func downsampleBucket( logger log.Logger, metrics *DownsampleMetrics, bkt objstore.Bucket, - fetcher block.MetadataFetcher, + metas map[ulid.ULID]*metadata.Meta, dir string, ) error { if err := os.RemoveAll(dir); err != nil { @@ -160,11 +166,6 @@ func downsampleBucket( } }() - metas, _, err := fetcher.Fetch(ctx) - if err != nil { - return errors.Wrap(err, "downsampling meta fetch") - } - // mapping from a hash over all source IDs to blocks. We don't need to downsample a block // if a downsampled version with the same hash already exists. 
sources5m := map[ulid.ULID]struct{}{} diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index ff0aa2863b..ffd6598bd8 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -79,7 +79,9 @@ func TestCleanupIndexCacheFolder(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil) testutil.Ok(t, err) - testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir)) + metas, _, err := metaFetcher.Fetch(ctx) + testutil.Ok(t, err) + testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metas, dir)) genIndexExp.Inc() testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName) @@ -119,7 +121,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil) testutil.Ok(t, err) - testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir)) + metas, _, err := metaFetcher.Fetch(ctx) + testutil.Ok(t, err) + testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir)) testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))) _, err = os.Stat(dir) diff --git a/pkg/block/block.go b/pkg/block/block.go index ddc877fb57..2bc7f86aae 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" @@ -129,14 +130,15 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er } // MarkForDeletion creates a file which stores information about when the block was marked for deletion. 
-func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error { +func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error { deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename) deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile) if err != nil { return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile) } if deletionMarkExists { - return errors.Errorf("file %s already exists in bucket", deletionMarkFile) + level.Warn(logger).Log("msg", "requested to mark for deletion, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", deletionMarkFile)) + return nil } deletionMark, err := json.Marshal(metadata.DeletionMark{ @@ -151,7 +153,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket if err := bkt.Upload(ctx, deletionMarkFile, bytes.NewBuffer(deletionMark)); err != nil { return errors.Wrapf(err, "upload file %s to bucket", deletionMarkFile) } - + markedForDeletion.Inc() level.Info(logger).Log("msg", "block has been marked for deletion", "block", id) return nil } @@ -168,6 +170,7 @@ func Delete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid if err != nil { return errors.Wrapf(err, "stat %s", metaFile) } + if ok { if err := bkt.Delete(ctx, metaFile); err != nil { return errors.Wrapf(err, "delete %s", metaFile) diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 503d09db75..2c35197e7c 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "encoding/json" - "fmt" "io/ioutil" "os" "path" @@ -17,6 +16,9 @@ import ( "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" @@ -232,44 +234,50 @@ func TestMarkForDeletion(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() - bkt := objstore.NewInMemBucket() - { - blockWithoutDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ - {{Name: "a", Value: "1"}}, - {{Name: "a", Value: "2"}}, - {{Name: "a", Value: "3"}}, - {{Name: "a", Value: "4"}}, - {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) - testutil.Ok(t, err) - testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithoutDeletionMark.String()))) + for _, tcase := range []struct { + name string + preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) - testutil.Ok(t, MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithoutDeletionMark)) - exists, err := bkt.Exists(ctx, path.Join(blockWithoutDeletionMark.String(), metadata.DeletionMarkFilename)) - testutil.Ok(t, err) - testutil.Equals(t, true, exists) - } - { - blockWithDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ - {{Name: "a", Value: "1"}}, - {{Name: "a", Value: "2"}}, - {{Name: "a", Value: "3"}}, - {{Name: "a", Value: "4"}}, - {{Name: "b", Value: "1"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) - testutil.Ok(t, err) - testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, 
path.Join(tmpDir, blockWithDeletionMark.String()))) + blocksMarked int + }{ + { + name: "block marked for deletion", + preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {}, + blocksMarked: 1, + }, + { + name: "block with deletion mark already, expected log and no metric increment", + preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) { + deletionMark, err := json.Marshal(metadata.DeletionMark{ + ID: id, + DeletionTime: time.Now().Unix(), + Version: metadata.DeletionMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark))) + }, + blocksMarked: 0, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + testutil.Ok(t, err) - deletionMark, err := json.Marshal(metadata.DeletionMark{ - ID: blockWithDeletionMark, - DeletionTime: time.Now().Unix(), - Version: metadata.DeletionMarkVersion1, - }) - testutil.Ok(t, err) - testutil.Ok(t, bkt.Upload(ctx, path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark))) + tcase.preUpload(t, id, bkt) - err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithDeletionMark) - testutil.NotOk(t, err) - testutil.Equals(t, fmt.Sprintf("file %s already exists in bucket", path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename)), err.Error()) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()))) + + c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, c) + testutil.Ok(t, err) + testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c)) + }) } } diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 0a8ca11196..7f21d4d801 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -189,8 +189,8 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente } // NewMetaFetcher transforms BaseFetcher into actually usable *MetaFetcher. 
-func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) *MetaFetcher { - return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers} +func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier, logTags ...interface{}) *MetaFetcher { + return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers, logger: log.With(f.logger, logTags...)} } var ( @@ -448,7 +448,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter return metas, resp.partial, errors.Wrap(resp.metaErrs, "incomplete view") } - level.Debug(f.logger).Log("msg", "successfully fetched block metadata", "duration", time.Since(start).String(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial)) + level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial)) return metas, resp.partial, nil } @@ -460,6 +460,8 @@ type MetaFetcher struct { modifiers []MetadataModifier listener func([]metadata.Meta, error) + + logger log.Logger } // Fetch returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket. diff --git a/pkg/compact/blocks_cleaner.go b/pkg/compact/blocks_cleaner.go index 820346a1bf..7381505118 100644 --- a/pkg/compact/blocks_cleaner.go +++ b/pkg/compact/blocks_cleaner.go @@ -37,7 +37,8 @@ func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMark } } -// DeleteMarkedBlocks uses ignoreDeletionMarkFilter to delete the blocks that are marked for deletion. +// DeleteMarkedBlocks uses ignoreDeletionMarkFilter to gather the blocks that are marked for deletion and deletes those +// if older than given deleteDelay. func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error { level.Info(s.logger).Log("msg", "started cleaning of blocks marked for deletion") diff --git a/pkg/compact/clean.go b/pkg/compact/clean.go index 14fdfc4d7c..53cfb16908 100644 --- a/pkg/compact/clean.go +++ b/pkg/compact/clean.go @@ -21,12 +21,15 @@ const ( PartialUploadThresholdAge = 2 * 24 * time.Hour ) -func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter, blocksMarkedForDeletion prometheus.Counter) { +func BestEffortCleanAbortedPartialUploads( + ctx context.Context, + logger log.Logger, + partial map[ulid.ULID]error, + bkt objstore.Bucket, + deleteAttempts prometheus.Counter, + blocksMarkedForDeletion prometheus.Counter, +) { level.Info(logger).Log("msg", "started cleaning of aborted partial uploads") - _, partial, err := fetcher.Fetch(ctx) - if err != nil { - level.Warn(logger).Log("msg", "failed to fetch metadata for cleaning of aborted partial uploads; skipping", "err", err) - } // Delete partial blocks that are older than partialUploadThresholdAge. 
// TODO(bwplotka): This is can cause data loss if blocks are: @@ -41,11 +44,11 @@ func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger } deleteAttempts.Inc() - if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil { + level.Info(logger).Log("msg", "found partially uploaded block; marking for deletion", "block", id) + if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil { level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err) return } - blocksMarkedForDeletion.Inc() level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge) } level.Info(logger).Log("msg", "cleaning of aborted partial uploads done") diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index 7d80b03201..1321c888ba 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -60,7 +60,11 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { deleteAttempts := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, deleteAttempts, blocksMarkedForDeletion) + + _, partial, err := metaFetcher.Fetch(ctx) + testutil.Ok(t, err) + + BestEffortCleanAbortedPartialUploads(ctx, logger, partial, bkt, deleteAttempts, blocksMarkedForDeletion) testutil.Equals(t, 1.0, promtest.ToFloat64(deleteAttempts)) exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001")) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 24d2c86ce1..c0bb41b964 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -46,6 +46,7 @@ type Syncer struct { fetcher block.MetadataFetcher mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta + partial map[ulid.ULID]error blockSyncConcurrency int metrics *syncerMetrics acceptMalformedIndex bool @@ -153,19 +154,36 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { } } +// SyncMetas synchronises local state of block metas with what we have in the bucket. func (s *Syncer) SyncMetas(ctx context.Context) error { s.mtx.Lock() defer s.mtx.Unlock() - metas, _, err := s.fetcher.Fetch(ctx) + metas, partial, err := s.fetcher.Fetch(ctx) if err != nil { return retry(err) } s.blocks = metas - + s.partial = partial return nil } +// Partial returns partial blocks since last sync. +func (s *Syncer) Partial() map[ulid.ULID]error { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.partial +} + +// Metas returns loaded metadata blocks since last sync. +func (s *Syncer) Metas() map[ulid.ULID]*metadata.Meta { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.blocks +} + // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. func GroupKey(meta metadata.Thanos) string { @@ -228,16 +246,18 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { begin := time.Now() - duplicateIDs := s.duplicateBlocksFilter.DuplicateIDs() + // Ignore filter exists before deduplicate filter. deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks() + duplicateIDs := s.duplicateBlocksFilter.DuplicateIDs() // GarbageIDs contains the duplicateIDs, since these blocks can be replaced with other blocks. 
// We also remove ids present in deletionMarkMap since these blocks are already marked for deletion. garbageIDs := []ulid.ULID{} for _, id := range duplicateIDs { - if _, exists := deletionMarkMap[id]; !exists { - garbageIDs = append(garbageIDs, id) + if _, exists := deletionMarkMap[id]; exists { + continue } + garbageIDs = append(garbageIDs, id) } for _, id := range garbageIDs { @@ -249,14 +269,12 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) level.Info(s.logger).Log("msg", "marking outdated block for deletion", "block", id) - - err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id) + err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, s.metrics.blocksMarkedForDeletion) cancel() if err != nil { s.metrics.garbageCollectionFailures.Inc() - return retry(errors.Wrapf(err, "delete block %s from bucket", id)) + return retry(errors.Wrapf(err, "mark block %s for deletion", id)) } - s.metrics.blocksMarkedForDeletion.Inc() // Immediately update our in-memory state so no further call to SyncMetas is needed // after running garbage collection. @@ -568,10 +586,9 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, defer cancel() // TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this). - if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id); err != nil { + if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, blocksMarkedForDeletion); err != nil { return errors.Wrapf(err, "deleting old block %s failed. You need to delete this block manually", ie.id) } - blocksMarkedForDeletion.Inc() return nil } @@ -702,7 +719,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if overlappingBlocks { cg.verticalCompactions.Inc() } - level.Info(cg.logger).Log("msg", "compacted blocks", + level.Info(cg.logger).Log("msg", "compacted blocks", "new", compID, "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin), "overlapping_blocks", overlappingBlocks) bdir := filepath.Join(dir, compID.String()) @@ -773,10 +790,9 @@ func (cg *Group) deleteBlock(b string) error { delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id) - if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id); err != nil { + if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, cg.blocksMarkedForDeletion); err != nil { return errors.Wrapf(err, "delete block %s from bucket", id) } - cg.blocksMarkedForDeletion.Inc() return nil } @@ -869,29 +885,26 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { } level.Info(c.logger).Log("msg", "start sync of metas") - if err := c.sy.SyncMetas(ctx); err != nil { return errors.Wrap(err, "sync") } level.Info(c.logger).Log("msg", "start of GC") - // Blocks that were compacted are garbage collected after each Compaction. // However if compactor crashes we need to resolve those on startup. if err := c.sy.GarbageCollect(ctx); err != nil { return errors.Wrap(err, "garbage") } - level.Info(c.logger).Log("msg", "start of compaction") - groups, err := c.sy.Groups() if err != nil { return errors.Wrap(err, "build compaction groups") } + level.Info(c.logger).Log("msg", "start of compactions") + // Send all groups found during this pass to the compaction workers. 
var groupErrs terrors.MultiError - groupLoop: for _, g := range groups { select { diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index eb07507d9d..8d1ba7d5fb 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -9,21 +9,25 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" ) // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime. // A value of 0 disables the retention for its resolution. -func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher block.MetadataFetcher, retentionByResolution map[ResolutionLevel]time.Duration, blocksMarkedForDeletion prometheus.Counter) error { +func ApplyRetentionPolicyByResolution( + ctx context.Context, + logger log.Logger, + bkt objstore.Bucket, + metas map[ulid.ULID]*metadata.Meta, + retentionByResolution map[ResolutionLevel]time.Duration, + blocksMarkedForDeletion prometheus.Counter, +) error { level.Info(logger).Log("msg", "start optional retention") - metas, _, err := fetcher.Fetch(ctx) - if err != nil { - return errors.Wrap(err, "fetch metas") - } - for id, m := range metas { retentionDuration := retentionByResolution[ResolutionLevel(m.Thanos.Downsample.Resolution)] if retentionDuration.Seconds() == 0 { @@ -33,13 +37,11 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk maxTime := time.Unix(m.MaxTime/1000, 0) if time.Now().After(maxTime.Add(retentionDuration)) { level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String()) - if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil { + if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil { return errors.Wrap(err, "delete block") } - blocksMarkedForDeletion.Inc() } } - level.Info(logger).Log("msg", "optional retention apply done") return nil } diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index ca27429890..8d3cee397d 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -248,7 +248,11 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, tt.retentionByResolution, blocksMarkedForDeletion); (err != nil) != tt.wantErr { + + metas, _, err := metaFetcher.Fetch(ctx) + testutil.Ok(t, err) + + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metas, tt.retentionByResolution, blocksMarkedForDeletion); (err != nil) != tt.wantErr { t.Errorf("ApplyRetentionPolicyByResolution() error = %v, wantErr %v", err, tt.wantErr) } diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go index b124317e29..5e5e5e5316 100644 --- a/pkg/verifier/safe_delete.go +++ b/pkg/verifier/safe_delete.go @@ -80,11 +80,9 @@ func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objs } level.Info(logger).Log("msg", "Marking block as deleted", "id", id.String()) - if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil { + if err := block.MarkForDeletion(ctx, logger, bkt, 
id, blocksMarkedForDeletion); err != nil { return errors.Wrap(err, "marking delete from source") } - blocksMarkedForDeletion.Inc() - return nil } @@ -119,10 +117,9 @@ func BackupAndDeleteDownloaded(ctx context.Context, logger log.Logger, bdir stri } level.Info(logger).Log("msg", "Marking block as deleted", "id", id.String()) - if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil { + if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil { return errors.Wrap(err, "marking delete from source") } - blocksMarkedForDeletion.Inc() return nil }
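
Note on the MarkForDeletion change in this patch: the function now receives the marked-for-deletion counter and increments it itself, only after the deletion-mark file has actually been uploaded, and an already-existing mark is logged as a warning and treated as success rather than returned as an error. A minimal caller sketch follows, assuming the usual imports (github.com/pkg/errors, prometheus, promauto, pkg/block); the metric name shown is illustrative and not taken from this patch:

    // Illustrative registration; in this patch the counter is created by the caller
    // (e.g. the compactor) and passed down to block.MarkForDeletion.
    blocksMarkedForDeletion := promauto.With(reg).NewCounter(prometheus.CounterOpts{
        Name: "thanos_blocks_marked_for_deletion_total", // assumed name for this sketch
        Help: "Total number of blocks marked for deletion.",
    })

    // The counter is bumped inside MarkForDeletion only when the deletion-mark file
    // is successfully uploaded; if the mark already exists, MarkForDeletion logs a
    // warning and returns nil, so callers no longer call blocksMarkedForDeletion.Inc()
    // themselves (as the removed Inc() calls in this diff show).
    if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
        return errors.Wrapf(err, "mark block %s for deletion", id)
    }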