From 69a045a0895b9280d52443dc2a489b96753a0517 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Thu, 12 Nov 2020 15:59:31 +0100 Subject: [PATCH] tools: Added thanos bucket tool mark (#3415) * tools: Added thanos bucket tool mark Signed-off-by: Bartlomiej Plotka * Rebase. Signed-off-by: Bartlomiej Plotka * Fixed build errors. Signed-off-by: Bartlomiej Plotka --- cmd/thanos/tools_bucket.go | 51 +++++++++++++++++++++++++ docs/components/tools.md | 72 ++++++++++++++++++++++++++++++++++- pkg/block/block.go | 11 +++--- pkg/block/block_test.go | 8 ++-- pkg/block/metadata/markers.go | 12 +++--- pkg/compact/compact.go | 6 +-- pkg/compact/retention.go | 3 +- pkg/component/component.go | 1 + pkg/verifier/safe_delete.go | 4 +- scripts/genflagdocs.sh | 2 +- test/e2e/compact_test.go | 6 +-- 11 files changed, 151 insertions(+), 25 deletions(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index a2e944a7cf..63b15b8bb3 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -73,6 +73,7 @@ func registerBucket(app extkingpin.AppClause) { registerBucketReplicate(cmd, objStoreConfig) registerBucketDownsample(cmd, objStoreConfig) registerBucketCleanup(cmd, objStoreConfig) + registerBucketMarkBlock(cmd, objStoreConfig) } func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) { @@ -710,3 +711,53 @@ func compare(s1, s2 string) bool { } return s1Time.Before(s2Time) } + +func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) { + cmd := app.Command(component.Mark.String(), "Mark block for deletion or no-compact in a safe way. 
NOTE: If the compactor is currently running compacting same block, this operation would be potentially a noop.") + blockIDs := cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().Strings() + marker := cmd.Flag("marker", "Marker to be put.").Required().Enum(metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename) + details := cmd.Flag("details", "Human readable details to be put into marker.").Required().String() + cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { + confContentYaml, err := objStoreConfig.Content() + if err != nil { + return err + } + + bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String()) + if err != nil { + return err + } + + var ids []ulid.ULID + for _, id := range *blockIDs { + u, err := ulid.Parse(id) + if err != nil { + return errors.Errorf("id is not a valid block ULID, got: %v", id) + } + ids = append(ids, u) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + g.Add(func() error { + for _, id := range ids { + switch *marker { + case metadata.DeletionMarkFilename: + if err := block.MarkForDeletion(ctx, logger, bkt, id, *details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil { + return errors.Wrapf(err, "mark %v for %v", id, *marker) + } + case metadata.NoCompactMarkFilename: + if err := block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, *details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil { + return errors.Wrapf(err, "mark %v for %v", id, *marker) + } + default: + return errors.Errorf("not supported marker %v", *marker) + } + } + level.Info(logger).Log("msg", "marking done", "marker", *marker, "IDs", strings.Join(*blockIDs, ",")) + return nil + }, func(err error) { + cancel() + }) + return nil + }) +} diff --git a/docs/components/tools.md b/docs/components/tools.md index 
2179f9d552..5ebcf95511 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -60,6 +60,11 @@ Subcommands: tools bucket cleanup [] Cleans up all blocks marked for deletion + tools bucket mark --id=ID --marker=MARKER --details=DETAILS + Mark block for deletion or no-compact in a safe way. NOTE: If the compactor + is currently running compacting same block, this operation would be + potentially a noop. + tools rules-check --rules=RULES Check if the rule files are valid or not. @@ -144,6 +149,11 @@ Subcommands: tools bucket cleanup [] Cleans up all blocks marked for deletion + tools bucket mark --id=ID --marker=MARKER --details=DETAILS + Mark block for deletion or no-compact in a safe way. NOTE: If the compactor + is currently running compacting same block, this operation would be + potentially a noop. + ``` @@ -493,7 +503,7 @@ Flags: ### Bucket downsample -`tools bucket downsample` is used to continuously downsample blocks in an object store bucket as a service. +`tools bucket downsample` is used to downsample blocks in an object store bucket as a service. It implements the downsample API on top of historical data in an object storage bucket. ```bash @@ -549,6 +559,66 @@ Flags: process downsamplings. ``` + +### Bucket mark + +`tools bucket mark` can be used to manually mark blocks for deletion or as no-compact. The `--id`, `--marker` and `--details` flags are all required. + +NOTE: If the [Compactor](compact.md) is currently running and compacting exactly the same block, this operation could potentially be a noop. + +```bash +thanos tools bucket mark \ + --id "01C8320GCGEWBZF51Q46TTQEH9" --id "01C8J352831FXGZQMN2NTJ08DY" \ + --objstore.config-file "bucket.yml" +``` + +The example content of `bucket.yml`: + +```yaml +type: GCS +config: + bucket: example-bucket +``` + +[embedmd]:# (flags/tools_bucket_mark.txt $) +```$ +usage: thanos tools bucket mark --id=ID --marker=MARKER --details=DETAILS + +Mark block for deletion or no-compact in a safe way. 
NOTE: If the compactor is +currently running compacting same block, this operation would be potentially a +noop. + +Flags: + -h, --help Show context-sensitive help (also try --help-long and + --help-man). + --version Show application version. + --log.level=info Log filtering level. + --log.format=logfmt Log format to use. Possible options: logfmt or json. + --tracing.config-file= + Path to YAML file with tracing configuration. See + format details: + https://thanos.io/tip/thanos/tracing.md/#configuration + --tracing.config= + Alternative to 'tracing.config-file' flag (lower + priority). Content of YAML file with tracing + configuration. See format details: + https://thanos.io/tip/thanos/tracing.md/#configuration + --objstore.config-file= + Path to YAML file that contains object store + configuration. See format details: + https://thanos.io/tip/thanos/storage.md/#configuration + --objstore.config= + Alternative to 'objstore.config-file' flag (lower + priority). Content of YAML file that contains object + store configuration. See format details: + https://thanos.io/tip/thanos/storage.md/#configuration + --id=ID ... ID (ULID) of the blocks to be marked for deletion + (repeated flag) + --marker=MARKER Marker to be put. + --details=DETAILS Human readable details to be put into marker. + +``` + ## Rules-check The `tools rules-check` subcommand contains tools for validation of Prometheus rules. diff --git a/pkg/block/block.go b/pkg/block/block.go index 164f8f4ba7..c8e63d609b 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -133,7 +133,7 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er } // MarkForDeletion creates a file which stores information about when the block was marked for deletion. 
-func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error { +func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, details string, markedForDeletion prometheus.Counter) error { deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename) deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile) if err != nil { @@ -148,6 +148,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket ID: id, DeletionTime: time.Now().Unix(), Version: metadata.DeletionMarkVersion1, + Details: details, }) if err != nil { return errors.Wrap(err, "json encode deletion mark") @@ -286,7 +287,7 @@ func gatherFileStats(blockDir string) (res []metadata.File, _ error) { } // MarkForNoCompact creates a file which marks block to be not compacted. -func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, noCompactDetails string, markedForNoCompact prometheus.Counter) error { +func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, details string, markedForNoCompact prometheus.Counter) error { m := path.Join(id.String(), metadata.NoCompactMarkFilename) noCompactMarkExists, err := bkt.Exists(ctx, m) if err != nil { @@ -301,9 +302,9 @@ func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucke ID: id, Version: metadata.NoCompactMarkVersion1, - Time: time.Now().Unix(), - Reason: reason, - Details: noCompactDetails, + NoCompactTime: time.Now().Unix(), + Reason: reason, + Details: details, }) if err != nil { return errors.Wrap(err, "json encode no compact mark") diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 15598eaa18..ee473f0b09 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -305,7 +305,7 @@ func TestMarkForDeletion(t 
*testing.T) { testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()))) c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, c) + err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, "", c) testutil.Ok(t, err) testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c)) }) @@ -335,9 +335,9 @@ func TestMarkForNoCompact(t *testing.T) { name: "block with no-compact mark already, expected log and no metric increment", preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) { m, err := json.Marshal(metadata.NoCompactMark{ - ID: id, - Time: time.Now().Unix(), - Version: metadata.NoCompactMarkVersion1, + ID: id, + NoCompactTime: time.Now().Unix(), + Version: metadata.NoCompactMarkVersion1, }) testutil.Ok(t, err) testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m))) diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go index 494544be36..b3c8b9d1f0 100644 --- a/pkg/block/metadata/markers.go +++ b/pkg/block/metadata/markers.go @@ -49,6 +49,8 @@ type DeletionMark struct { ID ulid.ULID `json:"id"` // Version of the file. Version int `json:"version"` + // Details is a human readable string giving details of reason. + Details string `json:"details,omitempty"` // DeletionTime is a unix timestamp of when the block was marked to be deleted. DeletionTime int64 `json:"deletion_time"` @@ -73,12 +75,12 @@ type NoCompactMark struct { ID ulid.ULID `json:"id"` // Version of the file. Version int `json:"version"` - - // Time is a unix timestamp of when the block was marked for no compact. - Time int64 `json:"time"` - Reason NoCompactReason `json:"reason"` // Details is a human readable string giving details of reason. - Details string `json:"details"` + Details string `json:"details,omitempty"` + + // NoCompactTime is a unix timestamp of when the block was marked for no compact. 
+ NoCompactTime int64 `json:"no_compact_time"` + Reason NoCompactReason `json:"reason"` } func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 4cce1cc8b0..5fdec0b3b7 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -183,7 +183,7 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) level.Info(s.logger).Log("msg", "marking outdated block for deletion", "block", id) - err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, s.metrics.blocksMarkedForDeletion) + err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, "outdated block", s.metrics.blocksMarkedForDeletion) cancel() if err != nil { s.metrics.garbageCollectionFailures.Inc() @@ -671,7 +671,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, defer cancel() // TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this). 
- if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, blocksMarkedForDeletion); err != nil { + if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, "source of repaired block", blocksMarkedForDeletion); err != nil { return errors.Wrapf(err, "marking old block %s for deletion has failed", ie.id) } return nil @@ -830,7 +830,7 @@ func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error { delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id) - if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, cg.blocksMarkedForDeletion); err != nil { + if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil { return errors.Wrapf(err, "mark block %s for deletion from bucket", id) } return nil diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 8d1ba7d5fb..703bad5dda 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -5,6 +5,7 @@ package compact import ( "context" + "fmt" "time" "github.com/go-kit/kit/log" @@ -37,7 +38,7 @@ func ApplyRetentionPolicyByResolution( maxTime := time.Unix(m.MaxTime/1000, 0) if time.Now().After(maxTime.Add(retentionDuration)) { level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String()) - if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil { + if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", retentionDuration), blocksMarkedForDeletion); err != nil { return errors.Wrap(err, "delete block") } } diff --git a/pkg/component/component.go b/pkg/component/component.go index c7451c4767..a418b9461f 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -91,6 +91,7 @@ func FromProto(storeType storepb.StoreType) StoreAPI { var ( Bucket 
= source{component: component{name: "bucket"}} Cleanup = source{component: component{name: "cleanup"}} + Mark = source{component: component{name: "mark"}} Compact = source{component: component{name: "compact"}} Downsample = source{component: component{name: "downsample"}} Replicate = source{component: component{name: "replicate"}} diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go index 337eb0c84a..30b38562ac 100644 --- a/pkg/verifier/safe_delete.go +++ b/pkg/verifier/safe_delete.go @@ -78,7 +78,7 @@ func BackupAndDelete(ctx context.Context, conf Config, id ulid.ULID) error { } level.Info(conf.Logger).Log("msg", "Marking block as deleted", "id", id.String()) - if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, conf.metrics.blocksMarkedForDeletion); err != nil { + if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, "manual verify-repair", conf.metrics.blocksMarkedForDeletion); err != nil { return errors.Wrap(err, "marking delete from source") } return nil @@ -115,7 +115,7 @@ func BackupAndDeleteDownloaded(ctx context.Context, conf Config, bdir string, id } level.Info(conf.Logger).Log("msg", "Marking block as deleted", "id", id.String()) - if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, conf.metrics.blocksMarkedForDeletion); err != nil { + if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, "manual verify-repair", conf.metrics.blocksMarkedForDeletion); err != nil { return errors.Wrap(err, "marking delete from source") } return nil diff --git a/scripts/genflagdocs.sh b/scripts/genflagdocs.sh index 3dc4cbba40..b661e03002 100755 --- a/scripts/genflagdocs.sh +++ b/scripts/genflagdocs.sh @@ -47,7 +47,7 @@ for x in "${toolsCommands[@]}"; do ${THANOS_BIN} tools "${x}" --help &>"docs/components/flags/tools_${x}.txt" done -toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup") +toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup" "mark") 
for x in "${toolsBucketCommands[@]}"; do ${THANOS_BIN} tools bucket "${x}" --help &>"docs/components/flags/tools_bucket_${x}.txt" done diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index b8af42b5e7..eaa813ef50 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -372,7 +372,7 @@ func TestCompactWithStoreGateway(t *testing.T) { id, err = malformedBase.Create(ctx, dir, 0*time.Second) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) - testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) // Partial block after consistency delay. @@ -385,7 +385,7 @@ func TestCompactWithStoreGateway(t *testing.T) { id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) - testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) // Partial block after consistency delay + old deletion mark ready to be deleted. 
@@ -412,7 +412,7 @@ func TestCompactWithStoreGateway(t *testing.T) { id, err = malformedBase.Create(ctx, dir, 50*time.Hour) testutil.Ok(t, err) testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) - testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) }