Skip to content

Commit

Permalink
tools: Added thanos bucket tool mark (#3415)
Browse files Browse the repository at this point in the history
* tools: Added thanos bucket tool mark

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Rebase.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Fixed build errors.

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Nov 12, 2020
1 parent 82e7da7 commit 69a045a
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 25 deletions.
51 changes: 51 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketReplicate(cmd, objStoreConfig)
registerBucketDownsample(cmd, objStoreConfig)
registerBucketCleanup(cmd, objStoreConfig)
registerBucketMarkBlock(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
Expand Down Expand Up @@ -710,3 +711,53 @@ func compare(s1, s2 string) bool {
}
return s1Time.Before(s2Time)
}

func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Mark.String(), "Mark block for deletion or no-compact in a safe way. NOTE: If the compactor is currently running compacting same block, this operation would be potentially a noop.")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().Strings()
marker := cmd.Flag("marker", "Marker to be put.").Required().Enum(metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename)
details := cmd.Flag("details", "Human readable details to be put into marker.").Required().String()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String())
if err != nil {
return err
}

var ids []ulid.ULID
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("id is not a valid block ULID, got: %v", id)
}
ids = append(ids, u)
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
g.Add(func() error {
for _, id := range ids {
switch *marker {
case metadata.DeletionMarkFilename:
if err := block.MarkForDeletion(ctx, logger, bkt, id, *details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for %v", id, *marker)
}
case metadata.NoCompactMarkFilename:
if err := block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, *details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for %v", id, *marker)
}
default:
return errors.Errorf("not supported marker %v", *marker)
}
}
level.Info(logger).Log("msg", "marking done", "marker", *marker, "IDs", strings.Join(*blockIDs, ","))
return nil
}, func(err error) {
cancel()
})
return nil
})
}
72 changes: 71 additions & 1 deletion docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ Subcommands:
tools bucket cleanup [<flags>]
Cleans up all blocks marked for deletion
tools bucket mark --id=ID --marker=MARKER --details=DETAILS
Mark block for deletion or no-compact in a safe way. NOTE: If the compactor
is currently running compacting same block, this operation would be
potentially a noop.
tools rules-check --rules=RULES
Check if the rule files are valid or not.
Expand Down Expand Up @@ -144,6 +149,11 @@ Subcommands:
tools bucket cleanup [<flags>]
Cleans up all blocks marked for deletion
tools bucket mark --id=ID --marker=MARKER --details=DETAILS
Mark block for deletion or no-compact in a safe way. NOTE: If the compactor
is currently running compacting same block, this operation would be
potentially a noop.
```

Expand Down Expand Up @@ -493,7 +503,7 @@ Flags:

### Bucket downsample

`tools bucket downsample` is used to continuously downsample blocks in an object store bucket as a service.
`tools bucket downsample` is used to downsample blocks in an object store bucket as a service.
It implements the downsample API on top of historical data in an object storage bucket.

```bash
Expand Down Expand Up @@ -549,6 +559,66 @@ Flags:
process downsamplings.
```

### Bucket mark

`tools bucket mark` can be used to manually mark block for deletion.

NOTE: If the [Compactor](compact.md) is currently running and compacting exactly same block, this operation would be potentially a noop."

```bash
thanos tools bucket mark \
--id "01C8320GCGEWBZF51Q46TTQEH9" --id "01C8J352831FXGZQMN2NTJ08DY"
--objstore.config-file "bucket.yml"
```

The example content of `bucket.yml`:

```yaml
type: GCS
config:
bucket: example-bucket
```

[embedmd]:# (flags/tools_bucket_mark.txt $)
```$
usage: thanos tools bucket mark --id=ID --marker=MARKER --details=DETAILS
Mark block for deletion or no-compact in a safe way. NOTE: If the compactor is
currently running compacting same block, this operation would be potentially a
noop.
Flags:
-h, --help Show context-sensitive help (also try --help-long and
--help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use. Possible options: logfmt or json.
--tracing.config-file=<file-path>
Path to YAML file with tracing configuration. See
format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tracing.config=<content>
Alternative to 'tracing.config-file' flag (lower
priority). Content of YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--objstore.config-file=<file-path>
Path to YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--objstore.config=<content>
Alternative to 'objstore.config-file' flag (lower
priority). Content of YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--id=ID ... ID (ULID) of the blocks to be marked for deletion
(repeated flag)
--marker=MARKER Marker to be put.
--details=DETAILS Human readable details to be put into marker.
```

## Rules-check

The `tools rules-check` subcommand contains tools for validation of Prometheus rules.
Expand Down
11 changes: 6 additions & 5 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er
}

// MarkForDeletion creates a file which stores information about when the block was marked for deletion.
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error {
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, details string, markedForDeletion prometheus.Counter) error {
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile)
if err != nil {
Expand All @@ -148,6 +148,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
Details: details,
})
if err != nil {
return errors.Wrap(err, "json encode deletion mark")
Expand Down Expand Up @@ -286,7 +287,7 @@ func gatherFileStats(blockDir string) (res []metadata.File, _ error) {
}

// MarkForNoCompact creates a file which marks block to be not compacted.
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, noCompactDetails string, markedForNoCompact prometheus.Counter) error {
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, details string, markedForNoCompact prometheus.Counter) error {
m := path.Join(id.String(), metadata.NoCompactMarkFilename)
noCompactMarkExists, err := bkt.Exists(ctx, m)
if err != nil {
Expand All @@ -301,9 +302,9 @@ func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucke
ID: id,
Version: metadata.NoCompactMarkVersion1,

Time: time.Now().Unix(),
Reason: reason,
Details: noCompactDetails,
NoCompactTime: time.Now().Unix(),
Reason: reason,
Details: details,
})
if err != nil {
return errors.Wrap(err, "json encode no compact mark")
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestMarkForDeletion(t *testing.T) {
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String())))

c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, c)
err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, "", c)
testutil.Ok(t, err)
testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
})
Expand Down Expand Up @@ -335,9 +335,9 @@ func TestMarkForNoCompact(t *testing.T) {
name: "block with no-compact mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoCompactMark{
ID: id,
Time: time.Now().Unix(),
Version: metadata.NoCompactMarkVersion1,
ID: id,
NoCompactTime: time.Now().Unix(),
Version: metadata.NoCompactMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m)))
Expand Down
12 changes: 7 additions & 5 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type DeletionMark struct {
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`
// Details is a human readable string giving details of reason.
Details string `json:"details,omitempty"`

// DeletionTime is a unix timestamp of when the block was marked to be deleted.
DeletionTime int64 `json:"deletion_time"`
Expand All @@ -73,12 +75,12 @@ type NoCompactMark struct {
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`

// Time is a unix timestamp of when the block was marked for no compact.
Time int64 `json:"time"`
Reason NoCompactReason `json:"reason"`
// Details is a human readable string giving details of reason.
Details string `json:"details"`
Details string `json:"details,omitempty"`

// NoCompactTime is a unix timestamp of when the block was marked for no compact.
NoCompactTime int64 `json:"no_compact_time"`
Reason NoCompactReason `json:"reason"`
}

func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename }
Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)

level.Info(s.logger).Log("msg", "marking outdated block for deletion", "block", id)
err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, s.metrics.blocksMarkedForDeletion)
err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, "outdated block", s.metrics.blocksMarkedForDeletion)
cancel()
if err != nil {
s.metrics.garbageCollectionFailures.Inc()
Expand Down Expand Up @@ -671,7 +671,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
defer cancel()

// TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this).
if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, "source of repaired block", blocksMarkedForDeletion); err != nil {
return errors.Wrapf(err, "marking old block %s for deletion has failed", ie.id)
}
return nil
Expand Down Expand Up @@ -830,7 +830,7 @@ func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error {
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id)
if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, cg.blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil {
return errors.Wrapf(err, "mark block %s for deletion from bucket", id)
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package compact

import (
"context"
"fmt"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -37,7 +38,7 @@ func ApplyRetentionPolicyByResolution(
maxTime := time.Unix(m.MaxTime/1000, 0)
if time.Now().After(maxTime.Add(retentionDuration)) {
level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", retentionDuration), blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "delete block")
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func FromProto(storeType storepb.StoreType) StoreAPI {
var (
Bucket = source{component: component{name: "bucket"}}
Cleanup = source{component: component{name: "cleanup"}}
Mark = source{component: component{name: "mark"}}
Compact = source{component: component{name: "compact"}}
Downsample = source{component: component{name: "downsample"}}
Replicate = source{component: component{name: "replicate"}}
Expand Down
4 changes: 2 additions & 2 deletions pkg/verifier/safe_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BackupAndDelete(ctx context.Context, conf Config, id ulid.ULID) error {
}

level.Info(conf.Logger).Log("msg", "Marking block as deleted", "id", id.String())
if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, conf.metrics.blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, "manual verify-repair", conf.metrics.blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "marking delete from source")
}
return nil
Expand Down Expand Up @@ -115,7 +115,7 @@ func BackupAndDeleteDownloaded(ctx context.Context, conf Config, bdir string, id
}

level.Info(conf.Logger).Log("msg", "Marking block as deleted", "id", id.String())
if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, conf.metrics.blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, "manual verify-repair", conf.metrics.blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "marking delete from source")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion scripts/genflagdocs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ for x in "${toolsCommands[@]}"; do
${THANOS_BIN} tools "${x}" --help &>"docs/components/flags/tools_${x}.txt"
done

toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup")
toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup" "mark")
for x in "${toolsBucketCommands[@]}"; do
${THANOS_BIN} tools bucket "${x}" --help &>"docs/components/flags/tools_bucket_${x}.txt"
done
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
id, err = malformedBase.Create(ctx, dir, 0*time.Second)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after consistency delay.
Expand All @@ -385,7 +385,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after consistency delay + old deletion mark ready to be deleted.
Expand All @@ -412,7 +412,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
id, err = malformedBase.Create(ctx, dir, 50*time.Hour)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))
}

Expand Down

0 comments on commit 69a045a

Please sign in to comment.