From 8fe44496cc49a9f61d9ea417d0f8ba51386bedc4 Mon Sep 17 00:00:00 2001 From: khyatisoneji Date: Fri, 14 Feb 2020 17:56:13 +0530 Subject: [PATCH] compact: add schedule-delete and delete-delay Signed-off-by: khyatisoneji --- cmd/thanos/compact.go | 37 ++++++++++-- docs/components/compact.md | 15 ++--- pkg/block/block.go | 28 +++++++++ pkg/block/metadata/compactormeta.go | 15 +++++ pkg/compact/clean.go | 2 +- pkg/compact/clean_test.go | 6 +- pkg/compact/compact.go | 6 +- pkg/compact/compact_e2e_test.go | 11 +++- pkg/compact/schedule_delete.go | 92 +++++++++++++++++++++++++++++ 9 files changed, 195 insertions(+), 17 deletions(-) create mode 100644 pkg/block/metadata/compactormeta.go create mode 100644 pkg/compact/schedule_delete.go diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index b0ea2f346cd..7117446b4b5 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -121,6 +121,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups."). Default("1").Int() + deleteDelay := modelDuration(cmd.Flag("delete-delay", fmt.Sprintf("Time before a block marked for deletion is deleted from bucket")). + Default("15m")) + selectorRelabelConf := regSelectorRelabelFlags(cmd) m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { @@ -130,6 +133,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { *dataDir, objStoreConfig, time.Duration(*consistencyDelay), + time.Duration(*deleteDelay), *haltOnError, *acceptMalformedIndex, *wait, @@ -158,6 +162,7 @@ func runCompact( dataDir string, objStoreConfig *extflag.PathOrContent, consistencyDelay time.Duration, + deleteDelay time.Duration, haltOnError bool, acceptMalformedIndex bool, wait bool, @@ -187,7 +192,13 @@ func runCompact( Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total", Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.", }) - reg.MustRegister(halted, retried, iterations, partialUploadDeleteAttempts) + deleteDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "thanos_delete_delay_seconds", + Help: "Configured delete delay in seconds.", + }, func() float64 { + return deleteDelay.Seconds() + }) + reg.MustRegister(halted, retried, iterations, partialUploadDeleteAttempts, deleteDelayMetric) downsampleMetrics := newDownsampleMetrics(reg) @@ -275,9 +286,10 @@ func runCompact( } var ( - compactDir = path.Join(dataDir, "compact") - downsamplingDir = path.Join(dataDir, "downsample") - indexCacheDir = path.Join(dataDir, "index_cache") + compactDir = path.Join(dataDir, "compact") + compactorMetaDir = path.Join(dataDir, "compactor-metas") + downsamplingDir = path.Join(dataDir, "downsample") + indexCacheDir = path.Join(dataDir, "index_cache") ) if err := os.RemoveAll(downsamplingDir); err != nil { @@ -285,6 +297,7 @@ func runCompact( return errors.Wrap(err, "clean working downsample directory") } + blockDeletionScheduler := compact.NewScheduleBlockDelete(logger, compactorMetaDir, bkt, deleteDelay) compactor, err := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, concurrency) if err != nil { cancel() @@ -334,6 +347,22 @@ func runCompact( return nil } + g.Add(func() error { + if !wait { + return blockDeletionScheduler.ScheduleDelete(ctx) + } + + // --wait=true is specified. 
+ return runutil.Repeat(deleteDelay, ctx.Done(), func() error { + if err := blockDeletionScheduler.ScheduleDelete(ctx); err != nil { + return errors.Wrap(err, "error cleaning blocks") + } + return nil + }) + }, func(error) { + cancel() + }) + g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") diff --git a/docs/components/compact.md b/docs/components/compact.md index edd71eb7b11..fbff1264773 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -11,8 +11,8 @@ It is generally not semantically concurrency safe and must be deployed as a sing It is also responsible for downsampling of data: -* creating 5m downsampling for blocks larger than **40 hours** (2d, 2w) -* creating 1h downsampling for blocks larger than **10 days** (2w). +- creating 5m downsampling for blocks larger than **40 hours** (2d, 2w) +- creating 1h downsampling for blocks larger than **10 days** (2w). Example: @@ -35,9 +35,9 @@ On-disk data is safe to delete between restarts and should be the first attempt Resolution - distance between data points on your graphs. E.g. -* raw - the same as scrape interval at the moment of data ingestion -* 5m - data point is every 5 minutes -* 1h - data point is every 1h +- raw - the same as scrape interval at the moment of data ingestion +- 5m - data point is every 5 minutes +- 1h - data point is every 1h Keep in mind, that the initial goal of downsampling is not saving disk space (Read further for elaboration on storage space consumption). The goal of downsampling is providing an opportunity to get fast results for range queries of big time intervals like months or years. In other words, if you set `--retention.resolution-raw` less then `--retention.resolution-5m` and `--retention.resolution-1h` - you might run into a problem of not being able to "zoom in" to your historical data. @@ -66,7 +66,8 @@ compacting blocks from an instance even when a Prometheus instance goes down for ## Flags -[embedmd]:# (flags/compact.txt $) +[embedmd]: # "flags/compact.txt $" + ```$ usage: thanos compact [] @@ -144,5 +145,5 @@ Flags: selecting blocks. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config - + --delete-delay=15m Time before a block marked for deletion is deleted from bucket. ``` diff --git a/pkg/block/block.go b/pkg/block/block.go index 740b3d63060..6769fbeabab 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -6,6 +6,7 @@ package block import ( + "bytes" "context" "encoding/json" "fmt" @@ -14,6 +15,7 @@ import ( "path" "path/filepath" "strings" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -126,6 +128,32 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er return err } +// MarkForDeletion creates a file +// which stores information about when the block was marked for deletion. 
+func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error { + compactorMetaExists, err := objstore.Exists(ctx, bkt, path.Join(id.String(), metadata.CompactorMetaFilename)) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("check compactor meta for id %s in bucket", id.String())) + } + if compactorMetaExists { + level.Info(logger).Log("msg", "compactor-meta already exists for block", "id", id.String()) + return nil + } + + compactorMeta, err := json.Marshal(metadata.CompactorMeta{ + ID: id, + DeletionTime: time.Now().Unix(), + }) + if err != nil { + return errors.Wrap(err, "compactor meta json marshalling") + } + + if err := bkt.Upload(ctx, path.Join(id.String(), metadata.CompactorMetaFilename), bytes.NewReader(compactorMeta)); err != nil { + return errors.Wrap(err, "upload meta file to debug dir") + } + return nil +} + // Delete removes directory that is meant to be block directory. // NOTE: Always prefer this method for deleting blocks. // * We have to delete block's files in the certain order (meta.json first) diff --git a/pkg/block/metadata/compactormeta.go b/pkg/block/metadata/compactormeta.go new file mode 100644 index 00000000000..56f29c5211b --- /dev/null +++ b/pkg/block/metadata/compactormeta.go @@ -0,0 +1,15 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import "github.com/oklog/ulid" + +// CompactorMetaFilename is the known json filename to store compactor metadata. +const CompactorMetaFilename = "compactor-meta.json" + +// CompactorMeta stores block id and when block was marked for deletion. +type CompactorMeta struct { + ID ulid.ULID `json:"id"` + DeletionTime int64 `json:"deletion_time"` +} diff --git a/pkg/compact/clean.go b/pkg/compact/clean.go index af681a940e4..215b951854f 100644 --- a/pkg/compact/clean.go +++ b/pkg/compact/clean.go @@ -41,7 +41,7 @@ func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger } deleteAttempts.Inc() - if err := block.Delete(ctx, logger, bkt, id); err != nil { + if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil { level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err) return } diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index 85654f8c8a3..43cef1636e2 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -63,7 +63,11 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001")) testutil.Ok(t, err) - testutil.Equals(t, false, exists) + testutil.Equals(t, true, exists) + + exists, err = bkt.Exists(ctx, path.Join(shouldDeleteID.String(), metadata.CompactorMetaFilename)) + testutil.Ok(t, err) + testutil.Equals(t, true, exists) exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID1.String(), "chunks", "000001")) testutil.Ok(t, err) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index b268724e108..c3d6e6c45b8 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -245,7 +245,7 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { level.Info(s.logger).Log("msg", "deleting outdated block", "block", id) - err := block.Delete(delCtx, s.logger, s.bkt, id) + err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id) cancel() if err != nil { s.metrics.garbageCollectionFailures.Inc() @@ -559,7 +559,7 @@ func RepairIssue347(ctx context.Context, 
logger log.Logger, bkt objstore.Bucket, defer cancel() // TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this). - if err := block.Delete(delCtx, logger, bkt, ie.id); err != nil { + if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id); err != nil { return errors.Wrapf(err, "deleting old block %s failed. You need to delete this block manually", ie.id) } @@ -761,7 +761,7 @@ func (cg *Group) deleteBlock(b string) error { delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id) - if err := block.Delete(delCtx, cg.logger, cg.bkt, id); err != nil { + if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id); err != nil { return errors.Wrapf(err, "delete block %s from bucket", id) } return nil diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 43ce2974341..f1950b786da 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -103,7 +103,16 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { var rem []ulid.ULID err = bkt.Iter(ctx, "", func(n string) error { - rem = append(rem, ulid.MustParse(n[:len(n)-1])) + id := ulid.MustParse(n[:len(n)-1]) + compactorMetaFile := path.Join(id.String(), metadata.CompactorMetaFilename) + + exists, err := bkt.Exists(ctx, compactorMetaFile) + if err != nil { + return err + } + if !exists { + rem = append(rem, id) + } return nil }) testutil.Ok(t, err) diff --git a/pkg/compact/schedule_delete.go b/pkg/compact/schedule_delete.go new file mode 100644 index 00000000000..9e9849e9130 --- /dev/null +++ b/pkg/compact/schedule_delete.go @@ -0,0 +1,92 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package compact + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path" + "path/filepath" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" +) + +// ScheduleBlockDelete marks the block to be deleted. +type ScheduleBlockDelete struct { + dir string + logger log.Logger + deleteDelay time.Duration + bkt objstore.Bucket +} + +// NewScheduleBlockDelete creates a new ScheduleBlockDelete. +func NewScheduleBlockDelete(logger log.Logger, dir string, bkt objstore.Bucket, deleteDelay time.Duration) *ScheduleBlockDelete { + return &ScheduleBlockDelete{ + dir: dir, + logger: logger, + deleteDelay: deleteDelay, + bkt: bkt, + } +} + +// ScheduleDelete deletes blocks from bucket +// deleteDelay duration after block is marked for deletion. 
+func (s *ScheduleBlockDelete) ScheduleDelete(ctx context.Context) error {
+	defer func() {
+		if err := os.RemoveAll(s.dir); err != nil {
+			level.Error(s.logger).Log("msg", "failed to remove compactor meta dir", "dir", s.dir, "err", err)
+		}
+	}()
+
+	if err := os.RemoveAll(s.dir); err != nil {
+		return errors.Wrap(err, "clean compactor meta dir")
+	}
+
+	return s.bkt.Iter(ctx, "", func(name string) error {
+		id, ok := block.IsBlockDir(name)
+		if ok {
+			compactorMetaFile := path.Join(id.String(), metadata.CompactorMetaFilename)
+
+			r, err := s.bkt.Get(ctx, compactorMetaFile)
+			if s.bkt.IsObjNotFoundErr(err) {
+				return nil
+			}
+			if err != nil {
+				return errors.Wrapf(err, "get compactor-meta file: %v", compactorMetaFile)
+			}
+
+			defer runutil.CloseWithLogOnErr(s.logger, r, "close bkt compactor-meta get")
+
+			metaContent, err := ioutil.ReadAll(r)
+			if err != nil {
+				return errors.Wrapf(err, "read compactor-meta file: %v", compactorMetaFile)
+			}
+
+			compactorMeta := metadata.CompactorMeta{}
+			if err := json.Unmarshal(metaContent, &compactorMeta); err != nil {
+				return errors.Wrap(err, "unmarshal compactor meta")
+			}
+
+			if time.Since(time.Unix(compactorMeta.DeletionTime, 0)) > s.deleteDelay {
+				if err := block.Delete(ctx, s.logger, s.bkt, compactorMeta.ID); err != nil {
+					return errors.Wrap(err, "delete block")
+				}
+
+				if err := os.RemoveAll(filepath.Join(s.dir, compactorMeta.ID.String())); err != nil {
+					return errors.Wrap(err, "remove local compactor-meta for block")
+				}
+			}
+		}
+		return nil
+	})
+}
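
For reference, below is a rough usage sketch of the two-phase flow this patch introduces: `block.MarkForDeletion` uploads a `compactor-meta.json` next to the block recording when it was marked, and `ScheduleDelete` later removes any block whose mark is older than `--delete-delay`. Only `MarkForDeletion`, `NewScheduleBlockDelete`, and `ScheduleDelete` come from this patch; the in-memory bucket, nop logger, throwaway local directory, example ULID, zero delay, and upstream `github.com` import paths are illustrative assumptions, not part of the change.

```go
// Sketch of the two-phase block deletion added in this patch (not part of the diff).
package main

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/objstore"
)

func main() {
	ctx := context.Background()
	logger := log.NewNopLogger()          // illustrative: no-op logger
	bkt := objstore.NewInMemBucket()      // illustrative: in-memory bucket instead of S3/GCS
	id := ulid.MustParse("01ARZ3NDEKTSV4RRFFQ69G5FAV") // illustrative block ID

	// Phase 1: instead of deleting the block right away, the compactor uploads
	// <block>/compactor-meta.json containing the block ID and the mark time.
	if err := block.MarkForDeletion(ctx, logger, bkt, id); err != nil {
		panic(err)
	}

	// Phase 2: the scheduler scans the bucket and deletes only those blocks whose
	// compactor-meta is older than --delete-delay (zero here, so deletion is immediate).
	sched := compact.NewScheduleBlockDelete(logger, "/tmp/compactor-metas", bkt, 0*time.Second)
	if err := sched.ScheduleDelete(ctx); err != nil {
		panic(err)
	}
}
```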