Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: Added thanos bucket tool mark #3415

Merged
merged 3 commits into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketReplicate(cmd, objStoreConfig)
registerBucketDownsample(cmd, objStoreConfig)
registerBucketCleanup(cmd, objStoreConfig)
registerBucketMarkBlock(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
Expand Down Expand Up @@ -710,3 +711,53 @@ func compare(s1, s2 string) bool {
}
return s1Time.Before(s2Time)
}

func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Mark.String(), "Mark block for deletion or no-compact in a safe way. NOTE: If the compactor is currently running compacting same block, this operation would be potentially a noop.")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().Strings()
marker := cmd.Flag("marker", "Marker to be put.").Required().Enum(metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename)
details := cmd.Flag("details", "Human readable details to be put into marker.").Required().String()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String())
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

var ids []ulid.ULID
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("id is not a valid block ULID, got: %v", id)
}
ids = append(ids, u)
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
g.Add(func() error {
for _, id := range ids {
switch *marker {
case metadata.DeletionMarkFilename:
if err := block.MarkForDeletion(ctx, logger, bkt, id, *details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for %v", id, *marker)
}
case metadata.NoCompactMarkFilename:
if err := block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, *details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for %v", id, *marker)
}
default:
return errors.Errorf("not supported marker %v", *marker)
}
}
level.Info(logger).Log("msg", "marking done", "marker", *marker, "IDs", strings.Join(*blockIDs, ","))
return nil
}, func(err error) {
cancel()
})
return nil
})
}
72 changes: 71 additions & 1 deletion docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ Subcommands:
tools bucket cleanup [<flags>]
Cleans up all blocks marked for deletion

tools bucket mark --id=ID --marker=MARKER --details=DETAILS
Mark block for deletion or no-compact in a safe way. NOTE: If the compactor
is currently running compacting same block, this operation would be
potentially a noop.

tools rules-check --rules=RULES
Check if the rule files are valid or not.

Expand Down Expand Up @@ -144,6 +149,11 @@ Subcommands:
tools bucket cleanup [<flags>]
Cleans up all blocks marked for deletion

tools bucket mark --id=ID --marker=MARKER --details=DETAILS
Mark block for deletion or no-compact in a safe way. NOTE: If the compactor
is currently running compacting same block, this operation would be
potentially a noop.


```

Expand Down Expand Up @@ -493,7 +503,7 @@ Flags:

### Bucket downsample

`tools bucket downsample` is used to continuously downsample blocks in an object store bucket as a service.
`tools bucket downsample` is used to downsample blocks in an object store bucket as a service.
It implements the downsample API on top of historical data in an object storage bucket.

```bash
Expand Down Expand Up @@ -549,6 +559,66 @@ Flags:
process downsamplings.

```

### Bucket mark

`tools bucket mark` can be used to manually mark block for deletion.

NOTE: If the [Compactor](compact.md) is currently running and compacting exactly same block, this operation would be potentially a noop."

```bash
thanos tools bucket mark \
--id "01C8320GCGEWBZF51Q46TTQEH9" --id "01C8J352831FXGZQMN2NTJ08DY"
--objstore.config-file "bucket.yml"
```

The example content of `bucket.yml`:

```yaml
type: GCS
config:
bucket: example-bucket
```

[embedmd]:# (flags/tools_bucket_mark.txt $)
```$
usage: thanos tools bucket mark --id=ID --marker=MARKER --details=DETAILS

Mark block for deletion or no-compact in a safe way. NOTE: If the compactor is
currently running compacting same block, this operation would be potentially a
noop.

