diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c954efa65a..d59c4b35665 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5889](https://github.com/thanos-io/thanos/pull/5889) Query Frontend: Support sharding vertical sharding `label_replace` and `label_join` functions. - [#5819](https://github.com/thanos-io/thanos/pull/5819) Store: Add a few objectives for Store's data touched/fetched amount and sizes. They are: 50, 95, and 99 quantiles. - [#5940](https://github.com/thanos-io/thanos/pull/5940) Objstore: Support for authenticating to Swift using application credentials. +- [#5977](https://github.com/thanos-io/thanos/pull/5977) Tools: Added remove flag on bucket mark command to remove deletion, no-downsample or no-compact markers on the block. ### Changed diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index bc48a56ff76..66939853b91 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -147,9 +147,10 @@ type bucketRetentionConfig struct { } type bucketMarkBlockConfig struct { - details string - marker string - blockIDs []string + details string + marker string + blockIDs []string + removeMarker bool } func (tbc *bucketVerifyConfig) registerBucketVerifyFlag(cmd extkingpin.FlagClause) *bucketVerifyConfig { @@ -239,8 +240,8 @@ func (tbc *bucketDownsampleConfig) registerBucketDownsampleFlag(cmd extkingpin.F func (tbc *bucketMarkBlockConfig) registerBucketMarkBlockFlag(cmd extkingpin.FlagClause) *bucketMarkBlockConfig { cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().StringsVar(&tbc.blockIDs) cmd.Flag("marker", "Marker to be put.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename, metadata.NoDownsampleMarkFilename) - cmd.Flag("details", "Human readable details to be put into marker.").Required().StringVar(&tbc.details) - + cmd.Flag("details", "Human readable details to be put into marker.").StringVar(&tbc.details) + cmd.Flag("remove", "Remove the marker.").Default("false").BoolVar(&tbc.removeMarker) return tbc } @@ -1047,9 +1048,20 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P ids = append(ids, u) } + if !tbc.removeMarker && tbc.details == "" { + return errors.Errorf("required flag --details not provided") + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) g.Add(func() error { for _, id := range ids { + if tbc.removeMarker { + err := block.RemoveMark(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}), tbc.marker) + if err != nil { + return errors.Wrapf(err, "remove mark %v for %v", id, tbc.marker) + } + continue + } switch tbc.marker { case metadata.DeletionMarkFilename: if err := block.MarkForDeletion(ctx, logger, bkt, id, tbc.details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil { diff --git a/docs/components/tools.md b/docs/components/tools.md index b137f1e40ac..bf78655bdd9 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -52,7 +52,7 @@ Subcommands: tools bucket cleanup [] Cleans up all blocks marked for deletion. - tools bucket mark --id=ID --marker=MARKER --details=DETAILS + tools bucket mark --id=ID --marker=MARKER [] Mark block for deletion or no-compact in a safe way. NOTE: If the compactor is currently running compacting same block, this operation would be potentially a noop. @@ -161,7 +161,7 @@ Subcommands: tools bucket cleanup [] Cleans up all blocks marked for deletion. - tools bucket mark --id=ID --marker=MARKER --details=DETAILS + tools bucket mark --id=ID --marker=MARKER [] Mark block for deletion or no-compact in a safe way. NOTE: If the compactor is currently running compacting same block, this operation would be potentially a noop. @@ -681,7 +681,7 @@ prefix: "" ``` ```$ mdox-exec="thanos tools bucket mark --help" -usage: thanos tools bucket mark --id=ID --marker=MARKER --details=DETAILS +usage: thanos tools bucket mark --id=ID --marker=MARKER [] Mark block for deletion or no-compact in a safe way. NOTE: If the compactor is currently running compacting same block, this operation would be potentially a @@ -705,6 +705,7 @@ Flags: Path to YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/#configuration + --remove Remove the marker. --tracing.config= Alternative to 'tracing.config-file' flag (mutually exclusive). Content of YAML file diff --git a/pkg/block/block.go b/pkg/block/block.go index 5edf27f2423..41625dc17d7 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -433,3 +433,22 @@ func MarkForNoDownsample(ctx context.Context, logger log.Logger, bkt objstore.Bu level.Info(logger).Log("msg", "block has been marked for no downsample", "block", id) return nil } + +// RemoveMark removes the file which marked the block for deletion, no-downsample or no-compact. +func RemoveMark(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, removeMark prometheus.Counter, markedFilename string) error { + markedFile := path.Join(id.String(), markedFilename) + markedFileExists, err := bkt.Exists(ctx, markedFile) + if err != nil { + return errors.Wrapf(err, "check if %s file exists in bucket", markedFile) + } + if !markedFileExists { + level.Warn(logger).Log("msg", "requested to remove the mark, but file does not exist", "err", errors.Errorf("file %s does not exist in bucket", markedFile)) + return nil + } + if err := bkt.Delete(ctx, markedFile); err != nil { + return errors.Wrapf(err, "delete file %s from bucket", markedFile) + } + removeMark.Inc() + level.Info(logger).Log("msg", "mark has been removed from the block", "block", id) + return nil +} diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index d48543d0551..a6bf733d6b4 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -593,3 +593,144 @@ func (eb errBucket) Upload(ctx context.Context, name string, r io.Reader) error } return nil } + +func TestRemoveMarkForDeletion(t *testing.T) { + defer custom.TolerantVerifyLeak(t) + ctx := context.Background() + tmpDir := t.TempDir() + for _, testcases := range []struct { + name string + preDelete func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) + blocksUnmarked int + }{ + { + name: "unmarked block for deletion", + preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) { + deletionMark, err := json.Marshal(metadata.DeletionMark{ + ID: id, + DeletionTime: time.Now().Unix(), + Version: metadata.DeletionMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark))) + }, + blocksUnmarked: 1, + }, + { + name: "block not marked for deletion, message logged and metric not incremented", + preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {}, + blocksUnmarked: 0, + }, + } { + t.Run(testcases.name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "cluster-eu1", Value: "service-1"}}, + {{Name: "cluster-eu1", Value: "service-2"}}, + {{Name: "cluster-eu1", Value: "service-3"}}, + {{Name: "cluster-us1", Value: "service-1"}}, + {{Name: "cluster-us1", Value: "service-2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "region-1", Value: "eu-west"}}, 124, metadata.NoneFunc) + testutil.Ok(t, err) + testcases.preDelete(t, id, bkt) + counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + err = RemoveMark(ctx, log.NewNopLogger(), bkt, id, counter, metadata.DeletionMarkFilename) + testutil.Ok(t, err) + testutil.Equals(t, float64(testcases.blocksUnmarked), promtest.ToFloat64(counter)) + }) + } +} + +func TestRemoveMarkForNoCompact(t *testing.T) { + defer custom.TolerantVerifyLeak(t) + ctx := context.Background() + tmpDir := t.TempDir() + for _, testCases := range []struct { + name string + preDelete func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) + blocksUnmarked int + }{ + { + name: "unmarked block for no-compact", + preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) { + m, err := json.Marshal(metadata.NoCompactMark{ + ID: id, + NoCompactTime: time.Now().Unix(), + Version: metadata.NoCompactMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m))) + }, + blocksUnmarked: 1, + }, + { + name: "block not marked for no-compact, message logged and metric not incremented", + preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {}, + blocksUnmarked: 0, + }, + } { + t.Run(testCases.name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "cluster-eu1", Value: "service-1"}}, + {{Name: "cluster-eu1", Value: "service-2"}}, + {{Name: "cluster-eu1", Value: "service-3"}}, + {{Name: "cluster-us1", Value: "service-1"}}, + {{Name: "cluster-us1", Value: "service-2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "region-1", Value: "eu-west"}}, 124, metadata.NoneFunc) + testutil.Ok(t, err) + testCases.preDelete(t, id, bkt) + counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + err = RemoveMark(ctx, log.NewNopLogger(), bkt, id, counter, metadata.NoCompactMarkFilename) + testutil.Ok(t, err) + testutil.Equals(t, float64(testCases.blocksUnmarked), promtest.ToFloat64(counter)) + }) + } +} + +func TestRemoveMmarkForNoDownsample(t *testing.T) { + defer custom.TolerantVerifyLeak(t) + ctx := context.Background() + tmpDir := t.TempDir() + for _, testCases := range []struct { + name string + preDelete func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) + blocksUnmarked int + }{ + { + name: "unmarked block for no-downsample", + preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) { + m, err := json.Marshal(metadata.NoDownsampleMark{ + ID: id, + NoDownsampleTime: time.Now().Unix(), + Version: metadata.NoDownsampleMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoDownsampleMarkFilename), bytes.NewReader(m))) + }, + blocksUnmarked: 1, + }, + { + name: "block not marked for no-downsample, message logged and metric not incremented", + preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {}, + blocksUnmarked: 0, + }, + } { + t.Run(testCases.name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "cluster-eu1", Value: "service-1"}}, + {{Name: "cluster-eu1", Value: "service-2"}}, + {{Name: "cluster-eu1", Value: "service-3"}}, + {{Name: "cluster-us1", Value: "service-1"}}, + {{Name: "cluster-us1", Value: "service-2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "region-1", Value: "eu-west"}}, 124, metadata.NoneFunc) + testutil.Ok(t, err) + testCases.preDelete(t, id, bkt) + counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + err = RemoveMark(ctx, log.NewNopLogger(), bkt, id, counter, metadata.NoDownsampleMarkFilename) + testutil.Ok(t, err) + testutil.Equals(t, float64(testCases.blocksUnmarked), promtest.ToFloat64(counter)) + }) + } +}