Skip to content

Commit

Permalink
bucket: Improved bucket flag handling when no store is specifed, simp…
Browse files Browse the repository at this point in the history
…lified factory anf bucket commands code.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Jan 11, 2019
1 parent b6dcbf5 commit 1b82733
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 109 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Fixed

- [#396](https://github.com/improbable-eng/thanos/issues/396) - Fixed sidecar missing proxying samples if Prometheus result for single series was longer than 2^16
- [#649](https://github.com/improbable-eng/thanos/issues/649) - Fixed store label values api to add also external label values.
- [#708](https://github.com/improbable-eng/thanos/issues/708) - `"X-Amz-Acl": "bucket-owner-full-control"` metadata for s3 upload operation is no longer set by default which was breaking some providers handled by minio client.

Expand Down
63 changes: 38 additions & 25 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
"text/template"
"time"

"github.com/prometheus/tsdb/labels"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/verifier"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/labels"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/alecthomas/kingpin.v2"
Expand All @@ -46,18 +46,24 @@ var (
)

func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "Inspect metric data in an object storage bucket")
cmd := app.Command(name, "Bucket utility commands")

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreBackupConfig := regCommonObjStoreFlags(cmd, "-backup")
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

// Verify command.
verify := cmd.Command("verify", "Verify all blocks in the bucket against specified issues")
verifyRepair := verify.Flag("repair", "Attempt to repair blocks for which issues were detected").
registerBucketVerify(m, cmd, name, objStoreConfig)
registerBucketLs(m, cmd, name, objStoreConfig)
registerBucketInspect(m, cmd, name, objStoreConfig)
return
}

func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *pathOrContent) {
cmd := root.Command("verify", "Verify all blocks in the bucket against specified issues")
objStoreBackupConfig := regCommonObjStoreFlags(cmd, "-backup", false)
repair := cmd.Flag("repair", "Attempt to repair blocks for which issues were detected").
Short('r').Default("false").Bool()
verifyIssues := verify.Flag("issues", fmt.Sprintf("Issues to verify (and optionally repair). Possible values: %v", allIssues())).
issuesToVerify := cmd.Flag("issues", fmt.Sprintf("Issues to verify (and optionally repair). Possible values: %v", allIssues())).
Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings()
verifyIDWhitelist := verify.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
idWhitelist := cmd.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
"If none is specified, all blocks will be verified. Repeated field").Strings()
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
bucketConfig, err := objStoreConfig.Content()
Expand All @@ -76,14 +82,17 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
return err
}

backupBkt, err := client.NewBucket(logger, backupBucketConfig, reg, name)
if err == client.ErrNotFound {
if *verifyRepair {
var backupBkt objstore.Bucket
if len(backupBucketConfig) == 0 {
if *repair {
return errors.Wrap(err, "repair is specified, so backup client is required")
}
} else if err != nil {
return err
} else {
backupBkt, err = client.NewBucket(logger, backupBucketConfig, reg, name)
if err != nil {
return err
}

defer runutil.CloseWithLogOnErr(logger, backupBkt, "backup bucket client")
}

Expand All @@ -96,24 +105,24 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
issues []verifier.Issue
)

for _, i := range *verifyIssues {
for _, i := range *issuesToVerify {
issueFn, ok := issuesMap[i]
if !ok {
return errors.Errorf("no such issue name %s", i)
}
issues = append(issues, issueFn)
}

if *verifyRepair {
if *repair {
v = verifier.NewWithRepair(logger, bkt, backupBkt, issues)
} else {
v = verifier.New(logger, bkt, issues)
}

var idMatcher func(ulid.ULID) bool = nil
if len(*verifyIDWhitelist) > 0 {
if len(*idWhitelist) > 0 {
whilelistIDs := map[string]struct{}{}
for _, bid := range *verifyIDWhitelist {
for _, bid := range *idWhitelist {
id, err := ulid.Parse(bid)
if err != nil {
return errors.Wrap(err, "invalid ULID found in --id-whitelist flag")
Expand All @@ -131,9 +140,11 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin

return v.Verify(ctx, idMatcher)
}
}

ls := cmd.Command("ls", "List all blocks in the bucket")
lsOutput := ls.Flag("output", "Format in which to print each block's information. May be 'json' or custom template.").
func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *pathOrContent) {
cmd := root.Command("ls", "List all blocks in the bucket")
output := cmd.Flag("output", "Format in which to print each block's information. May be 'json' or custom template.").
Short('o').Default("").String()
m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
bucketConfig, err := objStoreConfig.Content()
Expand All @@ -155,7 +166,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
defer cancel()

var (
format = *lsOutput
format = *output
printBlock func(id ulid.ULID) error
)

Expand Down Expand Up @@ -220,11 +231,13 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
return printBlock(id)
})
}
}

inspect := cmd.Command("inspect", "Inspect all blocks in the bucket")
selector := inspect.Flag("selector", "Selects blocks based on label, e.g. '-l key1=\"value1\" -l key2=\"value2\"'. All key value pairs must match.").Short('l').
func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *pathOrContent) {
cmd := root.Command("inspect", "Inspect all blocks in the bucket")
selector := cmd.Flag("selector", "Selects blocks based on label, e.g. '-l key1=\"value1\" -l key2=\"value2\"'. All key value pairs must match.").Short('l').
PlaceHolder("<name>=\"<value>\"").Strings()
sortBy := inspect.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by UNTIL'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'UNTIL' value.").
sortBy := cmd.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by UNTIL'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'UNTIL' value.").
Default("FROM", "UNTIL").Enums(inspectColumns...)

m[name+" inspect"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process compactions.").
Default("./data").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed.").
Default("30m"))
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application, name s
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runDownsample(g, logger, reg, *dataDir, objStoreConfig, name)
Expand Down
26 changes: 18 additions & 8 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,23 @@ func modelDuration(flags *kingpin.FlagClause) *model.Duration {
}

type pathOrContent struct {
name string
fileFlagName string
contentFlagName string

path *string
content *string
required bool
path *string
content *string
}

func (p *pathOrContent) Content() ([]byte, error) {
if len(*p.path) > 0 && len(*p.content) > 0 {
return nil, errors.Errorf("Both file and content are set for %s", p.name)
return nil, errors.Errorf("Both %s and %s flags set.", p.fileFlagName, p.contentFlagName)
}

if len(*p.path) > 0 {
c, err := ioutil.ReadFile(*p.path)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("loading YAML file %s for %s", *p.path, p.name))
return nil, errors.Wrapf(err, "loading YAML file %s for %s", *p.path, p.fileFlagName)
}
return c, nil
}
Expand All @@ -157,19 +159,27 @@ func (p *pathOrContent) Content() ([]byte, error) {
return []byte(*p.content), nil
}

if p.required {
return nil, errors.Errorf("flag %s or %s is required for running this command", p.fileFlagName, p.contentFlagName)
}

return nil, nil
}