Flags:
-h, --help Show context-sensitive help (also try --help-long and
--help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use. Possible options: logfmt or json.
--tracing.config-file=<file-path>
Path to YAML file with tracing configuration. See
format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tracing.config=<content>
Alternative to 'tracing.config-file' flag (lower
priority). Content of YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--objstore.config-file=<file-path>
Path to YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--objstore.config=<content>
Alternative to 'objstore.config-file' flag (lower
priority). Content of YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--id=ID ... ID (ULID) of the blocks to be marked for deletion
(repeated flag)
--marker=MARKER Marker to be put.
--details=DETAILS Human readable details to be put into marker.

```

## Rules-check

The `tools rules-check` subcommand contains tools for validation of Prometheus rules.
Expand Down
11 changes: 6 additions & 5 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er
}

// MarkForDeletion creates a file which stores information about when the block was marked for deletion.
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error {
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, details string, markedForDeletion prometheus.Counter) error {
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile)
if err != nil {
Expand All @@ -148,6 +148,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
Details: details,
})
if err != nil {
return errors.Wrap(err, "json encode deletion mark")
Expand Down Expand Up @@ -286,7 +287,7 @@ func gatherFileStats(blockDir string) (res []metadata.File, _ error) {
}

// MarkForNoCompact creates a file which marks block to be not compacted.
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, noCompactDetails string, markedForNoCompact prometheus.Counter) error {
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, details string, markedForNoCompact prometheus.Counter) error {
m := path.Join(id.String(), metadata.NoCompactMarkFilename)
noCompactMarkExists, err := bkt.Exists(ctx, m)
if err != nil {
Expand All @@ -301,9 +302,9 @@ func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucke
ID: id,
Version: metadata.NoCompactMarkVersion1,

Time: time.Now().Unix(),
Reason: reason,
Details: noCompactDetails,
NoCompactTime: time.Now().Unix(),
Reason: reason,
Details: details,
})
if err != nil {
return errors.Wrap(err, "json encode no compact mark")
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestMarkForDeletion(t *testing.T) {
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String())))

c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, c)
err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, "", c)
testutil.Ok(t, err)
testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
})
Expand Down Expand Up @@ -335,9 +335,9 @@ func TestMarkForNoCompact(t *testing.T) {
name: "block with no-compact mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoCompactMark{
ID: id,
Time: time.Now().Unix(),
Version: metadata.NoCompactMarkVersion1,
ID: id,
NoCompactTime: time.Now().Unix(),
Version: metadata.NoCompactMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m)))
Expand Down
12 changes: 7 additions & 5 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type DeletionMark struct {
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`
// Details is a human readable string giving details of reason.
Details string `json:"details,omitempty"`

// DeletionTime is a unix timestamp of when the block was marked to be deleted.
DeletionTime int64 `json:"deletion_time"`
Expand All @@ -73,12 +75,12 @@ type NoCompactMark struct {
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`

// Time is a unix timestamp of when the block was marked for no compact.
Time int64 `json:"time"`
Reason NoCompactReason `json:"reason"`
// Details is a human readable string giving details of reason.
Details string `json:"details"`
Details string `json:"details,omitempty"`

// NoCompactTime is a unix timestamp of when the block was marked for no compact.
NoCompactTime int64 `json:"no_compact_time"`
Reason NoCompactReason `json:"reason"`
}

func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename }
Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)

level.Info(s.logger).Log("msg", "marking outdated block for deletion", "block", id)
err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, s.metrics.blocksMarkedForDeletion)
err := block.MarkForDeletion(delCtx, s.logger, s.bkt, id, "outdated block", s.metrics.blocksMarkedForDeletion)
cancel()
if err != nil {
s.metrics.garbageCollectionFailures.Inc()
Expand Down Expand Up @@ -671,7 +671,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
defer cancel()

// TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this).
if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(delCtx, logger, bkt, ie.id, "source of repaired block", blocksMarkedForDeletion); err != nil {
return errors.Wrapf(err, "marking old block %s for deletion has failed", ie.id)
}
return nil
Expand Down Expand Up @@ -830,7 +830,7 @@ func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error {
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id)
if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, cg.blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil {
return errors.Wrapf(err, "mark block %s for deletion from bucket", id)
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package compact

import (
"context"
"fmt"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -37,7 +38,7 @@ func ApplyRetentionPolicyByResolution(
maxTime := time.Unix(m.MaxTime/1000, 0)
if time.Now().After(maxTime.Add(retentionDuration)) {
level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", retentionDuration), blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "delete block")
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func FromProto(storeType storepb.StoreType) StoreAPI {
var (
Bucket = source{component: component{name: "bucket"}}
Cleanup = source{component: component{name: "cleanup"}}
Mark = source{component: component{name: "mark"}}
Compact = source{component: component{name: "compact"}}
Downsample = source{component: component{name: "downsample"}}
Replicate = source{component: component{name: "replicate"}}
Expand Down
4 changes: 2 additions & 2 deletions pkg/verifier/safe_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BackupAndDelete(ctx context.Context, conf Config, id ulid.ULID) error {
}

level.Info(conf.Logger).Log("msg", "Marking block as deleted", "id", id.String())
if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, conf.metrics.blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, "manual verify-repair", conf.metrics.blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "marking delete from source")
}
return nil
Expand Down Expand Up @@ -115,7 +115,7 @@ func BackupAndDeleteDownloaded(ctx context.Context, conf Config, bdir string, id
}

level.Info(conf.Logger).Log("msg", "Marking block as deleted", "id", id.String())
if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, conf.metrics.blocksMarkedForDeletion); err != nil {
if err := block.MarkForDeletion(ctx, conf.Logger, conf.Bkt, id, "manual verify-repair", conf.metrics.blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "marking delete from source")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion scripts/genflagdocs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ for x in "${toolsCommands[@]}"; do
${THANOS_BIN} tools "${x}" --help &>"docs/components/flags/tools_${x}.txt"
done

toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup")
toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup" "mark")
for x in "${toolsBucketCommands[@]}"; do
${THANOS_BIN} tools bucket "${x}" --help &>"docs/components/flags/tools_bucket_${x}.txt"
done
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
id, err = malformedBase.Create(ctx, dir, 0*time.Second)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after consistency delay.
Expand All @@ -385,7 +385,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))

// Partial block after consistency delay + old deletion mark ready to be deleted.
Expand All @@ -412,7 +412,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
id, err = malformedBase.Create(ctx, dir, 50*time.Hour)
testutil.Ok(t, err)
testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename)))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))
}

Expand Down