func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string) *pathOrContent {
func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool) *pathOrContent {
fileFlagName := fmt.Sprintf("objstore%s.config-file", suffix)
contentFlagName := fmt.Sprintf("objstore%s.config", suffix)

bucketConfFile := cmd.Flag(fileFlagName, fmt.Sprintf("Path to YAML file that contains object store%s configuration.", suffix)).
PlaceHolder("<bucket.config-yaml-path>").String()

bucketConf := cmd.Flag(fmt.Sprintf("objstore%s.config", suffix), fmt.Sprintf("Alternative to '%s' flag. Object store%s configuration in YAML.", fileFlagName, suffix)).
bucketConf := cmd.Flag(contentFlagName, fmt.Sprintf("Alternative to '%s' flag. Object store%s configuration in YAML.", fileFlagName, suffix)).
PlaceHolder("<bucket.config-yaml>").String()

return &pathOrContent{
name: fmt.Sprintf("objstore%s.config", suffix),
fileFlagName: fileFlagName,
contentFlagName: contentFlagName,
required: required,

path: bucketConfFile,
content: bucketConf,
Expand Down
20 changes: 10 additions & 10 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
alertExcludeLabels := cmd.Flag("alert.label-drop", "Labels by name to drop before sending to alertmanager. This allows alert to be deduplicated on replica label (repeated). Similar Prometheus alert relabelling").
Strings()

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

queries := cmd.Flag("query", "Addresses of statically configured query API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect query API servers through respective DNS lookups.").
PlaceHolder("<query>").Strings()
Expand Down Expand Up @@ -553,25 +553,25 @@ func runRule(
})
}

var uploads = true

bucketConfig, err := objStoreConfig.Content()
if err != nil {
return err
}
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, bucketConfig, reg, component)
if err != nil && err != client.ErrNotFound {
return err
}

if err == client.ErrNotFound {
var uploads = true
if len(bucketConfig) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
uploads = false
}

if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, bucketConfig, reg, component)
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
Expand Down
41 changes: 30 additions & 11 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"sync"
"time"

Expand Down Expand Up @@ -51,7 +53,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri

reloaderRuleDirs := cmd.Flag("reloader.rule-dir", "Rule directories for the reloader to refresh (repeated field).").Strings()

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
rl := reloader.New(
Expand Down Expand Up @@ -225,33 +227,36 @@ func runSidecar(
})
}

var uploads = true

bucketConfig, err := objStoreConfig.Content()
if err != nil {
return err
}

// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, bucketConfig, reg, component)
if err != nil && err != client.ErrNotFound {
return err
}

if err == client.ErrNotFound {
var uploads = true
if len(bucketConfig) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
uploads = false
}

if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, bucketConfig, reg, component)
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

if err := isPrometheusDirAccesible(dataDir); err != nil {
level.Error(logger).Log("err", err)
}

s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource)
ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -281,6 +286,20 @@ func runSidecar(
return nil
}

func isPrometheusDirAccesible(dir string) error {
const errMsg = "WAL file is not accessible. Does block shipper dir is a TSDB directory? If yes it is shared with TSDB?"
f, err := os.Stat(filepath.Join(dir, "WAL"))
if err != nil {
return errors.Wrap(err, errMsg)
}

if f.IsDir() {
return errors.New(errMsg)
}

return nil
}

type metadata struct {
promURL *url.URL

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

objStoreConfig := regCommonObjStoreFlags(cmd, "")
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Default("3m").Duration()
Expand Down
Loading

0 comments on commit 1b82733

Please sign in to comment.